写BUG的派大星

Patrick Star

  • 首页
  • 归档

  • 搜索
设计模式 Gis Kafka Druid 微信小程序 Java 开源项目源码 物体识别 机器学习 Mybatis 微服务 Feign OpenVPN CSS Streamsets CDH SpringCloud SpringBoot maven 分布式 Shell Tree Linux js WebSocket 多线程 集群 Hadoop 大数据 JDK ElasticSearch MySQL 数据库 Redis Http Nginx

Python使用kafka生产或消费消息

发表于 2020-10-29 | 分类于 Python | 0 | 阅读次数 989

安装依赖

pip install kafka-python

Producer 发送消息

from kafka.producer import KafkaProducer

producer = KafkaProducer(bootstrap_servers = ['192.168.0.210:9092', '192.168.0.211:9092', '192.168.0.212:9092'])


def on_send_success(*args, **kwargs):
    """
    发送成功的回调函数
    :param args:
    :param kwargs:
    :return:
    """
    print("发送成功的回调函数")
    return args


def on_send_error(*args, **kwargs):
    """
    发送失败的回调函数
    :param args:
    :param kwargs:
    :return:
    """
    print("发送失败的回调函数")
    return args


def prcoess_kafka(message):
    topic = 'test'
    producer.send(topic, bytes(str(message), encoding='utf8')).add_callback(on_send_success).add_errback(on_send_error)
    producer.flush()
    producer.close()

if __name__ == '__main__':
    message = '========================='
    prcoess_kafka(message)

这里要说明的是如果直接调用producer.send(),没有增加回调函数的话,可能会由于主线程的退出而导致消息发送失败。 这是由于在发送消息时,Python采用了新开一个线程处理,此时消息是放在缓冲区的。 只有调用了producer.flush()才会将消息真正的退出去。 既然是异步的,就会有主线程退出而发送失败的问题。

增加回调函数可以阻塞主线程,等待回调处理完成后才会退出。也就可以保证程序正常发送消息了。

Consumer 接受消息

from kafka import KafkaConsumer

consumer = KafkaConsumer('unattended.history_vehicle',group_id='testgroupid', bootstrap_servers=['192.168.0.210:9092','192.168.0.211:9092','192.168.0.212:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

指定了topic、group_id、brokers(bootstrap_servers)等信息后就可以正常发送消息了。

  • 本文作者: Patrick
  • 本文链接: https://www.write1bug.cn/archives/python使用kafka生产或消费消息
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式 # Gis # Kafka # Druid # 微信小程序 # Java # 开源项目源码 # 物体识别 # 机器学习 # Mybatis # 微服务 # Feign # OpenVPN # CSS # Streamsets # CDH # SpringCloud # SpringBoot # maven # 分布式 # Shell # Tree # Linux # js # WebSocket # 多线程 # 集群 # Hadoop # 大数据 # JDK # ElasticSearch # MySQL # 数据库 # Redis # Http # Nginx
Druid中SQLStatement相关的源码阅读(用于修改SQL中的某些属性)
Mybatis Log plugin + logback配置
  • 文章目录
  • 站点概览
Patrick

Patrick

不是在改BUG,就是在写BUG。

52 日志
9 分类
36 标签
RSS
E-mail
Creative Commons
© 2018 — 2023 Patrick
人生如逆旅|我亦是行人
鲁ICP备18043140号-1