44

Redis 内置了一个 Kafka:Stream

 5 years ago
source link: https://mp.weixin.qq.com/s/14uzvTveOoN3u3Y6XO_rUA?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

流(Stream)是Redis从5.0.0版本新加入的一个数据结构,是一个类似于Kafka的消息系统。该结构相关的大部分命令使用字母 X 开头 如 XADD , XLEN , XRANGE 等。

在开始详细叙述之前,先说明一下:

本文内容主要是结合官网文章Introduction to Redis Streams 和个人理解整理而成。另外想吐槽下自己,游戏玩多了也不太好, Stream 总是不自觉的写成了 Steam

向Stream中添加数据

命令格式如

返回的 1541558444516-0 是消息id。 可以看到上面的命令ID参数传的是 * , 代表由系统生成的ID,当然,你可以显式的指定消息ID(基本不太常用),对于这个ID有几点需要注意:

  • ID必须是由  - 连接的两部分,前一部分默认情况下是当前Server当前的毫秒时间戳,后一部分是一个无符号64位长整型序列号;

  • 同一个KEY下,后加入的ID一定要比已加入的ID大。

获取Stream长度

使用 XLEN , 如

从Stream中获取数据

有三种方式从Stream中获取数据

按照范围查询

包括 XRANGEXREVRANGE 两个命令 ,分别是正序和反序, 以正序 XRANGE 为例:

从Stream这个key中返回ID范围是 startend[前count个] 数据。 如

- , + 分别表示最小和最大ID。

监听(XREAD)

XREAD 命令

执行完成后再执行:

发现Stream的长度没有变化,也就是说, XREAD 不会删除Stream里的数据。

上面的这个例子是一个非阻塞的方式监听。当使用 BLOCK 参数,并传递一个超时时间(0为永不超时),将启动一个阻塞方式的监听。 特殊的ID $ 表示从最新的ID开始监听。如:

启动一个监听客户端:

该命令阻塞等待, 此时另起一个客户端:

等待的客户端收到消息:

如果有多个客户端都在监听同一个流,这些客户端都可以得到流中的数据。

消费者组 (Consumer Group)

机制说明

涉及三个命令 分别是

  • XGROUP : 创建或者销毁一个 Consumer Group, 也可以从Consumer Group中删除一个 Consumer

  • XREADGROUP : 指定 Consumer Group 中的一个Consumer,消费一条消息

  • XACK : 在  XREADGROUP 调用时不指定  NOACK 时需要显式调用  XACK 命令 来确认该消息已被正确处理,可以删除。

消费者组的消费方式可以用下图表示

一个消息 msg 可通过 group1 和 group2 分发,并且 group1 中的 msg 会被 consumer1 或者 consumer2 消费,group2 中的 msg 会被 consumer3 或者 comsumer4 消费。

创建/消费/确认

使用 XGROUP 命令创建一个consumer group,如

这样就创建了一个名为foocg1的 consumer group, 其中 $ 表示该组将要消费当前时间开始的消息,然后我们向Stream中添加一些消息:

此时,使用foocg1下的c1消费者来消费一条消息

其中最后的ID字段 指定为 > , 表示只获取那些从来没有被分发的消息。

我们继续消费一条消息

然后,再消费历史上所有的数据

注意这里ID传的是 0-0 , 此时会发现消费的是第一条消息。也就是说,没有经过XACK的消息依旧会保留在队列中。

执行 XACK 操作:

此时再去消费历史数据

发现已经获取不到被 XACK 的消息了,当所有的历史数据全部被 XACK 后:

一个伪码表示的客户端

一个消费者组的实现的伪码表示可以写作:

XPENDING 和 XCLAIM

XPENDING 可以获取消息系统中已经分发但是未被 XACK 的消息的情况

如:

表示有8个未确认消息,最小ID是"1541573732130-0",最大ID是"1541581755413-0", 其中c1 消费者有8个未确认的消息。传递start, end, count参数可以获取指定范围指定数目的未确认消息的详细信息,传递consumer可获取指定consumer下未确认信息列表。

XCLAIM 可以将未被确认的消息重新声明给其他消费者

如下面命令可以获取到一条原属于c1的消息未被确认:

下面命令可以将将原本属于 消费者c1 的消息 1541581753377-0 在等待确认的时间>30000情况下重新声明给c2

此时

XPENDINGXCLAIM 可以用来处理当一个消费者获取到一个消息后,运行失败导致无法执行 XACK ,此时这个消息就永远不会进行 确认已消费 操作的情形。

其他命令

  • XINFO 可以查看流的一些信息

  • XTRIM 可以获得一个有长度上限的Stream

  • XDEL 可以从Stream中删除消息

这些命令可以在官网找到详细的说明,这里就不再赘述了。

其他说明

  1. Stream支持AOF和RDB格式的持久化

  2. 当调用XDEL等造成Stream长度为0时,为了保留可能存在的Consumer Group信息,Stream不会被删除。

  3. Redis Cluster场景下,由于Key存在于单节点下,所以同一个流的所有消息也会位于同一个节点下。

  4. 由于同一个Stream(Key)下的所有消息位于同一节点,类比Kafka分区更像是使用多个Key形成多个Stream来处理本质上是同一类消息的一个Stream,而不是Stream下的Consumer Group。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK