27

Redis Stream | wsztrush

 4 years ago
source link: http://wsztrush.com/2019/10/18/redis-stream/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Redis Stream

2019-10-18

  |   工具

  |   0 Comments

  |   511

消息中间件内部用的都是MetaQ,非常标准的生产、消费的用法,但是感觉有几点不太方便(这几点可能普通场景确实没什么必要):

  • 能够根据时间消费过去的消息;
  • 能阻塞读;
  • 能方便地增加或减少Topic;
  • 能方便地把数据发送到指定的Topic;

总体来说:做性能比较高的队列,可以方便的把增量数据中转到各个节点上。网上逛发现Redis 5的Stream貌似看起来挺符合的。

  • XADD:唯一的向Stream中插入数据的命令,传入多个KEY-VALUE,返回Stream ID(递增唯一键,可以手工指定):
    • XADD key ID field string [field string ...]
    • XADD mystream * name Sara surname OConnor
    • XADD mystream * field1 value1 field2 value2 field3 value3
    • XADD mystream MAXLEN ~ 1000 * ... entry fields here ...(设置最大长度)
  • XLEN:Stream中的数量,Stream不存在时返回0;
  • XRANGE:根据Stream ID返回一定范围内记录(闭区间);
    • XRANGE key start end [COUNT count]
    • XRANGE writers - + COUNT 2
    • XRANGE somestream 1526985054069 1526985055069(仅使用Stream ID的第一部分)
    • XRANGE mystream 1526984818136-0 1526984818136-0
  • XREVRANGE:XRANGE的倒序;
  • XREAD:支持从一个或多个流中读取数据,且支持阻塞;
    • XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
    • XREAD COUNT 2 STREAMS mystream writers 0-0 0-0(同时从两个流里面拉数据)
    • XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
    • XREAD BLOCK 5000 COUNT 100 STREAMS mystream $(阻塞等待新的消息)
  • XTRIM:把Stream里面老的数据删掉;
    • XTRIM key MAXLEN [~] count
  • XDEL:指定删除数据;

组相关(暂时用不到先简单记下):

  • XGROUP
    • XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • XPENDING
    • XPENDING key group [start end count] [consumer]
  • XREADGROUP
    • XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • XCLAIM
    • XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
  • XACK
    • XACK key group ID [ID ...]
  • XINFO
    • XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

下载后安装,修改redis.conf(不然远程连不上):

#bind 127.0.0.1
protected-mode no

然后就可以启动服务了./redis-server ../redis.conf

需要的Maven依赖:

<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-all</artifactId>
<version>3.9.1</version>
</dependency>
  • 没有redisson会报错java.lang.NoClassDefFoundError: reactor/core/scheduler/Schedulers,而依赖io.projectreactor:reactor-core和springboot 1.5好像又冲突了🤔
  • 据说lettuce比jedis性能更高:
    • jedis是直连模式,且非线程安全;
    • lettuce基于netty,且线程安全;

开始撸代码

在lettuce比较新的版本里面对Redis 5的接口有封装,方法名字跟Redis命令长差不多:

public static String test(int count) {
RedisClient client = RedisClient.create(RedisURI.create("xx.xx.xx.xx", 6379));
StatefulRedisConnection<String, String> connection = client.connect();
RedisCommands<String, String> commands = connection.sync();
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
String ret = commands.xadd("my_stream_0001", Collections.singletonMap("" + i, "" + i));
return "COST:" + (System.currentTimeMillis() - start);

对Redis整体的执行流程还不懂,先针对Stream部分硬着头皮看看吧,大概能看明白点的地方先做个笔记😂

%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%84.svg
  • 每个listpack都有一个master entry来存储所有的field,后面的消息如果有相同的field就不需要再次存储了(既灵活又省空间);
  • 保存新消息时会找到最后一个rax node并拿到对应的listpack,如果可以的话共用同一个listpack。所以就导致了listpack可能存在多个消息(为了减少内存的碎片?);

基数树(或者叫压缩前缀树)用来做前缀匹配比较快,先看一个字典树(前缀树)的例子:

%E5%AD%97%E5%85%B8%E6%A0%91.svg

其中包含的字符串有:

  • ACFGH
  • ACFGI

很简单但也很强大:

  • 能够快速的检索
  • 自带排序:通过每一层有序地遍历就可以顺序的输出

也有缺点:

  • 访问存储的次数太多,如果结构保存在磁盘上就跪下了
  • 指针占用了不少空间

基数树则是用一个非常简单的道理做了一个非常简单的优化:

如果子节点是父节点唯一的子节点,那么该子节点可以与其父节点合并。实际在操作时并不是合并,而是构建时分裂。

压缩后如下图:

%E5%9F%BA%E6%95%B0%E6%A0%91.svg

XREAD

比较好奇BLOCK是怎么实现的,入口在xreadCommand方法:

void xreadCommand(client *c) {
// ....
for (int i = 0; i < streams_count; i++) {
robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
if (o == NULL) continue;
stream *s = o->ptr;
streamID *gt = ids+i;
int serve_synchronously = 0;
// 判断是否有数据,如果有数据则设置 serve_synchronously = 1
if (groups) {
if (gt->ms != UINT64_MAX || gt->seq != UINT64_MAX) {
serve_synchronously = 1;
serve_history = 1;
} else {
streamID *last = &groups[i]->last_id;
if (s->length && (streamCompareID(&s->last_id, last) > 0)) {
serve_synchronously = 1;
*gt = *last;
} else {
if (s->length && (streamCompareID(&s->last_id, gt) > 0)) {
serve_synchronously = 1;
if (serve_synchronously) {
// ....
// 根据范围读取数据
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
consumer, flags, &spi);
if (groups) server.dirty++;
// 如果能读到数据(多个流里面随便有点就行),就不会到这里了
if (timeout != -1) {
// ...
// 把客户端与相应的事件关联(同时保存参数)
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
timeout, NULL, ids);
// ...

大概的步骤是:

  • 判断是否已经有数据能读了;
    • 有,streamReplyWithRange直接返回数据(如果同时在读多个流,只要有一个流里面有数据就会返回);
    • 无,把客户端挂起来;

再来看消息的写入:

void xaddCommand(client *c) {
/* ... */
/* 保存数据 */
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
&id, id_given ? &id : NULL) == C_ERR) {
addReplyError(c,"The ID specified in XADD is equal or smaller than the "
"target stream top item");
return;
addReplyStreamID(c,&id);
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
/* 当数量超过限制,开始删除数据 */
if (maxlen >= 0) {
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
/* ... */
/* 通知阻塞读的客户端 */
if (server.blocked_clients_by_type[BLOCKED_STREAM])
signalKeyAsReady(c->db, c->argv[1]);

数据结构处理的核心逻辑在streamAppendItem里面,可以对照上面的数据结构来看;


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK