redis stream 消费者组
redis 消费者组 python 使用
启动redis && 依赖包
docker run --rm --log-opt max-size=100m --log-opt max-file=2 -p 6379:6379 --name myredis -v ${PWD}/data:/data redis
pip install redis
生产者代码
import redis
def main():
client = redis.Redis(host='localhost', port=6379, db=0)
print("生产者 Client: ", client)
topic_name = "mytopic"
for i in range(20):
msg_info = {
'data': bytes("123456".encode("utf-8")),
'id': i,
}
msg_id = client.xadd(topic_name, msg_info)
print("消息id:", msg_id)
# 可以提前获取队列长度进行判断,以控制队列的数量,防止队列信息过多。
msg_len = client.xlen(topic_name)
print("消息队列长度:", msg_len)
print("index:", i)
input()
stream_info = client.xinfo_stream(topic_name)
print("当前队列信息:", stream_info)
if __name__ == '__main__':
main()
消费者代码
import redis
import uuid
def main():
client = redis.Redis(host='localhost', port=6379, db=0)
print("消费者 Client: ", client)
topic_name = "mytopic"
group_name = "group_name"
group_info = client.xinfo_groups(topic_name)
print("组信息:", group_info)
if len(group_info) != 1:
ret = client.xgroup_create(topic_name, group_name, id=0)
print("创建消费者组:", ret)
print("流信息:", client.xinfo_stream(topic_name))
print("消费者信息:", client.xinfo_consumers(topic_name, group_name))
consumer_id = str(uuid.uuid1())
# 消费者id 应当根据应用需要设置id.避免消费者组中出现大量无效的消费者。
# 也可以通过查询消费者组的信息,可以删除 idle 时间过长的消费者。
# 消费者信息: [{'name': b'5036f1e6-8464-11ed-aafb-00155d33e6fa', 'pending': 0, 'idle': 42293}]
rets = client.xreadgroup(group_name, consumer_id, {
topic_name: ">"}, count=1)
for ret in rets:
topic_name, infos = ret
print("主题名: ", topic_name, " 主题信息:", infos)
for info in infos:
print("========")
msg_id, msg_data = info
print("消息 id:", msg_id, " 消息内容:", msg_data)
ack_ret = client.xack(topic_name, group_name, msg_id)
print("ack_ret:", ack_ret)
# 是否删除要根据实际情况,如果message还需要被别的消费者消费,则不需要立即删除。
# 如果需要释放空间,则需要删除消息。
del_ret = client.xdel(topic_name, msg_id)
print("删除消息返回值:", del_ret)
print("当前消息队列长度:", client.xlen(topic_name))
print("========")
print("队列长度:", client.xlen(topic_name))
if __name__ == '__main__':
main()
# # 移除消费组
# r.xgroup_destroy(topic_name, group_name)