redis stream 消费者组

nxdong December 25, 2022 [redis, stream] #redis #stream

redis 消费者组 python 使用

启动redis && 依赖包

docker run --rm --log-opt max-size=100m --log-opt max-file=2 -p 6379:6379 --name myredis -v ${PWD}/data:/data redis
pip install redis

生产者代码

import redis

def main():
    client = redis.Redis(host='localhost', port=6379, db=0)
    print("生产者 Client: ", client)
    topic_name = "mytopic"
    for i in range(20):
        msg_info = {
            'data':  bytes("123456".encode("utf-8")),
            'id': i,
        }
        msg_id = client.xadd(topic_name, msg_info)
        print("消息id:", msg_id)
        # 可以提前获取队列长度进行判断,以控制队列的数量,防止队列信息过多。
        msg_len = client.xlen(topic_name)
        print("消息队列长度:", msg_len)
        print("index:", i)
        input()

    stream_info = client.xinfo_stream(topic_name)
    print("当前队列信息:", stream_info)

if __name__ == '__main__':
    main()

消费者代码

import redis
import uuid

def main():
    client = redis.Redis(host='localhost', port=6379, db=0)
    print("消费者 Client: ", client)
    topic_name = "mytopic"
    group_name = "group_name"

    group_info = client.xinfo_groups(topic_name)
    print("组信息:", group_info)
    if len(group_info) != 1:
        ret = client.xgroup_create(topic_name, group_name, id=0)
        print("创建消费者组:", ret)
    print("流信息:", client.xinfo_stream(topic_name))
    print("消费者信息:", client.xinfo_consumers(topic_name, group_name))
    consumer_id = str(uuid.uuid1())
    # 消费者id 应当根据应用需要设置id.避免消费者组中出现大量无效的消费者。
    # 也可以通过查询消费者组的信息,可以删除 idle 时间过长的消费者。
    # 消费者信息: [{'name': b'5036f1e6-8464-11ed-aafb-00155d33e6fa', 'pending': 0, 'idle': 42293}]
    rets = client.xreadgroup(group_name, consumer_id, {
        topic_name: ">"}, count=1)
    for ret in rets:
        topic_name, infos = ret
        print("主题名: ", topic_name, " 主题信息:", infos)
        for info in infos:
            print("========")
            msg_id, msg_data = info
            print("消息 id:", msg_id, " 消息内容:", msg_data)
            ack_ret = client.xack(topic_name, group_name, msg_id)
            print("ack_ret:", ack_ret)
            # 是否删除要根据实际情况,如果message还需要被别的消费者消费,则不需要立即删除。
            # 如果需要释放空间,则需要删除消息。
            del_ret = client.xdel(topic_name, msg_id)
            print("删除消息返回值:", del_ret)
            print("当前消息队列长度:", client.xlen(topic_name))
            print("========")
    print("队列长度:", client.xlen(topic_name))


if __name__ == '__main__':
    main()
    # # 移除消费组
    # r.xgroup_destroy(topic_name, group_name)

参考

Redis Stream 介绍
Redis Stream tutorial
Redis Stream 命令列表