nxdong December 25, 2022 [redis, stream] #redis #stream
Redis 消费者组的 Python 使用示例
启动 Redis 并安装依赖包
docker run --rm --log-opt max-size=100m --log-opt max-file=2 -p 6379:6379 --name myredis -v ${PWD}/data:/data redis
pip install redis
生产者代码
import redis
def main():
    """Publish 20 demo messages to the ``mytopic`` Redis stream.

    After each message is appended, the returned message id, the current
    stream length and the loop index are printed, and the script waits for
    the user to press Enter before sending the next one. Once all messages
    are sent, the stream's metadata is printed.
    """
    producer = redis.Redis(host='localhost', port=6379, db=0)
    print("生产者 Client: ", producer)

    stream_key = "mytopic"
    for index in range(20):
        # Same payload for every message; only the 'id' field varies.
        fields = {'data': b"123456", 'id': index}
        entry_id = producer.xadd(stream_key, fields)
        print("消息id:", entry_id)

        current_len = producer.xlen(stream_key)
        print("消息队列长度:", current_len)
        print("index:", index)

        # Pause so a consumer can be run between messages.
        input()

    print("当前队列信息:", producer.xinfo_stream(stream_key))


if __name__ == '__main__':
    main()
消费者代码
import redis
import uuid
def main():
    """Read, acknowledge and delete one new message from ``mytopic``.

    The consumer group ``group_name`` is created if it does not exist yet
    (creating the stream too, so the consumer may be started before the
    producer). A freshly generated consumer name is then used to read at
    most one never-delivered message, which is acknowledged and deleted
    from the stream. Diagnostic information is printed along the way.

    Raises:
        redis.exceptions.ResponseError: if group creation fails for any
            reason other than the group already existing.
    """
    client = redis.Redis(host='localhost', port=6379, db=0)
    print("消费者 Client: ", client)
    topic_name = "mytopic"
    group_name = "group_name"

    # Create the group unconditionally and tolerate "already exists".
    # Counting the groups on the stream is not a reliable existence check
    # (another group may be the only one present), and querying the groups
    # of a stream that does not exist yet raises an error.
    try:
        ret = client.xgroup_create(topic_name, group_name, id=0, mkstream=True)
        print("创建消费者组:", ret)
    except redis.exceptions.ResponseError as err:
        # BUSYGROUP is the server's "group name already exists" error.
        if "BUSYGROUP" not in str(err):
            raise

    print("组信息:", client.xinfo_groups(topic_name))
    print("流信息:", client.xinfo_stream(topic_name))
    print("消费者信息:", client.xinfo_consumers(topic_name, group_name))

    # A new consumer name is generated on every run.
    consumer_id = str(uuid.uuid1())
    # ">" asks for messages not yet delivered to any consumer of the group.
    rets = client.xreadgroup(group_name, consumer_id, {topic_name: ">"}, count=1)
    for stream_name, infos in rets:
        # stream_name is the key as returned by the server; keep topic_name
        # untouched for the follow-up commands.
        print("主题名: ", stream_name, " 主题信息:", infos)
        for msg_id, msg_data in infos:
            print("========")
            print("消息 id:", msg_id, " 消息内容:", msg_data)
            ack_ret = client.xack(topic_name, group_name, msg_id)
            print("ack_ret:", ack_ret)
            del_ret = client.xdel(topic_name, msg_id)
            print("删除消息返回值:", del_ret)
            print("当前消息队列长度:", client.xlen(topic_name))
            print("========")
    print("队列长度:", client.xlen(topic_name))


if __name__ == '__main__':
    main()
参考
Redis Stream 介绍
Redis Stream tutorial
Redis Stream 命令列表