
Implementing Delayed Tasks with Redis (Part 2)

source link: https://www.tuicool.com/articles/vqauEvf

The previous article used Redis's sorted set (Sorted Set) and the Quartz scheduling framework to build a first, simple version of delayed tasks, but it left two fairly important problems unsolved:

  1. Sharding.
  2. Monitoring.

This article rounds out those two areas. Companion article: Implementing Delayed Tasks with Redis (Part 1)

Why sharding is needed

Here is the query script dequeue.lua again:

-- partly modelled on jesque's Lua scripts
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE returns a table like {'ok':'zset'}; use next to pull out the single entry
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack expands a table into varargs
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil

The script uses four commands in total: ZREVRANGEBYSCORE, ZREM, HMGET and HDEL (the cost of the TYPE command is negligible):

Command | Time complexity | Parameters
ZREVRANGEBYSCORE | O(log(N)+M) | N is the total number of elements in the sorted set, M is the number of elements returned
ZREM | O(M*log(N)) | N is the total number of elements in the sorted set, M is the number of elements successfully removed
HMGET | O(L) | L is the number of fields successfully returned
HDEL | O(L) | L is the number of fields to delete

Now apply some concrete numbers. Suppose that in production the sorted set takes in 10,000 elements per hour (that is, the business places 10,000 orders per hour). Because the dequeue deletes matching data from both the Sorted Set and the Hash as it reads, an entry resides in the two collections for at most the 30-minute delay window, so roughly 5,000 entries are resident at any moment; that is the N = 5000 in the table above. Suppose the query LIMIT is initially set to 100, so M = 100, and let T stand for the (simplified) cost of one Redis operation unit. Working through the 5,000 entries then costs roughly:

Round | Cardinality | ZREVRANGEBYSCORE | ZREM | HMGET | HDEL
1 | 5000 | (log(5000) + 100)T | 100 * log(5000) * T | 100T | 100T
2 | 4900 | (log(4900) + 100)T | 100 * log(4900) * T | 100T | 100T
3 | 4800 | (log(4800) + 100)T | 100 * log(4800) * T | 100T | 100T
… | … | … | … | … | …

In theory, ZREM is the most expensive of the four commands, with a time complexity of O(M*log(N)) (and ZREVRANGEBYSCORE also carries a log(N) term), so keeping the set cardinality N down helps reduce the Lua script's running time.
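The estimate above can be reproduced with a small back-of-the-envelope calculation. This is a sketch only: the class and method names are illustrative, and T is the simplified per-operation cost from the text.

```java
// Rough cost model for one dequeue.lua batch, expressed in units of T.
public class DequeueCostModel {

    // Resident cardinality: ordersPerHour spread over a residentMinutes window.
    static int residentCardinality(int ordersPerHour, int residentMinutes) {
        return ordersPerHour * residentMinutes / 60;
    }

    // ZREVRANGEBYSCORE O(log(N)+M) + ZREM O(M*log(N)) + HMGET O(M) + HDEL O(M)
    static double batchCost(int n, int m) {
        return (Math.log(n) + m) + m * Math.log(n) + m + m;
    }

    public static void main(String[] args) {
        int n = residentCardinality(10_000, 30); // 5000, as in the text
        System.out.printf("N=%d, batch cost ~ %.0fT%n", n, batchCost(n, 100));
        // Halving N via sharding mainly shrinks the dominant M*log(N) term from ZREM.
        System.out.printf("N=%d, batch cost ~ %.0fT%n", n / 2, batchCost(n / 2, 100));
    }
}
```

Running it shows the per-batch saving from halving N is (M+1)·log(2) in units of T, which is exactly the log(N) terms shrinking while the linear HMGET/HDEL terms stay put.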

Sharding

Having analyzed the time complexity of dequeue.lua above, there are two candidate sharding schemes:

  • Scheme 1: a single Redis instance, with the data of the Sorted Set and Hash pair split across multiple shards.
  • Scheme 2: the same sharding as scheme 1, but spread across multiple Redis instances (sentinel or cluster deployments).

For simplicity, the examples below use a shard count (shardingCount) of 2; in production the shard count should be sized to the actual load. Sharding is done by taking the long userId field modulo the shard count, and the test data assumes userIds are uniformly distributed.
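The modulo routing can be sketched in a few lines (illustrative only; the key prefixes match the ones used in the implementation below, but the class itself is not part of the article's code):

```java
import java.util.HashMap;
import java.util.Map;

public class ShardRouter {

    static final long SHARDING_COUNT = 2L;

    // Route a user to a shard by simple modulo.
    static long shardIndex(long userId) {
        return userId % SHARDING_COUNT;
    }

    // Each shard gets its own Sorted Set / Hash key pair.
    static String sortedSetKey(long userId) {
        return "ORDER_QUEUE_" + shardIndex(userId);
    }

    static String hashKey(long userId) {
        return "ORDER_DETAIL_QUEUE_" + shardIndex(userId);
    }

    public static void main(String[] args) {
        // With uniformly distributed userIds, each shard receives about half the load.
        Map<Long, Integer> counts = new HashMap<>();
        for (long userId = 0; userId < 10_000; userId++) {
            counts.merge(shardIndex(userId), 1, Integer::sum);
        }
        System.out.println(counts);
    }
}
```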

Common entity:

@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}

Delay queue interface:

public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);

    List<OrderMessage> dequeue(int index);

    String enqueueSha();

    String dequeueSha();
}

Sharding within a single Redis instance

Redis 实例分片比较简单,示意图如下:

(diagram: single-instance sharding layout)

The queue implementation looks like this (some parameters are hard-coded; this is for reference only and must not be copied into production as-is):

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    /**
     * Number of shards
     */
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(int index) {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
}

The consumer's scheduled job is implemented as follows:

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    /**
     * Business worker thread pool
     */
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // for simplicity, the shard index is carried in Quartz's job execution context for now
        int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
        LOGGER.info("Order message consumer job starting, shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("Order message consumer job finished, shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final int shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}], processing order message, content:{}", shardingIndex, JSON.toJSONString(message));
                    // simulate processing time
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

The CommandLineRunner that starts the scheduled jobs and writes the test data:

@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        int shardingCount = 2;
        // prepare test data
        prepareOrderMessageData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareOrderMessageData(int shardingCount) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            for (OrderMessage message : messages) {
                // 30 minutes ago
                Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
                long index = message.getUserId() % shardingCount;
                jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
                jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (int i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}

Start the application; the output is as follows:

2019-08-28 00:13:20.648  INFO 50248 --- [           main] c.t.s.s.NoneJdbcSpringApplication        : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : Order message consumer job starting, shardingIndex:[0]...
2019-08-28 00:13:20.781  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : Order message consumer job starting, shardingIndex:[1]...
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1], processing order message, content:{"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0], processing order message, content:{"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1], processing order message, content:{"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0], processing order message, content:{"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... (large amount of output omitted)
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : Order message consumer job finished, shardingIndex:[0]...
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : Order message consumer job finished, shardingIndex:[1]...
// ... (large amount of output omitted)

Sharding across multiple Redis instances

Redis 实例分片其实存在一个问题,就是 Redis 实例总是单线程处理客户端的命令,即使客户端是多个线程执行 Redis 命令,示意图如下:

(diagram: multiple client threads served by a single Redis command-processing thread)

In that setup, sharding does lower the complexity of each Lua script invocation, but Redis's single-threaded command-processing model remains a potential performance bottleneck. It is therefore worth considering sharding across multiple Redis instances.

(diagram: sharding across multiple Redis instances)

For simplicity, two standalone Redis instances are used in the example code below:

// Jedis provider
@Component
public class JedisProvider implements InitializingBean {

    private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();
    private JedisPool defaultPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        JedisPool pool = new JedisPool("localhost");
        defaultPool = pool;
        pools.put(0L, pool);
        // this Redis instance runs in a local VM
        pool = new JedisPool("192.168.56.200");
        pools.put(1L, pool);
    }

    public Jedis provide(Long index) {
        return pools.getOrDefault(index, defaultPool).getResource();
    }
}

// Order message
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
}

// Order delay queue interface
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);

    List<OrderMessage> dequeue(long index);

    String enqueueSha(long index);

    String dequeueSha(long index);
}

// Delay queue implementation
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // score = enqueue timestamp, consistent with the single-instance version
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(long index) {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha(long index) {
        return ENQUEUE_LUA_SHA.get(index);
    }

    @Override
    public String dequeueSha(long index) {
        return DEQUEUE_LUA_SHA.get(index);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        for (long i = 0; i < SHARDING_COUNT; i++) {
            try (Jedis jedis = jedisProvider.provide(i)) {
                ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
                String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                String sha = jedis.scriptLoad(luaContent);
                ENQUEUE_LUA_SHA.put(i, sha);
                resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
                luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                sha = jedis.scriptLoad(luaContent);
                DEQUEUE_LUA_SHA.put(i, sha);
            }
        }
    }
}

// Consumer
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    // business worker thread pool
    private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
        LOGGER.info("Order message consumer job starting, shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            // the latch here can be dropped if the thread pool has sufficient capacity
            final CountDownLatch latch = new CountDownLatch(1);
            businessWorkerPool.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("Order message consumer job finished, shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final long shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}], processing order message, content:{}", shardingIndex, JSON.toJSONString(message));
                    // simulate 50 ms of processing time
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

// Configuration
@Configuration
public class QuartzConfiguration {

    @Bean
    public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
        return new AutowiredSupportQuartzJobFactory();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("RamScheduler");
        factory.setAutoStartup(true);
        factory.setJobFactory(autowiredSupportQuartzJobFactory);
        return factory;
    }

    public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        long shardingCount = 2;
        prepareData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareData(long shardingCount) {
        for (long i = 0L; i < shardingCount; i++) {
            Map<String, Double> z = Maps.newHashMap();
            Map<String, String> h = Maps.newHashMap();
            for (int k = 0; k < 100; k++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(k));
                message.setUserId((long) k);
                message.setOrderId("ORDER_ID_" + k);
                // 30 min ago
                z.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                h.put(message.getOrderId(), JSON.toJSONString(message));
            }
            Jedis jedis = jedisProvider.provide(i);
            jedis.hmset("ORDER_DETAIL_QUEUE", h);
            jedis.zadd("ORDER_QUEUE", z);
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (long i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}

Add a new launcher and start it; the console output is as follows:

// ... (large amount of output omitted)
2019-09-01 14:08:27.664  INFO 13056 --- [           main] c.t.multi.NoneJdbcSpringApplication      : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : Order message consumer job starting, shardingIndex:[1]...
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : Order message consumer job starting, shardingIndex:[0]...
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1], processing order message, content:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0], processing order message, content:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0], processing order message, content:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1], processing order message, content:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ... (large amount of output omitted)
2019-09-01 14:08:28.239  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : Order message consumer job finished, shardingIndex:[1]...
2019-09-01 14:08:28.240  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : Order message consumer job finished, shardingIndex:[0]...
// ... (large amount of output omitted)

In production, avoid a single point of failure for the Redis service; a common deployment is sentinel with a tree-shaped master-replica topology (see "Redis Development and Operations"). A deployment with two Redis sentinel groups is sketched below:

(diagram: two Redis sentinel deployments)

What monitoring items are needed

We need to know, in near real time, how much data is backed up in the delay-queue collections in Redis, roughly how long each dequeue takes, and so on; only then can we tell whether the delay-queue module is running normally and whether it has performance bottlenecks. The concrete monitoring items should be tailored to actual needs; to keep the example manageable, only two are monitored here:

  • the backlog remaining in each Sorted Set shard
  • the execution time of the dequeue.lua script

The approach has the application report the metrics itself, on top of a monitoring stack built from spring-boot-starter-actuator, Prometheus and Grafana; if you are not familiar with this stack, see the two companion articles on those tools first.

Monitoring

Add the dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.2.0</version>
</dependency>

Gauge meters are used here to collect the monitoring data. Add the monitor class OrderDelayQueueMonitor:

// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {

    private static final long SHARDING_COUNT = 2L;
    private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
    private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
    private ScheduledExecutorService executor;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "OrderDelayQueueMonitor");
            thread.setDaemon(true);
            return thread;
        });
        for (long i = 0L; i < SHARDING_COUNT; i++) {
            AtomicLong l = new AtomicLong();
            Metrics.gauge("order.delay.queue.lua.cost", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    l, AtomicLong::get);
            lua.put(i, l);
            AtomicLong r = new AtomicLong();
            Metrics.gauge("order.delay.queue.remain", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    r, AtomicLong::get);
            remain.put(i, r);
        }
        // report the remaining backlog in the collections every five seconds
        executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
    }

    public void recordRemain(Long index, long count) {
        remain.get(index).set(count);
    }

    public void recordLuaCost(Long index, long count) {
        lua.get(index).set(count);
    }

    @RequiredArgsConstructor
    private class MonitorTask implements Runnable {

        private final JedisProvider jedisProvider;

        @Override
        public void run() {
            for (long i = 0L; i < SHARDING_COUNT; i++) {
                try (Jedis jedis = jedisProvider.provide(i)) {
                    recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
                }
            }
        }
    }
}

Then modify the existing RedisOrderDelayQueue#dequeue():

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
    
    // ... unchanged code omitted
    private final OrderDelayQueueMonitor orderDelayQueueMonitor;

    // ... unchanged code omitted

    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            long start = System.nanoTime();
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            long end = System.nanoTime();
            // record the dequeue cost, in microseconds
            orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    } 

    // ... unchanged code omitted

}

The remaining configuration is covered briefly below.

In application.yaml, expose the prometheus actuator endpoint:

server:
  port: 9091
management:
  endpoints:
    web:
      exposure:
        include: 'prometheus'

The Prometheus server is configured with as short a scrape interval as practical, tentatively 5 seconds:

# my global config
global:
  scrape_interval:     5s # Set the scrape interval to every 5 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    metrics_path: '/actuator/prometheus'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
    - targets: ['localhost:9091']

The basic Grafana panel settings are:

  • dequeue cost: query order_delay_queue_lua_cost, legend shard-{{index}}
  • order delay queue backlog: query order_delay_queue_remain, legend shard-{{index}}
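For reference, Micrometer's Prometheus registry maps the gauge names order.delay.queue.lua.cost and order.delay.queue.remain to the underscore-separated metric names used above, so the two panels can be driven by queries along these lines (a sketch; the index label values follow SHARDING_COUNT):

```
# one time series per shard; Grafana's legend format "shard-{{index}}" names them
order_delay_queue_lua_cost
order_delay_queue_remain
```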

Finally, configure Grafana to refresh every 5 seconds; the result looks like this:

(screenshot: Grafana dashboard with the dequeue-cost and backlog panels)

Again, monitoring items should usually be tailored case by case; frankly, monitoring tends to be the most complex and tedious part of the work.

Summary

This article walked through, in fair detail, how to shard and monitor a Redis-based delayed-task implementation. The core code is for reference only, and some specifics, such as the finer points of Prometheus and Grafana, are beyond the scope of this article. Honestly, selecting middleware and an architecture for a real scenario is no small task, and the initial implementation is rarely the hardest part; the bigger challenges come later, in optimization and monitoring.

