40

来!给我做一个分钟级业务监控系统 【实战】

 4 years ago
source link: https://www.tuicool.com/articles/BbuMfui
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

如何做一个实时的业务统计的监控?比如分钟级?也就是每分钟可以快速看到业务的变化趋势,及可以做一些简单的分组查询?

哎,你可能说很简单了,直接从数据库 count 就可以了! 你是对的。

但如果不允许你使用db进行count呢?因为线上数据库资源可是很宝贵的哦,你这一count可能会给db带来灾难了。

那不然咋整?

没有db,我们还有其他数据源嘛,比如: 消息队列?埋点数据? 本文将是基于该前提而行。

做监控,尽量不要侵入业务太多!所以有一个消息中间件是至关重要的。针对大数据系统,一般是: kafka 或者 类kafka. (如本文基础 loghub)

有了消息中间件,如何进行分钟级监控? 这个应该就很简单了吧。不过如果要自己实现,其实坑也不少的!

如果自己实现计数,那么你可能需要做以下几件事:

1. 每消费一个消息,你需要一个累加器;

2. 每隔一个周期,你可能需要一个归档操作;

3. 你可能需要考虑各种并发安全问题;

4. 你可能需要考虑种性能问题;

5. 你可能需要考虑各种机器故障问题;

6. 你可能需要考虑各种边界值问题;

哎,其实没那么难。时间序列数据库,就专门为这类事情而生!如OpenTSDB: http://opentsdb.net/overview.html

可以说,TSDB 是这类应用场景的杀手锏。或者基于流计算框架: 如flink, 也是很轻松完成的事。但是不是本文的方向,略过!

本文是基于 loghub 的现有数据,进行分钟级统计后,入库 mysql 中,从而支持随时查询。(因loghub每次查询都是要钱的,所以,不可能直接查询)

loghub 数据结构如: 2019-07-10 10:01:11,billNo,userId,productCode,...

由于loghub提供了很多强大的查询统计功能,所以我们可以直接使用了。

核心功能就是一个统计sql,还是比较简单的。但是需要考虑的点也不少,接下来,将为看官们奉上一个完整的解决方案!

撸代码去!

1. 核心统计任务实现类 MinuteBizDataCounterTask

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogContent;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.common.QueriedLog;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.GetLogsResponse;
import com.my.service.statistics.StatisticsService;
import com.my.entity.BizDataStatisticsMin;
import com.my.model.LoghubQueryCounterOffsetModel;
import com.my.util.loghub.LogHubProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

/**
 * 基于loghub 的分钟级 统计任务
 */
@Component
@Slf4j
public class MinuteBizDataCounterTask implements Runnable {

    @Resource
    private LogHubProperties logHubProperties;

    @Resource
    private StatisticsService statisticsService;

    @Resource(name = "defaultOffsetQueryTaskCallback")
    private DefaultOffsetQueryTaskCallbackImpl defaultOffsetQueryTaskCallback;

    /**
     * loghub 客户端
     */
    private volatile Client mClient;

    /**
     * 过滤的topic
     */
    private static final String LOGHUB_TOPIC = "topic_test";

    /**
     * 单次扫描loghub最大时间 间隔分钟数
     */
    @Value("${loghub.offset.counter.perScanMaxMinutesGap}")
    private Integer perScanMaxMinutesGap;

    /**
     * 单次循环最大数
     */
    @Value("${loghub.offset.counter.perScanMaxRecordsLimit}")
    private Integer perScanMaxRecordsLimit;

    /**
     * 构造必要实例信息
     */
    public ProposalPolicyBizDataCounterTask() {

    }

    @Override
    public void run() {
        if(mClient == null) {
            this.mClient = new Client(logHubProperties.getEndpoint(),
                                logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey());
        }
        while (!Thread.interrupted()) {
            try {
                updateLastMinutePolicyNoCounter();
                Thread.sleep(60000);
            }
            catch (InterruptedException e) {
                log.error("【分钟级统计task】, sleep 中断", e);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                // 注意此处可能有风险,发生异常后将快速死循环
                log.error("【分钟级统计task】更新异常", e);
                try {
                    Thread.sleep(10000);
                }
                catch (InterruptedException ex) {
                    log.error("【分钟级统计task】异常,且sleep异常", ex);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * 更新最近的数据 (分钟级)
     *
     * @throws LogException loghub查询异常时抛出
     */
    private void updateLastMinutePolicyNoCounter() throws LogException {
        updateMinutePolicyNoCounter(null);
    }

    /**
     * 更新最近的数据
     */
    public Integer updateMinutePolicyNoCounter(LoghubQueryCounterOffsetModel specifyOffset) throws LogException {
        // 1. 获取偏移量
        // 2. 根据偏移量,判定是否可以一次性取完,或者多次获取更新
        // 3. 从loghub中设置偏移量,获取统计数据,更新
        // 4. 更新db数据统计值
        // 5. 更新偏移量
        // 6. 等待下一次更新

        // 指定offset时,可能为补数据
        final LoghubQueryCounterOffsetModel destOffset = enhanceQueryOffset(specifyOffset);
        initSharedQueryOffset(destOffset, destOffset == specifyOffset);

        Integer totalAffectNum = 0;

        while (!isScanFinishOnDestination(destOffset)) {
            // 完整扫描一次时间周期
            calcNextSharedQueryOffset(destOffset);
            while (true) {
                calcNextInnerQueryOffset();
                ArrayList<QueriedLog> logs = queryPerMinuteStatisticFromLoghubOnCurrentOffset();
                Integer affectNum = handleMiniOffsetBatchCounter(logs);
                totalAffectNum += affectNum;
                log.info("【分钟级统计task】本次更新数据:{}, offset:{}", affectNum, getCurrentSharedQueryOffset());
                if(!hasMoreDataOffset(logs.size())) {
                    rolloverOffsetAndCommit();
                    break;
                }
            }
        }
        log.info("【分钟级统计task】本次更新数据,总共:{}, destOffset:{}, curOffset:{}",
                            totalAffectNum, destOffset, getCurrentSharedQueryOffset());
        rolloverOffsetAndCommit();
        return totalAffectNum;
    }

    /**
     * 处理一小批的统计数据
     *
     * @param logs 小批统计loghub数据
     * @return 影响行数
     */
    private Integer handleMiniOffsetBatchCounter(ArrayList<QueriedLog> logs) {
        if (logs == null || logs.isEmpty()) {
            return 0;
        }
        List<BizDataStatisticsMin> statisticsMinList = new ArrayList<>();
        for (QueriedLog log1 : logs) {
            LogItem getLogItem = log1.GetLogItem();
            BizDataStatisticsMin statisticsMin1 = adaptStatisticsMinDbData(getLogItem);
            statisticsMin1.setEventCode(PROPOSAL_FOUR_IN_ONE_TOPIC);
            statisticsMin1.setEtlVersion(getCurrentScanTimeDuring() + ":" + statisticsMin1.getStatisticsCount());
            statisticsMinList.add(statisticsMin1);
        }
        return statisticsService.batchUpsertPremiumStatistics(statisticsMinList, getCurrentOffsetCallback());
    }


    /**
     * 获取共享偏移信息
     *
     * @return 偏移
     */
    private LoghubQueryCounterOffsetModel getCurrentSharedQueryOffset() {
        return defaultOffsetQueryTaskCallback.getCurrentOffset();
    }

    /**
     * 判断本次是否扫描完成
     *
     * @param destOffset 目标偏移
     * @return true:扫描完成, false: 未完成
     */
    private boolean isScanFinishOnDestination(LoghubQueryCounterOffsetModel destOffset) {
        return defaultOffsetQueryTaskCallback.getEndTime() >= destOffset.getEndTime();
    }

    /**
     * 获取偏移提交回调器
     *
     * @return 回调实例
     */
    private OffsetQueryTaskCallback getCurrentOffsetCallback() {
        return defaultOffsetQueryTaskCallback;
    }

    /**
     * 初始化共享的查询偏移变量
     *
     * @param destOffset 目标偏移
     * @param isSpecifyOffset 是否是手动指定的偏移
     */
    private void initSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset, boolean isSpecifyOffset) {
        // 整分花时间数据
        Integer queryStartTime = destOffset.getStartTime();
        if(queryStartTime % 60 != 0) {
            queryStartTime = queryStartTime / 60 * 60;
        }
        // 将目标扫描时间终点 设置为起点,以备后续迭代
        defaultOffsetQueryTaskCallback.initCurrentOffset(queryStartTime, queryStartTime,
                                                        destOffset.getOffsetStart(), destOffset.getLimit(),
                                                        destOffset.getIsNewStep(), isSpecifyOffset);
        if(defaultOffsetQueryTaskCallback.getIsNewStep()) {
            resetOffsetDefaultSettings();
        }
    }

    /**
     * 计算下一次统计偏移时间
     *
     * @param destOffset 目标偏移值
     */
    private void calcNextSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset) {
        int perScanMaxSecondsGap = perScanMaxMinutesGap * 60;
        if(destOffset.getEndTime() - defaultOffsetQueryTaskCallback.getStartTime() > perScanMaxSecondsGap) {
            defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
            int nextExpectEndTime = defaultOffsetQueryTaskCallback.getStartTime() + perScanMaxSecondsGap;
            if(nextExpectEndTime > destOffset.getEndTime()) {
                nextExpectEndTime = destOffset.getEndTime();
            }
            defaultOffsetQueryTaskCallback.setEndTime(nextExpectEndTime);
        }
        else {
            defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
            defaultOffsetQueryTaskCallback.setEndTime(destOffset.getEndTime());
        }
        resetOffsetDefaultSettings();
    }

    /**
     * 重置偏移默认配置
     */
    private void resetOffsetDefaultSettings() {
        defaultOffsetQueryTaskCallback.setIsNewStep(true);
        defaultOffsetQueryTaskCallback.setOffsetStart(0);
        defaultOffsetQueryTaskCallback.setLimit(0);
    }

    /**
     * 计算下一次小偏移,此种情况应对 一次外部偏移未查询完成的情况
     */
    private void calcNextInnerQueryOffset() {
        defaultOffsetQueryTaskCallback.setIsNewStep(false);
        // 第一次计算时,limit 为0, 所以得出的 offsetStart 也是0
        defaultOffsetQueryTaskCallback.setOffsetStart(
                defaultOffsetQueryTaskCallback.getOffsetStart() + defaultOffsetQueryTaskCallback.getLimit());
        defaultOffsetQueryTaskCallback.setLimit(perScanMaxRecordsLimit);
    }

    /**
     * 获取当前循环的扫描区间
     *
     * @return 15567563433-1635345099 区间
     */
    private String getCurrentScanTimeDuring() {
        return defaultOffsetQueryTaskCallback.getStartTime() + "-" + defaultOffsetQueryTaskCallback.getEndTime();
    }

    /**
     * 从loghub查询每分钟的统计信息
     *
     * @return 查询到的统计信息
     * @throws LogException loghub 异常时抛出
     */
    private ArrayList<QueriedLog> queryPerMinuteStatisticFromLoghubOnCurrentOffset() throws LogException {
        // 先按保单号去重,再进行计数统计
        String countSql = "* | split(bizData, ',')[5] policyNo, bizData GROUP by split(bizData, ',')[5] " +
                " | select count(1) as totalCountMin, " +
                "split(bizData, ',')[2] as productCode," +
                "split(bizData, ',')[3] as schemaCode," +
                "split(bizData, ',')[4] as channelCode," +
                "substr(split(bizData, ',')[1], 1, 16) as myDateTimeMinute " +
                "group by substr(split(bizData, ',')[1], 1, 16), split(bizData, ',')[2],split(bizData, ',')[3], split(bizData, ',')[4],split(bizData, ',')[7], split(bizData, ',')[8]";
        countSql += " limit " + defaultOffsetQueryTaskCallback.getOffsetStart() + "," + defaultOffsetQueryTaskCallback.getLimit();
        GetLogsResponse countResponse = mClient.GetLogs(logHubProperties.getProjectName(), logHubProperties.getBizCoreDataLogStore(),
                defaultOffsetQueryTaskCallback.getStartTime(), defaultOffsetQueryTaskCallback.getEndTime(),
                LOGHUB_TOPIC, countSql);
        if(!countResponse.IsCompleted()) {
            log.error("【分钟级统计task】扫描获取到未完整的数据,请速检查原因,offSet:{}", getCurrentSharedQueryOffset());
        }
        return countResponse.GetLogs() == null
                    ? new ArrayList<>()
                    : countResponse.GetLogs();
    }

    /**
     * 根据上一次返回的记录数量,判断是否还有更多数据
     *
     * @param lastGotRecordsCount 上次返回的记录数 (数据量大于最大数说明还有未取完数据)
     * @return true: 是还有更多数据应该再循环获取, false: 无更多数据结束本期任务
     */
    private boolean hasMoreDataOffset(int lastGotRecordsCount) {
        return lastGotRecordsCount >= perScanMaxRecordsLimit;
    }

    /**
     * 加强版的 offset 优先级: 指定偏移 -> 基于缓存的偏移 -> 新生成偏移标识
     *
     * @param specifyOffset 指定偏移(如有)
     * @return 偏移标识
     */
    private LoghubQueryCounterOffsetModel enhanceQueryOffset(LoghubQueryCounterOffsetModel specifyOffset) {
        if(specifyOffset != null) {
            return specifyOffset;
        }
        LoghubQueryCounterOffsetModel offsetBaseOnCache = getNextOffsetBaseOnCache();
        if(offsetBaseOnCache != null) {
            return offsetBaseOnCache;
        }
        return generateNewOffset();
    }

    /**
     * 基于缓存获取一下偏移标识
     *
     * @return 偏移
     */
    private LoghubQueryCounterOffsetModel getNextOffsetBaseOnCache() {
        LoghubQueryCounterOffsetModel offsetFromCache = defaultOffsetQueryTaskCallback.getCurrentOffsetFromCache();
        if(offsetFromCache == null) {
            return null;
        }
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
                                                    now.getHour(), now.getMinute());
        // 如果上次仍未内部循环完成,则使用原来的
        if(offsetFromCache.getIsNewStep()) {
            offsetFromCache.setStartTime(offsetFromCache.getEndTime());
            long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
            offsetFromCache.setEndTime((int) endTime);
        }
        return offsetFromCache;
    }

    /**
     * 生成新的完整的 偏移标识
     *
     * @return 新偏移
     */
    private LoghubQueryCounterOffsetModel generateNewOffset() {
        LoghubQueryCounterOffsetModel offsetNew = new LoghubQueryCounterOffsetModel();
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
                now.getHour(), now.getMinute());
        long startTime = nowMinTime.minusDays(1).toEpochSecond(ZoneOffset.of("+8"));
        long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
        offsetNew.setStartTime((int) startTime);
        offsetNew.setEndTime((int) endTime);
        return offsetNew;
    }
    /**
     * 将日志返回数据 适配到数据库记录中
     *
     * @param logItem 日志详情
     * @return db数据结构对应
     */
    private BizDataStatisticsMin adaptStatisticsMinDbData(LogItem logItem) {
        ArrayList<LogContent> logContents = logItem.GetLogContents();
        BizDataStatisticsMin statisticsMin1 = new BizDataStatisticsMin();
        for (LogContent logContent : logContents) {
            switch (logContent.GetKey()) {
                case "totalCountMin":
                    statisticsMin1.setStatisticsCount(Integer.valueOf(logContent.GetValue()));
                    break;
                case "productCode":
                    statisticsMin1.setProductCode(logContent.GetValue());
                    break;
                case "myDateTimeMinute":
                    String signDtMinStr = logContent.GetValue();
                    String[] dateTimeArr = signDtMinStr.split(" ");
                    String countDate = dateTimeArr[0];
                    String[] timeArr = dateTimeArr[1].split(":");
                    String countHour = timeArr[0];
                    String countMin = timeArr[1];
                    statisticsMin1.setCountDate(countDate);
                    statisticsMin1.setCountHour(countHour);
                    statisticsMin1.setCountMin(countMin);
                    break;
                default:
                    break;
            }
        }
        return statisticsMin1;
    }

    /**
     * 重置默认值,同时提交当前 (滚动到下一个偏移点)
     */
    private void rolloverOffsetAndCommit() {
        resetOffsetDefaultSettings();
        commitOffsetSync();
    }

    /**
     * 提交偏移量
     *
     */
    private void commitOffsetSync() {
        defaultOffsetQueryTaskCallback.commit();
    }

}

主要实现逻辑如下:

1. 每隔一分钟进行一个查询;

2. 发生异常后,容错继续查询;

3. 对于一个新统计,默认倒推一天范围进行统计;

4. 统计时间范围间隔可设置,避免一次查询数量太大,费用太高且查询返回数量有限;

5. 对于每次小批量查询,支持分布操作,直到取完数据;

6. 小批量数据完成后,自动提交查询偏移;

7. 后续查询将基础提交的偏移进行;

8. 支持断点查询;

2. 偏移提交管理器 OffsetQueryTaskCallback

主任务中,只管进行数据统计查询,提交偏移操作由其他类进行;

/**
 * 普通任务回调接口定义, 考虑到多种类型的统计任务偏移操作方式可能不一,定义一个通用型偏移接口
 *
 */
public interface OffsetQueryTaskCallback {

    /**
     * 回调方法入口, 提交偏移
     */
    public void commit();

    /**
     * 设置初始化绑定当前偏移(期间不得改变)
     *
     * @param startTime 偏移开始时间
     * @param endTime 偏移结束时间
     * @param offsetStart 偏移开始值(分页)
     * @param limit 单次取值最大数(分页)
     * @param isNewStep 是否是新的查询
     * @param isSpecifyOffset 是否是指定的偏移
     */
    public void initCurrentOffset(Integer startTime, Integer endTime,
                                  Integer offsetStart, Integer limit,
                                  Boolean isNewStep, Boolean isSpecifyOffset);

    /**
     * 从当前环境中获取当前偏移信息
     *
     * @return 偏移变量实例
     */
    public LoghubQueryCounterOffsetModel getCurrentOffset();

}


import com.alibaba.fastjson.JSONObject;
import com.my.util.constants.RedisKeysConstantEnum;
import com.my.util.redis.RedisPoolUtil;
import com.my.model.LoghubQueryCounterOffsetModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 默认偏移回调实现
 *
 */
@Component("defaultOffsetQueryTaskCallback")
@Slf4j
public class DefaultOffsetQueryTaskCallbackImpl implements OffsetQueryTaskCallback {

    @Resource
    private RedisPoolUtil redisPoolUtil;

    /**
     * 当前偏移信息
     */
    private ThreadLocal<LoghubQueryCounterOffsetModel> currentOffsetHolder = new ThreadLocal<>();


    @Override
    public void commit() {
        if(!currentOffsetHolder.get().getIsSpecifyOffset()) {
            redisPoolUtil.set(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey(),
                    JSONObject.toJSONString(currentOffsetHolder.get()));
        }
    }

    @Override
    public void initCurrentOffset(Integer startTime, Integer endTime,
                                  Integer offsetStart, Integer limit,
                                  Boolean isNewStep, Boolean isSpecifyOffset) {
        LoghubQueryCounterOffsetModel currentOffset = new LoghubQueryCounterOffsetModel();
        currentOffset.setStartTime(startTime);
        currentOffset.setEndTime(endTime);
        currentOffset.setOffsetStart(offsetStart);
        currentOffset.setIsNewStep(isNewStep);
        currentOffset.setIsSpecifyOffset(isSpecifyOffset);
        currentOffsetHolder.set(currentOffset);
    }

    @Override
    public LoghubQueryCounterOffsetModel getCurrentOffset() {
        return currentOffsetHolder.get();
    }

    /**
     * 从缓存中获取当前偏移信息
     *
     * @return 缓存偏移或者 null
     */
    public LoghubQueryCounterOffsetModel getCurrentOffsetFromCache() {
        String offsetCacheValue = redisPoolUtil.get(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey());
        if (StringUtils.isBlank(offsetCacheValue)) {
            return null;
        }
        return JSONObject.parseObject(offsetCacheValue, LoghubQueryCounterOffsetModel.class);
    }

    public Integer getStartTime() {
        return currentOffsetHolder.get().getStartTime();
    }

    public void setStartTime(Integer startTime) {
        currentOffsetHolder.get().setStartTime(startTime);
    }

    public Integer getEndTime() {
        return currentOffsetHolder.get().getEndTime();
    }

    public void setEndTime(Integer endTime) {
        currentOffsetHolder.get().setEndTime(endTime);
    }

    public Integer getOffsetStart() {
        return currentOffsetHolder.get().getOffsetStart();
    }

    public void setOffsetStart(Integer offsetStart) {
        currentOffsetHolder.get().setOffsetStart(offsetStart);
    }

    public Integer getLimit() {
        return currentOffsetHolder.get().getLimit();
    }

    public void setLimit(Integer limit) {
        currentOffsetHolder.get().setLimit(limit);
    }

    public Boolean getIsNewStep() {
        return currentOffsetHolder.get().getIsNewStep();
    }

    public void setIsNewStep(Boolean isNewStep) {
        currentOffsetHolder.get().setIsNewStep(isNewStep);
    }

}

/**
 * loghub 查询偏移量 数据容器
 *
 */
@Data
public class LoghubQueryCounterOffsetModel implements Serializable {

    private static final long serialVersionUID = -3749552331349228045L;

    /**
     * 开始时间
     */
    private Integer startTime;

    /**
     * 结束时间
     */
    private Integer endTime;

    /**
     * 起始偏移
     */
    private Integer offsetStart = 0;

    /**
     * 每次查询的 条数限制, 都需要进行设置后才可用, 否则查无数据
     */
    private Integer limit = 0;

    /**
     * 是否新的偏移循环,如未完成,应继续子循环 limit
     *
     * true: 是, offsetStart,limit 失效, false: 否, 需借助 offsetStart,limit 进行limit相加
     */
    private Boolean isNewStep = true;

    /**
     * 是否是手动指定的偏移,如果是说明是在手动被数据,偏移量将不会被更新
     *
     *      此变量是瞬时值,将不会被持久化到偏移标识中
     */
    private transient Boolean isSpecifyOffset;

}

3. 批量更新统计结果数据库的实现

因每次统计的数据量是不确定的,因尽可能早的提交一次统计结果,防止一次提交太多,或者 机器故障时所有统计白费,所以需要分小事务进行。

    
@Service
public class StatisticsServiceImpl implements StatisticsService {
    /**
     * 批量更新统计分钟级数据 (事务型提交)
     *
     * @param statisticsMinList 新统计数据
     * @return 影响行数
     */
    @Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Throwable.class)
    public Integer batchUpsertPremiumStatistics(List<BizProposalPolicyStatisticsMin> statisticsMinList,
            OffsetQueryTaskCallback callback) {
        AtomicInteger updateCount = new AtomicInteger(0);
        statisticsMinList.forEach(item -> {
            int affectNum = 0;
            BizProposalPolicyStatisticsMin oldStatistics = bizProposalPolicyStasticsMinMapper.selectOneByCond(item);
            if (oldStatistics == null) {
                item.setEtlVersion(item.getEtlVersion() + ":0");
                affectNum = bizProposalPolicyStasticsMinMapper.insert(item);
            } else {
                oldStatistics.setStatisticsCount(oldStatistics.getStatisticsCount() + item.getStatisticsCount());
                String versionFull = versionKeeperFilter(oldStatistics.getEtlVersion(), item.getEtlVersion());
                oldStatistics.setEtlVersion(versionFull + ":" + oldStatistics.getStatisticsCount());
                // todo: 优化更新版本号问题
                affectNum = bizProposalPolicyStasticsMinMapper.updateByPrimaryKey(oldStatistics);
            }
            updateCount.addAndGet(affectNum);
        });
        callback.commit();
        return updateCount.get();
    }

    /**
     * 版本号过滤器(组装版本信息)
     *
     * @param oldVersion     老版本信息
     * @param currentVersion 当前版本号
     * @return 可用的版本信息
     */
    private String versionKeeperFilter(String oldVersion, String currentVersion) {
        String versionFull = oldVersion + "," + currentVersion;
        if (versionFull.length() >= 500) {
            // 从150以后,第一版本号开始保留
            versionFull = versionFull.substring(versionFull.indexOf(',', 150));
        }
        return versionFull;
    }

}

4. 你需要一个启动任务的地方

/**
 * 启动时运行的任务调度服务
 *
 */
@Service
@Slf4j
public class TaskAutoRunScheduleService {

    @Resource
    private MinuteBizDataCounterTask minuteBizDataCounterTask;

    @PostConstruct
    public void bizDataAutoRun() {
        log.info("============= bizDataAutoRun start =================");
        ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Biz-data-counter-%d"));
        executorService.submit(minuteBizDataCounterTask);
    }

}

5. 将每分钟的数据从db查询出来展示到页面

以上将数据统计后以分钟级汇总到数据,接下来,监控页面就只需从db中进行简单聚合就可以了,咱们就不费那精力去展示了。

6. 待完善的地方

集群环境的任务运行将会出问题,解决办法是:加一个分布式锁即可。 你可以的!

唠叨: 踩坑不一定是坏事!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK