Implementing Delayed Tasks with Redis (Part 2)
source link: https://www.tuicool.com/articles/vqauEvf
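The previous article used the Redis Sorted Set together with the Quartz scheduling framework to implement a simple version of delayed tasks. It left two fairly important problems unsolved:
- Sharding.
- Monitoring.
This article adds these two features. Prerequisite article: Implementing Delayed Tasks with Redis (Part 1).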
Why sharding is needed
Here is the query script dequeue.lua again:
```lua
-- Partly based on the Lua scripts in jesque
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE returns a status reply such as {'ok':'zset'}; next() performs one iteration step to read it
local status, key_type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if key_type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack turns a table into variadic arguments
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil
```
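The following in-memory Java model makes the per-call semantics of dequeue.lua concrete. It is a hypothetical illustration and not part of the original project. The class `InMemoryDelayQueue` and its methods are made up for this sketch, and it says nothing about Redis performance. It only shows which members one call picks (highest score first, within `[min, max]`, honoring `LIMIT offset count`) and that the picked members disappear from both the Sorted Set and the Hash.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/**
 * Hypothetical in-memory model of ONE dequeue.lua call (illustration only, not a Redis client).
 * It mirrors ZREVRANGEBYSCORE ... LIMIT, then ZREM + HMGET + HDEL on the picked members.
 */
final class InMemoryDelayQueue {

    /** A sorted-set entry; Redis orders by score, then by member for equal scores. */
    static final class Entry {
        final double score;
        final String member;

        Entry(double score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    private static final Comparator<Entry> ORDER =
            Comparator.comparingDouble((Entry e) -> e.score).thenComparing(e -> e.member);

    private final TreeSet<Entry> zset = new TreeSet<>(ORDER);   // stands in for ORDER_QUEUE
    private final Map<String, Double> scores = new HashMap<>();  // member -> current score
    private final Map<String, String> hash = new HashMap<>();    // stands in for ORDER_DETAIL_QUEUE

    /** Roughly what the enqueue side does: ZADD orderId with a score, HSET orderId -> payload. */
    void enqueue(String orderId, double score, String payload) {
        Double old = scores.put(orderId, score);
        if (old != null) {
            zset.remove(new Entry(old, orderId)); // ZADD on an existing member updates its score
        }
        zset.add(new Entry(score, orderId));
        hash.put(orderId, payload);
    }

    /** Picks up to `limit` members with min <= score <= max, highest score first, and removes them. */
    List<String> dequeue(double min, double max, int offset, int limit) {
        List<String> picked = new ArrayList<>();
        int skipped = 0;
        for (Entry e : zset.descendingSet()) {        // ZREVRANGEBYSCORE key max min
            if (e.score > max) {
                continue;
            }
            if (e.score < min || picked.size() == limit) {
                break;
            }
            if (skipped < offset) {                    // LIMIT offset count
                skipped++;
                continue;
            }
            picked.add(e.member);
        }
        List<String> payloads = new ArrayList<>();
        for (String member : picked) {
            zset.remove(new Entry(scores.remove(member), member)); // ZREM
            payloads.add(hash.remove(member));                      // HMGET + HDEL
        }
        return payloads; // the Lua script returns nil instead of an empty list
    }

    int size() {
        return zset.size();
    }
}
```

For example, with members A, B, C and D scored 1, 2, 3 and 10, `dequeue(0, 5, 0, 2)` returns the payloads of C and B. D is not yet due, and A is left for the next call. The real script does the same in a single atomic server-side step.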
The script uses four commands: ZREVRANGEBYSCORE, ZREM, HMGET and HDEL. The cost of the TYPE command is negligible.
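| Command | Time complexity | Notes |
|---|---|---|
| ZREVRANGEBYSCORE | O(log(N)+M) | N is the total number of elements in the sorted set, M is the number of elements returned |
| ZREM | O(M*log(N)) | N is the total number of elements in the sorted set, M is the number of elements removed |
| HMGET | O(L) | L is the number of fields requested |
| HDEL | O(L) | L is the number of fields to be removed |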
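Next, let's analyze the cost with a concrete scenario and parameters. Suppose that in production about 10,000 elements enter the sorted set per hour, i.e. 10,000 orders per hour. The query script deletes whatever it reads from both the Sorted Set and the Hash. So at any time the two structures hold about 5,000 entries (the last 30 minutes of orders), i.e. N = 5000 in the table above. Suppose we initially set the query LIMIT to 100, i.e. M = 100. Simplify further by treating each basic Redis operation as costing T. The cost of processing those 5,000 entries then breaks down as follows: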
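| Round | N (elements in the set) | ZREVRANGEBYSCORE | ZREM | HMGET | HDEL |
|---|---|---|---|---|---|
| 1 | 5000 | T·log(5000) + 100T | 100T·log(5000) | 100T | 100T |
| 2 | 4900 | T·log(4900) + 100T | 100T·log(4900) | 100T | 100T |
| 3 | 4800 | T·log(4800) + 100T | 100T·log(4800) | 100T | 100T |
| … | … | … | … | … | … |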
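In theory, ZREM is the most expensive of the four commands the script uses. ZREVRANGEBYSCORE is O(log(N)+M) and ZREM is O(M*log(N)), so both grow with log(N). Keeping the set size N under control therefore helps reduce the running time of the Lua script to some extent.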
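To put rough numbers on "to some extent", the cost table can be evaluated directly. The sketch below is only an illustration, not a benchmark. It assumes log base 2, which the table does not specify, and it counts one unit T per basic step using exactly the per-command formulas from the table. The class name `DequeueCostModel` is hypothetical.

With M = 100, one call at N = 5000 costs about 1541 T. One call at N = 2500 (the same data split over two shards) costs about 1440 T, roughly 6.5% less.

```java
/**
 * Hypothetical evaluation of the cost table above. Assumptions: log base 2, one unit T per
 * basic step, exactly the per-command formulas in the table. Not a benchmark.
 */
final class DequeueCostModel {

    static double log2(double x) {
        return Math.log(x) / Math.log(2);
    }

    /** Cost (in T) of one dequeue.lua call with n elements in the set and a batch of m. */
    static double costPerCall(long n, long m) {
        double zrevrangebyscore = log2(n) + m; // O(log(N) + M)
        double zrem = m * log2(n);             // O(M * log(N))
        double hmget = m;                      // O(L), L = m
        double hdel = m;                       // O(L), L = m
        return zrevrangebyscore + zrem + hmget + hdel;
    }

    /** Sum of the table rows: drain `total` elements in batches of m, N shrinking by m each round. */
    static double drainCost(long total, long m) {
        double sum = 0;
        for (long n = total; n > 0; n -= m) {
            sum += costPerCall(n, Math.min(m, n));
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.printf("one call, N=5000, M=100: %.1f T%n", costPerCall(5000, 100));
        System.out.printf("one call, N=2500, M=100: %.1f T%n", costPerCall(2500, 100));
        System.out.printf("drain 5000 in one set:   %.0f T%n", drainCost(5000, 100));
        System.out.printf("drain 2 x 2500 (shards): %.0f T%n", 2 * drainCost(2500, 100));
    }
}
```

Because of the logarithm, halving N only trims the per-call cost modestly under this model. The larger benefit of sharding is that each call touches less data, and the shards can be processed independently, possibly on separate Redis instances.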
Sharding
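Based on the time-complexity analysis of dequeue.lua above, there are two candidate sharding schemes:
- Scheme 1: a single Redis instance; shard the data of both the Sorted Set and the Hash.
- Scheme 2: apply the sharding of scheme 1 across multiple Redis instances (which can be Sentinel or Cluster deployments).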
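For simplicity, the examples below use 2 shards (shardingCount). In production, choose the number of shards based on actual conditions. Shards are assigned by taking the long user ID field userId modulo the shard count. The userId values in the test data are assumed to be evenly distributed.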
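The routing rule can be isolated into a small helper. This is a hypothetical sketch: `ShardRouter` and its methods are not part of the original code. It reuses the `ORDER_QUEUE_` / `ORDER_DETAIL_QUEUE_` key prefixes from the single-instance implementation below.

One deliberate difference from a plain `userId % shardingCount`: `Math.floorMod` keeps the index non-negative even if an ID were negative. Both keys of an order always get the same index, which dequeue.lua relies on because it works on the Sorted Set / Hash pair together.

```java
/**
 * Hypothetical helper for the sharding rule above: shard = userId mod shardingCount.
 * Both keys of one order get the same index, because dequeue.lua works on the
 * Sorted Set / Hash pair together.
 */
final class ShardRouter {

    static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";

    private final long shardingCount;

    ShardRouter(long shardingCount) {
        if (shardingCount <= 0) {
            throw new IllegalArgumentException("shardingCount must be positive");
        }
        this.shardingCount = shardingCount;
    }

    /** Math.floorMod keeps the index in [0, shardingCount) even for negative IDs; plain % would not. */
    long shardOf(long userId) {
        return Math.floorMod(userId, shardingCount);
    }

    String zsetKey(long userId) {
        return ORDER_QUEUE_PREFIX + shardOf(userId);
    }

    String hashKey(long userId) {
        return ORDER_DETAIL_QUEUE_PREFIX + shardOf(userId);
    }

    public static void main(String[] args) {
        ShardRouter router = new ShardRouter(2);
        long[] perShard = new long[2];
        for (long userId = 0; userId < 100; userId++) { // the article's test data uses userId 0..99
            perShard[(int) router.shardOf(userId)]++;
        }
        System.out.println("shard 0: " + perShard[0] + ", shard 1: " + perShard[1]);
    }
}
```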
Common entity:
```java
import lombok.Data;

import java.math.BigDecimal;

@Data
public class OrderMessage {
    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}
```
Delay queue interface:
```java
import java.util.List;

public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);

    List<OrderMessage> dequeue(int index);

    String enqueueSha();

    String dequeueSha();
}
```
Sharding on a single Redis instance
Sharding on a single Redis instance is fairly simple. The schematic is as follows:
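The queue implementation is shown below. Some parameters are hard-coded; it is for reference only, so do not copy it into production as-is.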
```java
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;
import redis.clients.jedis.Jedis;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    /**
     * Number of shards
     */
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // Score = enqueue time in milliseconds
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        // Shard index = userId % SHARDING_COUNT; both keys carry the same index
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(int index) {
        // Only messages enqueued more than 30 minutes ago are due
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua scripts and cache their SHA1 digests
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
}
```
The consumer scheduled job is implemented as follows:
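```java
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    /**
     * Business worker thread pool
     */
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(), r -> {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
                return thread;
            });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // For simplicity, the shard index is stored in the Quartz job data map
        int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
        LOGGER.info("Order message consumer job started, shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        // dequeue() returns an empty list rather than null, so check isEmpty()
        if (!dequeue.isEmpty()) {
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                // Block this Quartz worker until the batch has been processed
                latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("Order message consumer job finished, shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final int shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}], processing order message, content: {}", shardingIndex, JSON.toJSONString(message));
                    // Simulate processing time
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                LOGGER.error("shardingIndex:[{}], failed to process order messages", shardingIndex, e);
            } finally {
                latch.countDown();
            }
        }
    }
}
```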
The CommandLineRunner that writes the test data and starts the scheduled jobs:
```java
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        int shardingCount = 2;
        // Prepare the test data
        prepareOrderMessageData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareOrderMessageData(int shardingCount) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            for (OrderMessage message : messages) {
                // Score = 30 minutes ago, so every message is already due
                double score = System.currentTimeMillis() - 30 * 60 * 1000L;
                long index = message.getUserId() % shardingCount;
                jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
                jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        // One job + trigger per shard, firing every 10 seconds
        for (int i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}
```
Start the application. The output is as follows:
```text
2019-08-28 00:13:20.648 INFO 50248 --- [ main] c.t.s.s.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : Order message consumer job started, shardingIndex:[0]...
2019-08-28 00:13:20.781 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : Order message consumer job started, shardingIndex:[1]...
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[1], processing order message, content: {"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[0], processing order message, content: {"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[1], processing order message, content: {"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[0], processing order message, content: {"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... lots of output omitted
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : Order message consumer job finished, shardingIndex:[0]...
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : Order message consumer job finished, shardingIndex:[1]...
// ... lots of output omitted
```
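The log also gives a quick sanity check on the hard-coded parameters. Each firing handles at most LIMIT = 10 messages per shard, which is about 0.5 s of simulated work (10 × 50 ms). That matches the roughly 0.5 s between the "started" and "finished" lines above. The trigger fires every 10 s.

The hypothetical sketch below turns these numbers into an hourly upper bound; `ConsumerCapacity` and its methods are made up for this illustration. The result is 2 × 10 × 360 = 7,200 messages per hour, which is below the 10,000 orders per hour assumed in the earlier analysis. This is consistent with the warning that these values are for reference only.

```java
/**
 * Hypothetical back-of-the-envelope capacity check for the demo consumer.
 * Assumes each firing dequeues at most `limit` messages per shard and that a batch
 * finishes within the trigger interval (checked separately below).
 */
final class ConsumerCapacity {

    /** Upper bound on messages consumed per hour across all shards. */
    static long maxMessagesPerHour(long shards, long limit, long intervalSeconds) {
        long firingsPerHour = 3600 / intervalSeconds;
        return shards * limit * firingsPerHour;
    }

    /** True if one batch, processed serially, fits inside one trigger interval. */
    static boolean batchFitsInterval(long limit, long millisPerMessage, long intervalSeconds) {
        return limit * millisPerMessage <= intervalSeconds * 1000;
    }

    /** True if the consumer's upper bound covers the incoming order rate. */
    static boolean keepsUp(long shards, long limit, long intervalSeconds, long ordersPerHour) {
        return maxMessagesPerHour(shards, limit, intervalSeconds) >= ordersPerHour;
    }

    public static void main(String[] args) {
        // Demo values: 2 shards, LIMIT = 10, trigger every 10 s, 50 ms simulated work per message
        System.out.println("capacity/hour: " + maxMessagesPerHour(2, 10, 10));
        System.out.println("batch fits interval: " + batchFitsInterval(10, 50, 10));
        System.out.println("keeps up with 10000 orders/hour: " + keepsUp(2, 10, 10, 10_000));
    }
}
```

Raising LIMIT (for example to 20, giving 14,400 per hour), shortening the interval, or adding shards all raise the bound. The batch must still finish within one interval.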
Sharding across multiple Redis instances
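Single-instance sharding has one problem: a Redis instance executes client commands on a single thread, no matter how many client threads send them. The schematic is as follows: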
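In this case, sharding lowers the cost of each Lua script call, but Redis's single-threaded command-processing model may become another potential performance bottleneck. It is therefore worth considering sharding across multiple Redis instances.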
For simplicity, the code example uses two standalone Redis instances:
```java
import com.google.common.collect.Maps;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.util.Map;

// Jedis provider: one connection pool per shard (Redis instance)
@Component
public class JedisProvider implements InitializingBean {

    private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();
    private JedisPool defaultPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        JedisPool pool = new JedisPool("localhost");
        defaultPool = pool;
        pools.put(0L, pool);
        // This Redis instance runs in a virtual machine
        pool = new JedisPool("192.168.56.200");
        pools.put(1L, pool);
    }

    public Jedis provide(Long index) {
        // Unknown shard indexes fall back to the default pool
        return pools.getOrDefault(index, defaultPool).getResource();
    }
}
```

```java
import lombok.Data;

import java.math.BigDecimal;

// Order message
@Data
public class OrderMessage {
    private String orderId;
    private BigDecimal amount;
    private Long userId;
}
```

```java
import java.util.List;

// Order delay queue interface: the shard index (a long) now selects the Redis instance
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);

    List<OrderMessage> dequeue(long index);

    String enqueueSha(long index);

    String dequeueSha(long index);
}
```

```java
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;
import redis.clients.jedis.Jedis;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

// Delay queue implementation: each shard is its own Redis instance, so key names need no shard suffix
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    // Script SHA per shard: every Redis instance loads the scripts separately
    private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // Score = enqueue time (as in the single-instance version); dequeue() picks it up after 30 minutes
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(long index) {
        // Only messages enqueued more than 30 minutes ago are due
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha(long index) {
        return ENQUEUE_LUA_SHA.get(index);
    }

    @Override
    public String dequeueSha(long index) {
        return DEQUEUE_LUA_SHA.get(index);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua scripts into every instance
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        for (long i = 0; i < SHARDING_COUNT; i++) {
            try (Jedis jedis = jedisProvider.provide(i)) {
                ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
                String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                String sha = jedis.scriptLoad(luaContent);
                ENQUEUE_LUA_SHA.put(i, sha);
                resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
                luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                sha = jedis.scriptLoad(luaContent);
                DEQUEUE_LUA_SHA.put(i, sha);
            }
        }
    }
}
```

```java
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Consumer
@DisallowConcurrentExecution
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    // Business worker pool. Quartz creates a new Job instance for every firing, so the pool is static:
    // a per-instance pool is never shut down and would leak one thread per firing.
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(), r -> {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
                return thread;
            });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
        LOGGER.info("Order message consumer job started, shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (!dequeue.isEmpty()) {
            // This countdown latch can be removed if the worker pool has enough capacity
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("Order message consumer job finished, shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final long shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}], processing order message, content: {}", shardingIndex, JSON.toJSONString(message));
                    // Simulate 50 ms of processing
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                LOGGER.error("shardingIndex:[{}], failed to process order messages", shardingIndex, e);
            } finally {
                latch.countDown();
            }
        }
    }
}
```

```java
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.annotation.Nonnull;

// Configuration: in-memory Quartz scheduler whose job instances get their @Autowired fields injected
@Configuration
public class QuartzConfiguration {

    @Bean
    public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
        return new AutowiredSupportQuartzJobFactory();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("RamScheduler");
        factory.setAutoStartup(true);
        factory.setJobFactory(autowiredSupportQuartzJobFactory);
        return factory;
    }

    public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
            // Create the job instance, then let Spring inject its dependencies
            Object jobInstance = super.createJobInstance(bundle);
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}
```

```java
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        long shardingCount = 2;
        prepareData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareData(long shardingCount) {
        for (long i = 0L; i < shardingCount; i++) {
            Map<String, Double> z = Maps.newHashMap();
            Map<String, String> h = Maps.newHashMap();
            // For simplicity the same 100 orders are written to EVERY instance (no userId routing here),
            // which is why both shards log ORDER_ID_99 in the output below
            for (int k = 0; k < 100; k++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(k));
                message.setUserId((long) k);
                message.setOrderId("ORDER_ID_" + k);
                // Score = 30 minutes ago, so the message is already due
                z.put(message.getOrderId(), (double) (System.currentTimeMillis() - 30 * 60 * 1000L));
                h.put(message.getOrderId(), JSON.toJSONString(message));
            }
            // try-with-resources returns the connection to the pool
            try (Jedis jedis = jedisProvider.provide(i)) {
                jedis.hmset("ORDER_DETAIL_QUEUE", h);
                jedis.zadd("ORDER_QUEUE", z);
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (long i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}
```
Add a main class and start it. The console output is as follows:
```text
// ... lots of output omitted
2019-09-01 14:08:27.664 INFO 13056 --- [ main] c.t.multi.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : Order message consumer job started, shardingIndex:[1]...
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : Order message consumer job started, shardingIndex:[0]...
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : shardingIndex:[1], processing order message, content: {"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : shardingIndex:[0], processing order message, content: {"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : shardingIndex:[0], processing order message, content: {"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : shardingIndex:[1], processing order message, content: {"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ... lots of output omitted
2019-09-01 14:08:28.239 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : Order message consumer job finished, shardingIndex:[1]...
2019-09-01 14:08:28.240 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : Order message consumer job finished, shardingIndex:[0]...
// ... lots of output omitted
```
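The per-instance SHA maps above have one operational caveat. The scripts are loaded once per instance at startup, but a Redis instance's script cache is volatile. After a restart or a SCRIPT FLUSH, EVALSHA with the cached SHA fails with a NOSCRIPT error until the script is loaded again.

The sketch below shows one way to handle that per shard: reload once and retry. `ScriptClient`, `NoScriptException` and `ShardScriptRunner` are hypothetical stand-ins, not Jedis types. A real version would adapt them to `scriptLoad`/`evalsha` and to however your Jedis version reports the NOSCRIPT reply.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical abstraction over one Redis instance; a Jedis-backed version would delegate to scriptLoad/evalsha. */
interface ScriptClient {
    String scriptLoad(String script);

    /** Expected to throw NoScriptException when the server replies with a NOSCRIPT error. */
    Object evalsha(String sha, List<String> keys, List<String> args);
}

/** Hypothetical exception type standing in for the client library's NOSCRIPT error. */
class NoScriptException extends RuntimeException {
    NoScriptException(String message) {
        super(message);
    }
}

/** Caches one SHA per shard and reloads the script once if the instance has lost it. */
final class ShardScriptRunner {

    private final Map<Long, String> shaByShard = new ConcurrentHashMap<>();
    private final String script;

    ShardScriptRunner(String script) {
        this.script = script;
    }

    Object run(long shard, ScriptClient client, List<String> keys, List<String> args) {
        String sha = shaByShard.computeIfAbsent(shard, s -> client.scriptLoad(script));
        try {
            return client.evalsha(sha, keys, args);
        } catch (NoScriptException e) {
            // Script cache is empty on this instance (e.g. restart or SCRIPT FLUSH): load again and retry once
            String reloaded = client.scriptLoad(script);
            shaByShard.put(shard, reloaded);
            return client.evalsha(reloaded, keys, args);
        }
    }
}
```

Retrying only once keeps a persistently failing instance from looping forever; any error other than NOSCRIPT still propagates to the caller.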
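In production, a single point of failure in Redis should be avoided. A common setup is Sentinel combined with a tree-shaped master-replica deployment (see 《Redis开发与运维》, "Redis Development and Operations"). The deployment diagram for two Redis Sentinel setups is as follows: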
Which metrics are needed
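We need to know, in near real time, metrics such as how much data is backlogged in the delay-queue structures in Redis and roughly how long each dequeue takes. Only then can we tell whether the delay-queue module is running normally and whether it has a performance bottleneck. The concrete metrics should be tailored to your needs. For illustration, only two are monitored here:
- the backlog, i.e. the number of elements in the Sorted Set;
- the execution time of dequeue.lua.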
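The application exposes these metrics itself in real time. This relies on a monitoring stack built from spring-boot-starter-actuator, Prometheus and Grafana. If you are not familiar with this stack, see the two earlier articles: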
Monitoring
Add the dependencies:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.2.0</version>
</dependency>
```
A Gauge meter is used here to collect the metric data. Add the monitoring class OrderDelayQueueMonitor:
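```java
import com.google.common.collect.Maps;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {

    private static final long SHARDING_COUNT = 2L;
    // Backlog per shard
    private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
    // Last dequeue.lua cost per shard, in microseconds
    private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
    private ScheduledExecutorService executor;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "OrderDelayQueueMonitor");
            thread.setDaemon(true);
            return thread;
        });
        for (long i = 0L; i < SHARDING_COUNT; i++) {
            // Each gauge reads its AtomicLong; the maps keep strong references to them
            AtomicLong l = new AtomicLong();
            Metrics.gauge("order.delay.queue.lua.cost", Collections.singleton(Tag.of("index", String.valueOf(i))), l, AtomicLong::get);
            lua.put(i, l);
            AtomicLong r = new AtomicLong();
            Metrics.gauge("order.delay.queue.remain", Collections.singleton(Tag.of("index", String.valueOf(i))), r, AtomicLong::get);
            remain.put(i, r);
        }
        // Report the number of remaining elements every five seconds
        executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
    }

    public void recordRemain(Long index, long count) {
        remain.get(index).set(count);
    }

    public void recordLuaCost(Long index, long count) {
        lua.get(index).set(count);
    }

    @RequiredArgsConstructor
    private class MonitorTask implements Runnable {

        private final JedisProvider jedisProvider;

        @Override
        public void run() {
            for (long i = 0L; i < SHARDING_COUNT; i++) {
                try (Jedis jedis = jedisProvider.provide(i)) {
                    // ZCOUNT over (-inf, +inf) counts every element in the Sorted Set
                    recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
                } catch (RuntimeException e) {
                    // An exception escaping run() would cancel all future runs of scheduleWithFixedDelay
                }
            }
        }
    }
}
```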
Modify the original RedisOrderDelayQueue#dequeue():
```java
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    // ... unchanged code omitted

    private final OrderDelayQueueMonitor orderDelayQueueMonitor;

    // ... unchanged code omitted

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            long start = System.nanoTime();
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            long end = System.nanoTime();
            // Record the dequeue cost in microseconds (measured client-side, so it includes the network round trip)
            orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    // ... unchanged code omitted
}
```
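The timing above records the cost only when `evalsha` returns normally, so failed or timed-out calls never show up in the gauge. A stdlib-only sketch of the same pattern records in a `finally` block instead. `ShardTimer` is a hypothetical helper, not part of Micrometer or the original code. It keeps the last cost per shard in an `AtomicLong`, the same kind of object the gauges in `OrderDelayQueueMonitor` read.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
 * Hypothetical helper: time a call with System.nanoTime(), convert to microseconds and
 * store the last value per shard in an AtomicLong that a gauge could read. Recording in
 * finally also captures calls that end with an exception.
 */
final class ShardTimer {

    private final Map<Long, AtomicLong> lastCostMicros = new ConcurrentHashMap<>();

    <T> T time(long shard, Supplier<T> call) {
        long start = System.nanoTime();
        try {
            return call.get();
        } finally {
            long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
            lastCostMicros.computeIfAbsent(shard, k -> new AtomicLong()).set(micros);
        }
    }

    /** Last recorded cost in microseconds, or -1 if the shard has never been timed. */
    long lastCostMicros(long shard) {
        AtomicLong v = lastCostMicros.get(shard);
        return v == null ? -1L : v.get();
    }
}
```

A last-value gauge only shows the most recent sample between scrapes. If you need percentiles or averages over the 5-second scrape window, a Micrometer `Timer` is another option worth evaluating.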
A brief note on the remaining configuration.
application.yaml must expose the prometheus endpoint:
```yaml
server:
  port: 9091
management:
  endpoints:
    web:
      exposure:
        include: 'prometheus'
```
In the Prometheus server configuration, keep the scrape interval as short as practical. Here it is set to 5 seconds:
```yaml
# my global config
global:
  scrape_interval: 5s # Scrape every 5 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# here it's the Spring Boot application's actuator endpoint.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    metrics_path: '/actuator/prometheus'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
      - targets: ['localhost:9091']
```
The basic Grafana panel settings are:
| Panel title | Query | Legend |
|---|---|---|
| Dequeue cost | order_delay_queue_lua_cost | Shard-{{index}} |
| Order delay queue backlog | order_delay_queue_remain | Shard-{{index}} |
Finally, with Grafana set to refresh every 5 seconds, the result looks like this:
In practice, the metrics should be tailored to your needs. Frankly, monitoring work tends to be the most complex and tedious part.
Summary
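This article has described in some detail how to add sharding and monitoring to Redis-based delayed tasks. The core code is for reference only. Some details, such as how Prometheus and Grafana are used, are not covered here for reasons of space. Frankly, choosing middleware and an architecture for a real scenario is never easy. The initial implementation is often not the hardest part; the bigger challenges come later, in optimization and monitoring.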