5

退避算法实现之客户端优雅回调

 3 years ago
source link: http://www.cnblogs.com/yougewe/p/13615445.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

针对有些耗时比较长的任务,我们一般会想到使用异步化的方式来进行优化逻辑。即客户端先发起一次任务请求并携带回调地址callbackUrl,然后服务端收到请求后立即返回成功,然后在后台处理具体事务,等任务完成后再回调客户端,通知完成。

首先这个方案是值得肯定的,但是我们得注意几点:1. 客户端回调是否可靠?2. 是否接受客户端的主动查询,从而从另一角度弥补各种环境的不确定性?

实际上,要提供一个状态查询的服务很简单,只需查询具体状态值返回即可。但要实现一个可靠的回调却是有点难度的,今天我们就来提供一个实现思路和实现,希望能帮助到需要的同学。

1. 要实现的目标

需要先给自己定个小目标,否则就没了方向。总体上是:要求稳定、可靠、不积压。细化如下:

1. 正常情况下能够及时通知到客户端结果状态;

2. 客户端服务短暂异常的情况下,仍然能够接到通知;

3. 服务端服务短暂异常的情况下,仍然能推送结果到客户端;

4. 网络环境异常时,仍然能尽可能通知到客户端;

5. 服务端回调尽量不要积压太多;

2. 实现思路

要达到以上目标,我们主要做的事也就相应出来了:

1. 使用重试机制保证尽量通知;

2. 使用次数限制保证积压不会太严重;

尽管看起来只是一个重试而已,但如果控制不好,要么给自己带来巨大的服务压力,要么就是进行无效地重试。所以,我们使用一种退避算法,提供一些重试测试,保证重试的合理性。

具体点说就是,回调失败后会进行重试,但每次重试都会有一定的延时控制,越往后延时越大,直到达到最大重试次数后结束。比如:

1. 第1次回调失败后,设置下一个回调时间间隔为30秒;

2. 第2次回调也失败后,设置下一个回调时间间隔为1分钟;

3. 第3次回调也失败后,设置下一个回调时间间隔为3分钟;

...

那么退避策略配置就为 30/60/180...

另外,我们需要借助于db的持久化,保证回调的可靠性,不至于因为机器宕机而丢失回调信息。

3. 具体代码实现

我们将此实现全部封装到一个类中,对外仅暴露一个 submitNewJobCallbackTask() 方法。如下:

import com.alibaba.fastjson.JSONObject;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import com.github.pagehelper.PageHelper;
import com.my.common.util.HttpUtils;
import com.my.common.util.SleepUtil;
import com.my.enums.CallbackStatusEnum;
import com.my.dao.entity.DistributeLock;
import com.my.dao.entity.JobDataCallbackInfo;
import com.my.dao.mapper.JobDataCallbackInfoMapper;
import com.my.model.enums.DataJobStatus;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.util.NamedThreadFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

/**
 * 功能描述: 查询结果成功执行回调处理任务
 *
 */
@Log4j2
@Component
public class ResultCallbackWorker implements Runnable {

    @Resource
    private JobDataCallbackInfoMapper jobDataCallbackInfoMapper;

    @Resource
    private LockService lockService;


    /**
     * 正在运行的回调任务容器,方便进行close
     */
    private CallbackTaskWrapperContainer
                runningCallbacksContainer = new CallbackTaskWrapperContainer();

    /**
     * 执行回调的线程池(队列无限,需外部限制)
     */
    private ScheduledExecutorService executorService
                    = new ScheduledThreadPoolExecutor(4,
                            new NamedThreadFactory("JobCallbackWorker"),
                            new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * 自动运行该任务
     */
    @PostConstruct
    public void init() {
        new Thread(this,
                "ResultCallbackWorker")
                .start();
    }

    @Override
    public void run() {
        Random random = new Random();
        int baseSleepSec = 50;
        int maxRangeSec = 120;
        while (!Thread.currentThread().isInterrupted()) {
            // dataNumLevel 代表任务饱和度,该值越大,说明等待任务越多,需要更频繁执行
            int dataNumLevel = 0;
            try {
                if(!tryCallbackTaskLock()) {
                    dataNumLevel = random.nextInt(30);
                    continue;
                }
                try {
                    dataNumLevel = pullCallbackInfoFromDb();
                }
                finally {
                    unlock();
                }
            }
            catch (Throwable e) {
                log.error("【任务回调】执行数据查询时发生了异常", e);
            }
            finally {
                // 添加一个随机10s, 避免集群机器同时请求,从而导致分配不均衡
                int rndSleepSec = random.nextInt(10);
                int realSleepSec = baseSleepSec + rndSleepSec
                                        + maxRangeSec * (100 - dataNumLevel) * 100 / 100;
                SleepUtil.sleepSecs(realSleepSec);
            }
        }
        log.warn("【任务回调】任务结束");
    }

    /**
     * 获取回调运行分布式锁()
     *
     * @return true: 成功, false: 未获取,不执行后续逻辑
     */
    private boolean tryCallbackTaskLock() {
        String methodName = "ResultCallbackWorker";
        // 悲观锁实现, 乐观锁实现
        return lockService.lock(methodName);
    }

    /**
     * 释放分布式锁
     *
     * @return true:成功
     */
    private boolean unlock() {
        String methodName = "ResultCallbackWorker";
        return lockService.unlock(methodName);
    }

    /**
     * 从db中拉取待回调列表并处理
     *
     * @return 更新数据的饱和度: 满分100, 用于后续更新拉取速率
     */
    private Integer pullCallbackInfoFromDb() {
        Integer dealNums = getNoHandlerTaskAndUpdate(10);
        log.info("【任务回调】本次处理无handler的任务数:{}", dealNums);
        dealNums = getCallbackStatusTimeoutTaskAndUpdate(10);
        log.info("【任务回调】本次处理回调超时的任务数:{}", dealNums);
        return dealNums * 100 / 20;
    }

    /**
     * 获取未被任何机器处理的回调任务
     *
     * @return 处理行数
     */
    private Integer getNoHandlerTaskAndUpdate(int limit) {
        PageHelper.startPage(1, limit, false);
        String[] statusEnums = { CallbackStatusEnum.WAIT_HANDLER.name() };
        Map<String, Object> cond = new HashMap<>();
        cond.put("statusList", statusEnums);
        // 拉取 5小时 ~ 1分钟 前应该回调的数据, 进行重试
        cond.put("nextRetryTimeGt", new Date(System.currentTimeMillis() - 5 * 3600_000));
        cond.put("nextRetryTimeLt", new Date(System.currentTimeMillis() - 60_000));
        List<JobDataCallbackInfo> waitingCallbackInfos
                        = jobDataCallbackInfoMapper.getExpiredCallbackTaskInfo(cond);
        return addRequeueCallbackTaskFromDb(waitingCallbackInfos);
    }

    /**
     * 获取未被任何机器处理的回调任务
     *
     * @return 处理行数
     */
    private Integer getCallbackStatusTimeoutTaskAndUpdate(int limit) {
        PageHelper.startPage(1, limit, false);
        String[] statusEnums = { CallbackStatusEnum.HANDLER_RETRYING.name() };
        Map<String, Object> cond = new HashMap<>();
        cond.put("statusList", statusEnums);
        // 只处理6小时前的数据
        cond.put("updateTimeGt", new Date(System.currentTimeMillis() - 6 * 3600_000L));
        // 5小时 ~ 1分钟 前应该回调的数据
        cond.put("nextRetryTimeGt", new Date(System.currentTimeMillis() - 5 * 3600_000));
        cond.put("nextRetryTimeLt", new Date(System.currentTimeMillis() - 60_000));
        cond.put("nowMinusUpdateTimeGt", 600);
        List<JobDataCallbackInfo> waitingCallbackInfos
                = jobDataCallbackInfoMapper.getExpiredCallbackTaskInfo(cond);
        return addRequeueCallbackTaskFromDb(waitingCallbackInfos);
    }

    /**
     * 将从db捞取出的待回调的任务,放入本地队列进行回调
     *
     * @param waitingCallbackInfos 待处理的任务(from db)
     */
    private Integer addRequeueCallbackTaskFromDb(List<JobDataCallbackInfo> waitingCallbackInfos) {
        int submittedTaskNum = 0;
        for (JobDataCallbackInfo callbackInfo : waitingCallbackInfos) {
            // 队列已满,不再添加数据
            if(!submitTaskImmediately(callbackInfo)) {
                return submittedTaskNum;
            }
            submittedTaskNum++;
            updateCallbackFinalStatus(callbackInfo,
                    CallbackStatusEnum.HANDLER_RETRYING, false);
        }
        return submittedTaskNum;
    }

    /**
     * 提交一个新的job回调任务
     *
     * @param jobId 异步任务id
     * @param jobStatus 任务状态
     * @param callbackUrl 回调地址
     * @param bizId 业务id
     */
    public void submitNewJobCallbackTask(String jobId,
                                         DataJobStatus jobStatus,
                                         String callbackUrl,
                                         String bizId) {
        JobDataCallbackInfo callbackInfo = new JobDataCallbackInfo();
        callbackInfo.setJobId(jobId);
        callbackInfo.setBizId(bizId);
        callbackInfo.setBizType("offline_pull_data_job");
        callbackInfo.setJobStatus(jobStatus.name());
        callbackInfo.setCallbackStatus(CallbackStatusEnum.HANDLER_RETRYING);
        int retryTimes = 0;
        callbackInfo.setNextRetryTime(
                        getNextRetryTimeWithPolicy(retryTimes));
        callbackInfo.setRetryTimes(retryTimes);
        callbackInfo.setCallbackUrl(callbackUrl);
        jobDataCallbackInfoMapper.insert(callbackInfo);
        if(!submitTaskImmediately(callbackInfo)) {
            updateCallbackFinalStatus(callbackInfo, CallbackStatusEnum.WAIT_HANDLER);
        }
    }

    /**
     * 立即提交一个任务到
     *
     * @param callbackInfo 回调任务信息
     * @return true: 提交成功, false: 提交失败
     */
    private boolean submitTaskImmediately(JobDataCallbackInfo callbackInfo) {
        if(runningCallbacksContainer.reachMaxQueue()) {
            return true;
        }
        Future<?> taskFuture = executorService.submit(() -> callback(callbackInfo));
        boolean addSuccess = runningCallbacksContainer.addTask(callbackInfo, taskFuture);
        assert addSuccess;
        return true;
    }

    /**
     * 执行某个回调任务的处理逻辑
     *
     * @param callbackInfo 回调参数信息
     */
    private void callback(JobDataCallbackInfo callbackInfo) {
        boolean callSuccess = false;
        try {
            callSuccess = doCallback(callbackInfo.getCallbackUrl(),
                                callbackInfo.getJobId(), callbackInfo.getBizId(),
                                DataJobStatus.valueOf(callbackInfo.getJobStatus()));
        }
        catch (Throwable e) {
            log.error("【回调任务】回调调用方失败,稍后将进行重试, jobId:{}",
                                                callbackInfo.getBizId(), e);
        }
        finally {
            log.info("【回调任务】回调完成:{}, jobId:{}",
                        callbackInfo.getCallbackUrl(), callbackInfo.getJobId());
            if(callSuccess) {
                updateCallbackFinalStatus(callbackInfo, CallbackStatusEnum.SUCCESS);
            }
            else {
                requeueFailedCallbackTaskIfNecessary(callbackInfo);
            }
        }
    }

    /**
     * 关机时,保存当前任务状态
     */
    public void shutdown() {
        runningCallbacksContainer.cancelAllTask();
    }

    /**
     * 重新入队回调失败的队列(延时自行断定)
     *
     * @param callbackInfo 上一次回调信息
     */
    private void requeueFailedCallbackTaskIfNecessary(JobDataCallbackInfo callbackInfo) {
        Config config = ConfigService.getAppConfig();
        Integer maxRetryTimes = config.getIntProperty(
                "_job_finish_callback_retry_max_times", 7);
        if(callbackInfo.getRetryTimes() >= maxRetryTimes) {
            updateCallbackFinalStatus(callbackInfo, CallbackStatusEnum.FAILED);
            return;
        }
        nextRetryCallback(callbackInfo);
    }

    /**
     * 进入下一次回调重试操作
     *
     * @param callbackInfo 回调任务信息
     */
    private void nextRetryCallback(JobDataCallbackInfo callbackInfo) {
        int retryTimes = callbackInfo.getRetryTimes() + 1;
        callbackInfo.setRetryTimes(retryTimes);
        Date nextRetryTime = getNextRetryTimeWithPolicy(retryTimes);
        callbackInfo.setNextRetryTime(nextRetryTime);
        jobDataCallbackInfoMapper.update(callbackInfo);
        // 延时调度
        Future<?> taskFuture = executorService.schedule(() -> callback(callbackInfo),
                                nextRetryTime.getTime() - System.currentTimeMillis(),
                                        TimeUnit.MILLISECONDS);
        boolean addSuccess = runningCallbacksContainer.addTask(callbackInfo, taskFuture);
        assert !addSuccess;
    }

    /**
     * 回调任务终态更新(SUCCESS, FAILED, CANCELED)
     *
     *      或者不再被本次调用的任务,都会更新当前状态
     *
     * @param callbackInfo 回调任务基本信息
     * @param callbackStatus 当次回调结果
     */
    private void updateCallbackFinalStatus(JobDataCallbackInfo callbackInfo,
                                           CallbackStatusEnum callbackStatus) {
        updateCallbackFinalStatus(callbackInfo, callbackStatus, true);
    }

    /**
     * 更新db状态,同时处理本地队列
     *
     * @param removeRunningTask 是否移除本地队列
     * @see #updateCallbackFinalStatus(JobDataCallbackInfo, CallbackStatusEnum)
     */
    private void updateCallbackFinalStatus(JobDataCallbackInfo callbackInfo,
                                           CallbackStatusEnum callbackStatus,
                                           boolean removeRunningTask) {
        callbackInfo.setCallbackStatus(callbackStatus);
        jobDataCallbackInfoMapper.update(callbackInfo);
        if(removeRunningTask) {
            runningCallbacksContainer.taskFinish(callbackInfo);
        }
    }

    /**
     * 回调客户端,通知任务结果
     *
     * @param jobId 任务jobId
     * @param jobStatus 执行状态
     * @return true: 成功
     */
    private boolean doCallback(String callbackUrl,
                                   String jobId, String bizId,
                                   DataJobStatus jobStatus) throws Exception {
        log.info("【回调任务】回调客户端:{} jobId:{}, jobStatus:{}",
                                                callbackUrl, jobId, jobStatus);
        Map<String, Object> params = new HashMap<>();
        params.put("jobId", jobId);
        params.put("jobStatus", jobStatus);
        params.put("bizId", bizId);
        String response = HttpUtils.post(callbackUrl, JSONObject.toJSONString(params));
        log.info("【回调任务】回调成功:{}, response:{}", callbackUrl, response);
        // 业务收到请求,应尽快响应成功结果, 响应 success 则成功
        return "success".equals(response);
    }

    /**
     * 根据重试次数,获取相应的延时策略生成下一次重试时间
     *
     *      退避算法实现1
     *
     * @param retryTimes 重试次数, 0, 1, 2...
     * @return 下一次重试时间
     */
    private Date getNextRetryTimeWithPolicy(int retryTimes) {
        if(retryTimes < 1) {
            retryTimes = 1;
        }
        Config config = ConfigService.getAppConfig();
        String retryIntervalPolicy = config.getProperty(
                    "job_finish_callback_retry_policy",
                    "30/60/180/1800/1800/1800/3600");
        String[] retryIntervalArr = retryIntervalPolicy.split("/");
        if(retryTimes > retryIntervalArr.length) {
            retryTimes = retryIntervalArr.length;
        }
        String hitPolicy = retryIntervalArr[retryTimes - 1];
        return new Date(System.currentTimeMillis()
                        + Integer.valueOf(hitPolicy) * 1000L);
    }

    /**
     * 回调任务管理容器
     */
    private class CallbackTaskWrapperContainer {

        /**
         * 正在运行的回调任务容器,方便进行close
         */
        private Map<Long, CallbackTaskWrapper>
                    runningCallbacksContainer = new ConcurrentHashMap<>();

        /**
         * 添加一个回调任务(正在执行)
         *
         * @param callbackInfo 回调信息
         * @param taskFuture 异步任务实例
         */
        boolean addTask(JobDataCallbackInfo callbackInfo, Future<?> taskFuture) {
            CallbackTaskWrapper oldTaskWrapper
                                    = runningCallbacksContainer.put(callbackInfo.getId(),
                                            new CallbackTaskWrapper(callbackInfo, taskFuture));
            return oldTaskWrapper == null;
        }

        /**
         * 某任务完成处理
         */
        void taskFinish(JobDataCallbackInfo callbackInfo) {
            runningCallbacksContainer.remove(callbackInfo.getId());
        }

        /**
         * 某任务取消处理
         */
        void cancelTask(JobDataCallbackInfo callbackInfo) {
            taskWrapper.cancel();
            updateCallbackFinalStatus(callbackInfo,
                                        CallbackStatusEnum.CANCELED);
            taskFinish(callbackInfo);
        }

        /**
         * 取消所有内存任务, 重新放入等待队列
         */
        void cancelAllTask() {
            // 遍历 running task, 更新为 WAIT_HANDLER
            for (CallbackTaskWrapper taskWrapper : runningCallbacksContainer.values()) {
                taskWrapper.cancel();
                updateCallbackFinalStatus(taskWrapper.getCallbackInfo(),
                                            CallbackStatusEnum.WAIT_HANDLER);
                taskFinish(taskWrapper.getCallbackInfo());
            }
        }

        /**
         * 检查回调任务队列是否达到最大值
         *
         * @return true:已到最大值, false:还可以接收新数据
         */
        boolean reachMaxQueue() {
            int retryQueueMaxSize = 4096;
            return runningCallbacksContainer.size() > retryQueueMaxSize;
        }
    }

    /**
     * 回调任务包装器
     */
    private class CallbackTaskWrapper {

        /**
         * 任务信息实体
         */
        private JobDataCallbackInfo callbackInfo;

        /**
         * 异步任务控制
         */
        private Future<?> taskFuture;

        CallbackTaskWrapper(JobDataCallbackInfo callbackInfo, Future<?> taskFuture) {
            this.callbackInfo = callbackInfo;
            this.taskFuture = taskFuture;
        }
        
        void rolloverFuture(Future<?> taskFuture) {
            this.taskFuture = taskFuture;
        }

        JobDataCallbackInfo getCallbackInfo() {
            return callbackInfo;
        }

        Future<?> getTaskFuture() {
            return taskFuture;
        }

        void cancel() {
            taskFuture.cancel(true);
            callbackInfo = null;
        }
    }
}

其中,有一个重要的回调任务信息的数据结构参考如下:

CREATE TABLE `t_job_data_callback_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `job_id` varchar(64) NOT NULL COMMENT '任务id',
  `callback_status` varchar(30) NOT NULL DEFAULT 'WAIT_HANDLER' COMMENT '回调状态,SUCCESS:回调成功,HANDLER_RETRYING:被执行回调中, WAIT_HANDLER:回调任务等待被接收处理, FAILED:回调最终失败, CANCELED:主动取消',
  `callback_url` varchar(300) DEFAULT '' COMMENT '回调地址',
  `job_status` varchar(30) DEFAULT NULL COMMENT '任务执行状态,冗余字段,回调时使用',
  `retry_times` int(6) NOT NULL DEFAULT '0' COMMENT '已重试次数',
  `biz_id` varchar(200) DEFAULT '' COMMENT '业务id, 看业务作用',
  `next_retry_time` datetime NOT NULL COMMENT '下一次执行回调重试的时间',
  `err_msg` varchar(3000) DEFAULT '' COMMENT '错误信息描述',
  `server_ip` varchar(32) NOT NULL DEFAULT '' COMMENT '执行任务的机器',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `job_id` (`job_id`),
  KEY `next_retry_time` (`next_retry_time`,`callback_status`),
  KEY `update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='异步任务回调客户端信息表';

以上基本就是整个的可靠优雅的回调实现了,其中一基础的db操作,枚举类之类的就不用细化了。

核心大部分可以简单描述为前面所说的重试机制. 但还有一点值得说明的是, 为了避免任务在集群环境中分布不均匀, 所以使用了一个饱和度+随机值延时的方式, 让每个机器都有差不多的机会执行回调任务.(不过具体的分布均匀性, 还需要实践去验证才行, 可以通过统计server_ip查看)

4. 时序图

下面以一个时序图, 展示整体工作流程的全貌:

aUZrIne.png!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK