104

多姿势扩展 Redis 命令 - 云+社区 - 腾讯云

 6 years ago
source link: https://cloud.tencent.com/developer/article/1005722?fromSource=gwzcw.706002.706002.706002
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

多姿势扩展 Redis 命令

修改于2017-12-08 09:09:42阅读 2K0

一、业务场景

空间宠物业务需要实现一个定时消息触发组件,如在特定时刻给用户推送收集糖果通知、biubiu球功能定时回收用户丢弃的球等。可见,消息只有在特定时间到达才能被处理。同时,消息的产生是无序的,即后产生的消息被处理的时间可能早于先产生的消息。

二、为何选择Redis

一些著名的消息队列组件,如ActiveMQ ,本身支持消息延迟投递,为何本文选择Redis呢?一方面是引入新组建有学习、运维、接入成本,而组内已积累一定Redis开发运维经验;另一方面则是基于Redis实现这样一个组件难度也不大。所以决定采用Redis。

三、原生能力探究

键空间通知

键空间通知可以在消息到达时插入一个key,并给key设置过期时间,键过期后会通过特定频道发布键过期通知,订阅方可收到通知并处理事件。但问题在于:

  • key过期并不保证立即删除,Redis只会每次执行server.c:databasesCron时随机删除若干key,大量key同时过期无法保证时效;
  • Pub/Sub机制不保证通知送达,若client掉线则通知丢失;
  • 若多个client同时订阅,则都会收到通知,导致重复处理。

基于原生ZSET

ZSET可在消息插入时根据score排序,从而使最早的消息排在最前面。但ZSET没有提供POP方法,取得第一个元素和删除需要执行两个命令。为保证原子性,可以采用事务,如:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> ZRANGE myzset 0 0 WITHSCORES
QUEUED
127.0.0.1:6379> ZREMRANGEBYRANK myzset 0 0
QUEUED
127.0.0.1:6379> EXEC
1) 1) "b"
   2) "2"
2) (integer) 1

或者使用pipelining,如:

$ (printf "ZRANGE myzset 0 0 WITHSCORES\r\nZREMRANGEBYRANK myzset 0 0\r\n"; sleep 1) | nc localhost 6379
*2
$1
c
$1
3
:1

但问题在于,虽然可顺序取出消息,但无法只在时间到达后取出消息。因此需要client端实现逻辑等待时间到达再推送。同时,消息产生是无序的,如果取得了一个10分钟后处理的消息,在此期间又产生了一个需要在5分钟后处理的消息,逻辑将变得复杂。

由于使用原生Redis无法满足需求,我们决定扩展Redis命令。

四、多姿势命令扩展

LUA脚本是利用3.X版官方特性实现命令扩展的途径。以下脚本将读出首元素,并与当前时间戳(以参数传入)比较,如果消息处理时间到达则删除消息并返回;所有操作将是原子的。目前我们线上服务使用该方案。

LUA脚本:

local rs = redis.call('ZRANGE', KEYS[1], '0', '0', 'WITHSCORES');
if table.getn(rs)<2 then return rs end;
if tonumber(rs[2]) < tonumber(ARGV[1]) then
    redis.call('ZREMRANGEBYRANK', KEYS[1], 0, 0);
    return rs
end;
return {}

client生成命令:

redisFormatCommand(&pCmd, "eval %s 1 %s %lld", szScript, szKey, (int64_t)time(NULL)));
  • 使用lua脚本有额外学习成本
  • 实现在客户端,无法很好的复用
  • 使用lua后要做好运维工作,配置脚本超时,注意脚本缓存内存占用

改源码,加一个命令。我们较早上线的一个服务使用了该方案。

/* 需要在server.c中加入实现的命令:
struct redisCommand redisCommandTable[] = {
    //......
    {"zlpopif",zlpopifCommand,3,"w",0,NULL,1,1,1,0,0},
    {"zrpopif",zrpopifCommand,3,"w",0,NULL,1,1,1,0,0},
};
*/

/* 实现在t_zset.c: */
void zpopGenericCommand(client *c, int reverse, int condition) {
	robj *key = c->argv[1];
	robj *zobj;
	int keyremoved = 0;
	unsigned long deleted = 0;
	long start = 0, end = 0, llen = 0;

	/* for deletion */
	unsigned char *zleptr, *zlsptr;

	/* for addReply */
	unsigned char zlvstr[128];
	unsigned int zlvlen = 0;
	long long zlvlong = 0;
	robj *slele;
	double node_score;

	/* Step 1: Lookup & range sanity checks if needed. */
	if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
		checkType(c,zobj,OBJ_ZSET)) return;

	llen = zsetLength(zobj);
	if (end >= llen) end = llen-1;

	if (start > end || start >= llen) {
		return;
	}

	/* Step 2: Get value of the node will be remove */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		unsigned char *zl = zobj->ptr;
		unsigned char *vstr;

		if (reverse)
			zleptr = ziplistIndex(zl,-2-(2*start));
		else
			zleptr = ziplistIndex(zl,2*start);

		serverAssertWithInfo(c,zobj,zleptr != NULL);
		zlsptr = ziplistNext(zl,zleptr);

		serverAssertWithInfo(c,zobj,zleptr != NULL && zlsptr != NULL);
		serverAssertWithInfo(c,zobj,ziplistGet(zleptr,&vstr,&zlvlen,&zlvlong));

		/* copy the result, sice the node will be delete before addReply */
		node_score = zzlGetScore(zlsptr);
		if (vstr)
			strncpy((char *)zlvstr, (char *)vstr, zlvlen);
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;
		zskiplist *zsl = zs->zsl;
		zskiplistNode *ln;

		/* Check if starting point is trivial, before doing log(N) lookup. */
		if (reverse) {
			ln = zsl->tail;
		} else {
			ln = zsl->header->level[0].forward;
		}

		serverAssertWithInfo(c,zobj,ln != NULL);
		slele = ln->obj;
		incrRefCount(slele); /* MUST call decrRefCount to free mem */
		node_score = ln->score;
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 3: Check if condition satisfied. */
	if (condition) {
		double condscore = 0;
		if (getDoubleFromObjectOrReply(c,c->argv[2],&condscore,NULL)
			!= C_OK) goto cleanup;

		if (!reverse && condscore < node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
		if (reverse && condscore > node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
	}

	/* Step 4: Perform the deletion operation. */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		/* delete by ptr */
		serverAssertWithInfo(c,zobj,zleptr != NULL);
        zobj->ptr = zzlDelete(zobj->ptr,zleptr);
        deleted = 1;
        /* delete by range */
		/*zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);*/

		if (zzlLength(zobj->ptr) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;

		/* delete by ptr */
		/*serverAssertWithInfo(c,zobj,slele != NULL);
		serverAssertWithInfo(c,zobj,zslDelete(zs->zsl,node_score,slele));
		dictDelete(zs->dict,slele);
		deleted = 1;*/
		/* delete by range */
		deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);

		if (htNeedsResize(zs->dict)) dictResize(zs->dict);
		if (dictSize(zs->dict) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 5: Notifications and reply. */
	if (deleted) {
		signalModifiedKey(c->db,key);
		notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); /* we reuse the built in "zrem" keyspace event for pop operation! */
		if (keyremoved)
			notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
		server.dirty += deleted;
	}

	/* Step 6: Return the result in form of a multi-bulk reply */
	if (deleted) {
		addReplyMultiBulkLen(c, 2); /* at most one element with score */
		if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
			if (zlvlen == 0)
				addReplyBulkLongLong(c,zlvlong);
			else
				addReplyBulkCBuffer(c,zlvstr,zlvlen);
			addReplyDouble(c,node_score);
		} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
			addReplyBulk(c,slele);
			addReplyDouble(c,node_score);
		}
	} else {
		addReply(c,shared.emptymultibulk);
	}

cleanup:;
	if (zobj->encoding == OBJ_ENCODING_SKIPLIST)
		decrRefCount(slele);
}

void zlpopifCommand(client *c) {
	zpopGenericCommand(c, 0, 1);
}

void zrpopifCommand(client *c) {
	zpopGenericCommand(c, 1, 1);
}

缺点是:后续官方更新都需要改代码。

使用[Redis 4.0模块实现。此处是GitHub传送门。

相比前两种方法,此方法逻辑收归在服务端,且不需要修改Redis源码便于升级。但需要注意资源释放、复制机制等细节,谨防踩坑。

五、修改源码、实现模块后一些问题

1 . 兼容性:要求所有从机、或加载AOF/RDB的实例均实现了新的命令,即均为修改版Redis或均加载了扩展模块。

2 . 命令写入AOF和从机的时机:

  • 对于3.2.X使用LUA法,默认复制脚本本身,但可以使Redis仅复 制导致变更的命令而非整个命令,参考脚本中有关”Replicating commands instead of scripts”和”Selective replication of commands”的内容。
  • 对于3.2.X版本修改源码法,在server.c:call中,仅当有变更设置dirty变量值大于0时,才会触发命令传播,因此如果命令没有成功pop元素将不会产生命令传播。
  • 对于4.0 modules,我们的实现中使用了低级API,则需要实现中根据需要调用RedisModule_ReplicateVerbatim复制命令。

3 . 消息处理失败处理:ZSET中消息被pop后才被client取得处理,若client处理失败则需要client在保证幂等的前提下自行重试。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK