35

使用 Redis 流实现消息队列

 4 years ago
source link: https://www.tuicool.com/articles/eQjeQfe
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Note

本文摘录自《Redis使用手册》, 更多信息请见: RedisGuide.com

在介绍了 Redis 流的基本功能之后, 现在是时候使用这些功能来构建一些实际的应用了。 消息队列作为流的典型应用之一, 具有非常好的示范性, 因此我们将使用 Redis 流的相关功能构建一个消息队列应用, 这个消息队列跟我们之前使用其他 Redis 数据结构构建的消息队列具有相似的功能。

代码清单 10-1 展示了一个具有基本功能的消息队列实现:

  • 代码最开头的是几个转换函数, 它们负责对程序的相关输入输出进行转换和格式化;

  • MessageQueue 类用于实现消息队列, 它的添加消息、移除消息以及返回消息数量三个方法分别使用了流的 XADD 命令、 XDEL 命令和 XLEN 命令;

  • 消息队列的两个获取方法 get_message()get_by_range() 分别以两种形式调用了流的 XRANGE 命令;

  • 最后, 用于迭代消息的 iterate() 方法使用了 XREAD 命令对流进行迭代。

代码清单 10-1 使用 Redis 流实现的消息队列: /stream/message_queue.py

def reconstruct_message_list(message_list):
    """
    为了让多条消息能够以更结构化的方式返回给调用者,
    将 Redis 返回的多条消息从原来的格式:
    [(id1, {k1:v1, k2:v2, ...}), (id2, {k1:v1, k2:v2, ...}), ...]
    转换成以下格式:
    [{id1: {k1:v1, k2:v2, ...}}, {id2: {k1:v1, k2:v2, ...}}, ...]
    """
    result = []
    for id, kvs in message_list:
        result.append({id: kvs})
    return result

def get_message_from_nested_list(lst):
    """
    从嵌套列表中取出消息本体。
    """
    return lst[0][1]


class MessageQueue:
    """
    使用 Redis 流实现的消息队列。
    """

    def __init__(self, client, stream_key):
        self.client = client
        self.stream = stream_key

    def add_message(self, key_value_pairs):
        """
        将给定的键值对存入到消息里面,并返回相应的消息 ID 。
        """
        return self.client.xadd(self.stream, key_value_pairs)

    def get_message(self, message_id):
        """
        根据给定的消息 ID 返回相应的消息,如果消息不存在则返回 None 。
        """
        reply = self.client.xrange(self.stream, message_id, message_id)
        if len(reply) == 1:
            return get_message_from_nested_list(reply)

    def remove_message(self, message_id):
        """
        根据给定的消息 ID 删除相应的消息,如果消息不存在则忽略该动作。
        """
        self.client.xdel(self.stream, message_id)

    def len(self):
        """
        返回消息队列的长度。
        """
        return self.client.xlen(self.stream)

    def get_by_range(self, start_id, end_id, max_item=10):
        """
        根据给定的 ID 区间范围返回队列中的消息。
        """
        reply = self.client.xrange(self.stream, start_id, end_id, max_item)
        return reconstruct_message_list(reply)

    def iterate(self, start_id=0, max_item=10):
        """
        对消息队列进行迭代,返回最多 N 条大于给定 ID 的消息。
        """
        reply = self.client.xread({self.stream: start_id}, max_item)
        if len(reply) == 0:
            return list()
        else:
            messages = get_message_from_nested_list(reply)
            return reconstruct_message_list(messages)

对于这个消息队列实现, 我们可以通过执行以下代码, 创建出它的实例:

>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, "mq")

然后通过执行以下代码, 向队列里面添加十条消息:

>>> for i in range(10):
...   key = "key{0}".format(i)
...   value = "value{0}".format(i)
...   msg = {key:value}
...   mq.add_message(msg)
...
'1554113926280-0'
'1554113926280-1'
'1554113926281-0'
'1554113926281-1'
'1554113926281-2'
'1554113926281-3'
'1554113926281-4'
'1554113926281-5'
'1554113926281-6'
'1554113926282-0'

还可以根据 ID 获取指定的消息, 又或者使用 get_by_range() 方法同时获取多条消息:

>>> mq.get_message('1554113926280-0')
{'key0': 'value0'}
>>> mq.get_message('1554113926280-1')
{'key1': 'value1'}
>>> mq.get_by_range("-", "+", 3)
[{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]

又或者使用 iterate() 方法对消息队列进行迭代, 等等:

>>> mq.iterate(0, 3)
[{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]
>>> mq.iterate('1554113926281-0', 3)
[{'1554113926281-1': {'key3': 'value3'}}, {'1554113926281-2': {'key4': 'value4'}}, {'1554113926281-3': {'key5': 'value5'}}]

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK