nxdong   December 25, 2022   [redis, stream] #redis  #stream 
    
Redis 消费者组的 Python 使用示例
启动 Redis 并安装依赖包
docker run --rm --log-opt max-size=100m --log-opt max-file=2 -p 6379:6379 --name myredis -v ${PWD}/data:/data redis
pip install redis
生产者代码
import redis
def main():
    """Publish 20 demo messages to the ``mytopic`` Redis stream.

    Connects to the Redis server on ``localhost:6379`` (db 0). For every
    message it issues XADD, prints the id returned by the server, the
    current stream length and the loop index, then blocks on ``input()``
    so messages are produced one at a time (press Enter to send the next
    one). After the loop it prints the XINFO STREAM summary.
    """
    client = redis.Redis(host='localhost', port=6379, db=0)
    print("生产者 Client: ", client)
    topic_name = "mytopic"
    for i in range(20):
        msg_info = {
            # str.encode() already returns bytes; no extra bytes() wrapper
            # is needed.
            'data': "123456".encode("utf-8"),
            'id': i,
        }
        msg_id = client.xadd(topic_name, msg_info)
        print("消息id:", msg_id)
        msg_len = client.xlen(topic_name)
        print("消息队列长度:", msg_len)
        print("index:", i)
        # Pause until the user presses Enter so a consumer can be observed
        # picking up each message individually.
        input()
    stream_info = client.xinfo_stream(topic_name)
    print("当前队列信息:", stream_info)


if __name__ == '__main__':
    main()
消费者代码
import redis
import uuid
def main():
    """Read one pending message from ``mytopic`` as a consumer-group member.

    Connects to the Redis server on ``localhost:6379`` (db 0), makes sure
    the consumer group ``group_name`` exists on the stream, then reads at
    most one new message (``">"``) under a freshly generated consumer
    name. Each message read is acknowledged with XACK and then removed
    from the stream with XDEL; intermediate results are printed.

    Raises:
        redis.exceptions.ResponseError: for any server error other than
            "group already exists" (for example when the stream key is
            missing because the producer has not been run yet).
    """
    client = redis.Redis(host='localhost', port=6379, db=0)
    print("消费者 Client: ", client)
    topic_name = "mytopic"
    group_name = "group_name"
    group_info = client.xinfo_groups(topic_name)
    print("组信息:", group_info)
    # Create the group unconditionally and tolerate BUSYGROUP. Counting the
    # existing groups (len(group_info) != 1) is not a valid existence check:
    # it fails when other groups are present on the same stream.
    try:
        ret = client.xgroup_create(topic_name, group_name, id=0)
    except redis.exceptions.ResponseError as exc:
        if "BUSYGROUP" not in str(exc):
            raise
    else:
        print("创建消费者组:", ret)
    print("流信息:", client.xinfo_stream(topic_name))
    print("消费者信息:", client.xinfo_consumers(topic_name, group_name))
    # A new consumer name on every run; uuid1 keeps concurrent runs distinct.
    consumer_id = str(uuid.uuid1())
    rets = client.xreadgroup(
        group_name, consumer_id, {topic_name: ">"}, count=1)
    # Use a separate name for the key echoed back by the server so the
    # original topic_name is not shadowed inside the loop.
    for stream_key, infos in rets:
        print("主题名: ", stream_key, " 主题信息:", infos)
        for msg_id, msg_data in infos:
            print("========")
            print("消息 id:", msg_id, " 消息内容:", msg_data)
            ack_ret = client.xack(topic_name, group_name, msg_id)
            print("ack_ret:", ack_ret)
            # Acknowledging does not remove the entry from the stream;
            # XDEL is what actually shrinks it.
            del_ret = client.xdel(topic_name, msg_id)
            print("删除消息返回值:", del_ret)
            print("当前消息队列长度:", client.xlen(topic_name))
            print("========")
    print("队列长度:", client.xlen(topic_name))


if __name__ == '__main__':
    main()
        
参考
Redis Stream 介绍
Redis Stream tutorial
Redis Stream 命令列表