安装依赖
pip install kafka-python
Producer 发送消息
from kafka.producer import KafkaProducer
producer = KafkaProducer(bootstrap_servers = ['192.168.0.210:9092', '192.168.0.211:9092', '192.168.0.212:9092'])
def on_send_success(*args, **kwargs):
"""
发送成功的回调函数
:param args:
:param kwargs:
:return:
"""
print("发送成功的回调函数")
return args
def on_send_error(*args, **kwargs):
"""
发送失败的回调函数
:param args:
:param kwargs:
:return:
"""
print("发送失败的回调函数")
return args
def prcoess_kafka(message):
topic = 'test'
producer.send(topic, bytes(str(message), encoding='utf8')).add_callback(on_send_success).add_errback(on_send_error)
producer.flush()
producer.close()
if __name__ == '__main__':
message = '========================='
prcoess_kafka(message)
这里要说明的是如果直接调用producer.send()
,没有增加回调函数的话,可能会由于主线程的退出而导致消息发送失败。 这是由于在发送消息时,Python采用了新开一个线程处理,此时消息是放在缓冲区的。 只有调用了producer.flush()
才会将消息真正的退出去。 既然是异步的,就会有主线程退出而发送失败的问题。
增加回调函数可以阻塞主线程,等待回调处理完成后才会退出。也就可以保证程序正常发送消息了。
Consumer 接受消息
from kafka import KafkaConsumer
consumer = KafkaConsumer('unattended.history_vehicle',group_id='testgroupid', bootstrap_servers=['192.168.0.210:9092','192.168.0.211:9092','192.168.0.212:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
指定了topic
、group_id
、brokers(bootstrap_servers)
等信息后就可以正常发送消息了。