7

[从源码学设计]蚂蚁金服SOFARegistry之消息总线异步处理

 3 years ago
source link: http://www.cnblogs.com/rossiXYZ/p/14088011.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

[从源码学设计]蚂蚁金服SOFARegistry之消息总线异步处理

目录

  • [从源码学设计]蚂蚁金服SOFARegistry之消息总线异步处理

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

本文为第五篇,介绍SOFARegistry消息总线的异步处理。

0x01 为何分离

前文我们讲述了SOFARegistry的消息总线,本文我们讲讲一个变种 DataChangeEventCenter。

DataChangeEventCenter 是被独立出来的,专门处理数据变化相关的消息。

为什么要分离呢?因为:

  • 从架构说,DataChangeEventCenter 是专门处理数据变化消息,这是一种解耦;
  • 从技术上来说,DataChangeEventCenter 也和 EventCenter 有具体实现技巧的不同,所以需要分开处理;
  • 但更深入的原因是业务场景不同,下面分析中我们可以看出,DataChangeEventCenter 和业务耦合的相当紧密;

0x02 业务领域

2.1 应用场景

DataChangeEventCenter 的独特业务场景如下:

  • 需要提供归并功能。即短期内会有多个通知来到,不需要逐一处理,只处理最后一个即可;
  • 异步处理消息;
  • 需要保证消息顺序;
  • 有延迟操作;
  • 需要提高处理能力,并行处理;

因此,DataChangeEventCenter 代码和业务联系相当紧密,前文的 EventCenter 已经不适合了。

2.2 延迟和归并

关于延迟和归并操作,我们单独说明下。

2.2.1 业务特点

蚂蚁金服业务的一个 特点 是: 通过连接敏感的特性对服务宕机做到秒级发现

因此 SOFARegistry 在健康检测的设计方面决定“服务数据与服务发布者的实体连接绑定在一起,断连马上清数据”,简称此特点叫做 连接敏感性 。连接敏感性是指在 SOFARegistry 里所有 Client 都与 SessionServer 保持长连接,每条长连接都设置基于 SOFABolt 的连接心跳,如果长连接断连客户端立即发起重新建连,时刻保持 Client 与 SessionServer 之间可靠的连接。

2.2.2 问题

但带来了 一个问题 就是:可能因为网络问题,短期内会出现大量重新建连操作。比如只是网络问题导致连接断开,实际的服务进程没有宕机,此时客户端立即发起重新连接 SessionServer 并且重新注册所有服务数据。

但是 假如此过程耗时足够短暂(例如 500ms 内发生断连和重连),服务订阅者 应该 感受不到服务下线。从而 SOFARegistry 内部 应该做相应处理

2.2.3 解决

SOFARegistry 内部做了归并和延迟操作来保证用户不受影响。比如 DataServer 内部的数据通过 mergeDatum 延迟合并变更的 Publisher 服务信息,version 是合并后最新的版本号。

对于 DataChangeEventCenter,就是通过消息的延迟和归并来协助完成这个功能

2.3 蚂蚁金服实现

下面是 DataChangeEventCenter 总体的功能描述:

  • 当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler;
  • Handler 首先会判断当前节点的状态:
    • 若是非工作状态则返回请求失败;
    • 若是工作状态,Handler 会往 dataChangeEventCenter 中添加一个数据变更事件,则触发数据变化事件中心 DataChangeEventCenter 的 onChange 方法。用于异步地通知事件变更中心数据的变更;
  • 事件变更中心收到该事件之后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不同的事件类型异步地对上下线数据进行相应的处理;
  • 与此同时,DataChangeHandler 会把这个事件变更信息通过 ChangeNotifier 对外发布,通知其他节点进行数据同步;

0x03 DataChangeEventCenter

3.1 总述

DataChangeEventCenter具体分成四部分:

  • Event Center:组织成消息中心;
  • Event Queue:用于多路分别处理,增加处理能力;
  • Event Task:每一个Queue内部启动一个线程,用于异步处理,增加处理能力;
  • Event Handler:用于处理内部ChangeData;

接下来我们一一介绍,因为 DataChangeEventCenter 和业务结合紧密,所以我们会深入结合业务进行讲解。

3.2 DataChangeEventCenter

3.2.1 定义

DataChangeEventCenter 中维护着一个 DataChangeEventQueue 队列数组,这是核心。数组中的每个元素是一个事件队列。具体定义如下:

public class DataChangeEventCenter {

    /**
     * count of DataChangeEventQueue
     */
    private int                    queueCount;

    /**
     * queues of DataChangeEvent
     */
    private DataChangeEventQueue[] dataChangeEventQueues;

    @Autowired
    private DataServerConfig       dataServerConfig;

    @Autowired
    private DatumCache             datumCache;
}

3.2.2 消息类型

DataChangeEventCenter 专门处理 IDataChangeEvent 类型消息,其具体实现为三种:

  • public class ClientChangeEvent implements IDataChangeEvent
  • public class DataChangeEvent implements IDataChangeEvent
  • public class DatumSnapshotEvent implements IDataChangeEvent

这些不同类型的消息可以放入同一个队列, 具体放入哪个队列,是根据特定判别方式来决定 ,比如根据Publisher的DataInfoId来做hash,以此决定放入哪个Queue。

即,当对应 handler 的 onChange 方法被触发时,会计算该变化服务的 dataInfoId 的 Hash 值,从而进一步确定出该服务注册数据所在的队列编号,进而把该变化的数据封装成一个数据变化对象,传入到队列中。

3.2.3 初始化

在初始化函数中,构建了EventQueue,每一个Queue启动了一个线程,用来处理消息。

@PostConstruct
public void init() {
    if (isInited.compareAndSet(false, true)) {
        queueCount = dataServerConfig.getQueueCount();
        dataChangeEventQueues = new DataChangeEventQueue[queueCount];
        for (int idx = 0; idx < queueCount; idx++) {
            dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,datumCache);
            dataChangeEventQueues[idx].start();
        }
    }
}

3.2.4 Put 消息

put消息比较简单,具体如何判别应该把Event放入哪一个Queue是根据具体方式来判断,比如根据Publisher的DataInfoId来做hash,以此决定放入哪个Queue:

int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));

3.2.5 如何处理消息

具体是通过 dataChangeEventQueues.onChange 来做处理,比如如下几个函数,分别处理不同的消息类型。具体都是找到queue,然后调用:

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

public void onChange(ClientChangeEvent event) {
    for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}

public void onChange(DatumSnapshotEvent event) {
    for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}

public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}

3.3 DataChangeEvent

因为 DataChangeEvent 最常用,所以我们单独拿出来说明。

DataChangeEvent会根据DataChangeTypeEnum和DataSourceTypeEnum来进行区分,就是处理类型和消息来源。

DataChangeTypeEnum具体分为:

  • MERGE,如果变更类型是MERGE,则会更新缓存中需要更新的新Datum,并且更新版本号;
  • COVER,如果变更类型是 COVER,则会覆盖原有的缓存;

DataSourceTypeEnum 具体分为:

  • PUB :pub by client;
  • PUB_TEMP :pub temporary data;
  • SYNC:sync from dataservers in other datacenter;
  • BACKUP:from dataservers in the same datacenter;
  • CLEAN:local dataInfo check,not belong this node schedule remove;
  • SNAPSHOT:Snapshot data, after renew finds data inconsistent;

具体定义如下:

public class DataChangeEvent implements IDataChangeEvent {

    /**
     * type of changed data, MERGE or COVER
     */
    private DataChangeTypeEnum changeType;

    private DataSourceTypeEnum sourceType;

    /**
     * data changed
     */
    private Datum              datum;
}

3.4 DataChangeEventQueue

DataChangeEventQueue 是这个子模块的核心,用于 多路分别处理,增加处理能力 。每一个Queue内部启动一个线程,用于 异步处理,也能增加处理能力

3.4.1 核心变量

这里的核心是:

  • BlockingQueue eventQueue;

  • Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

  • DelayQueue CHANGE_QUEUE = new DelayQueue();

讲解如下:

  • 可以看到,这里操作的数据类型是ChangeData, 把Datum转换成 ChangeData 可以把消息处理方式 或者 来源统一起来处理
  • eventQueue 用来存储投放的消息,所有消息block在queue上,这可以保证消息的顺序处理;
  • CHANGE_DATA_MAP_FOR_MERGE。顾名思义,主要处理消息归并。这是按照 dataCenter,dataInfoId 作为维度,分别存储 ChangeData,可以理解为一个矩阵Map,使用putIfAbsent方法添加键值对,如果map集合中没有该key对应的值,则直接添加,并返回null,如果已经存在对应的值,则依旧为原来的值。 这样如果短期内向map中添加多个消息,这样就对多余的消息做了归并
  • CHANGE_QUEUE 的作用是用于统一处理投放的ChangeData,无论是哪个 data center的数据,都会统一在这里处理;这里需要注意的是 使用了DelayQueue来进行延迟操作 ,就是我们之前业务中提到的延迟操作;

具体定义如下:

public class DataChangeEventQueue {

    private final String                               name;

    /**
     * a block queue that stores all data change events
     */
    private final BlockingQueue<IDataChangeEvent>      eventQueue;

    private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

    private final DelayQueue<ChangeData>               CHANGE_QUEUE              = new DelayQueue();

    private final int                                  notifyIntervalMs;

    private final int                                  notifyTempDataIntervalMs;

    private final ReentrantLock                        lock                      = new ReentrantLock();

    private final int                                  queueIdx;

    private DataServerConfig                           dataServerConfig;

    private DataChangeEventCenter                      dataChangeEventCenter;

    private DatumCache                                 datumCache;
}

3.4.2 启动和引擎

DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的时候被一个新的线程调用, 该线程会源源不断地从队列中获取新增事件,并且进行分发。新增数据会由此添加进节点内,实现分片 。因为 eventQueue 是一个 BlockingQueue,所以可以使用while (true)来控制。

当event被取出之后,会根据 DataChangeScopeEnum.DATUM 的不同,会做不同的处理。

  • 如果是DataChangeScopeEnum.DATUM,则判断dataChangeEvent.getSourceType();
    • 如果是 DataSourceTypeEnum.PUB_TEMP,则addTempChangeData,就是往CHANGE_QUEUE添加ChangeData;
    • 如果不是,则handleDatum;
  • 如果是DataChangeScopeEnum.CLIENT,则handleClientOff((ClientChangeEvent) event);
  • 如果是DataChangeScopeEnum.SNAPSHOT,则handleSnapshot((DatumSnapshotEvent) event);

具体代码如下:

public void start() {
    Executor executor = ExecutorFactory
            .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
    executor.execute(() -> {
        while (true) {
            try {
                IDataChangeEvent event = eventQueue.take();
                DataChangeScopeEnum scope = event.getScope();
                if (scope == DataChangeScopeEnum.DATUM) {
                    DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
                    //Temporary push data will be notify as soon as,and not merge to normal pub data;
                    if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                        addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
                                dataChangeEvent.getSourceType());
                    } else {
                        handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(),
                                dataChangeEvent.getDatum());
                    }
                } else if (scope == DataChangeScopeEnum.CLIENT) {
                    handleClientOff((ClientChangeEvent) event);
                } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
                    handleSnapshot((DatumSnapshotEvent) event);
                }
            } 
        }
    });
}

具体如下图:

+----------------------------+
      |   DataChangeEventCenter    |
      |                            |
      | +-----------------------+  |
      | | DataChangeEventQueue[]|  |
      | +-----------------------+  |
      +----------------------------+
                   |
                   |
                   v
+------------------+------------------------+
|          DataChangeEventQueue             |
|                                           |
| +---------------------------------------+ |
| |                                       | |
| |    BlockingQueue<IDataChangeEvent> +-------------+
| |                                       | |        |
| |                                       | |      +-v---------+
| | Map<String, Map<String, ChangeData<>  | | <--> |           |
| |                                       | |      | Executor  |
| |                                       | |      |           |
| |         start +------------------------------> |           |
| |                                       | |      +-+---------+
| |                                       | |        |
| |      DelayQueue<ChangeData>  <-------------------+
| |                                       | |
| +---------------------------------------+ |
+-------------------------------------------+

3.4.3 ChangeData

handleDatum 具体处理是把Datum转换为 ChangeData来处理,

为什么要转换成 ChangeData来存储呢。

因为无论是消息处理方式或者来源,都有不同的类型。 比如 在 NotifyFetchDatumHandler . fetchDatum 函数中,会先从其他 data server 获取 Datum,然后会根据 Datum 向dataChangeEventCenter中投放消息,通知本 Data Server 进行 BACKUP 操作,类型是 COVER 类型。

转换成 ChangeData就可以把消息处理方式或者来源统一起来处理

用户会存储一个包含 datum 的消息。

dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);

DataChangeEventQueue 会从 DataChangeEvent 中获取 Datum,然后把 Datum 转换为 ChangeData,存储起来。

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
            //get changed datum
            ChangeData changeData = getChangeData(targetDatum.getDataCenter(),
                targetDatum.getDataInfoId(), sourceType, changeType);
            Datum cacheDatum = changeData.getDatum();
            if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
                changeData.setDatum(targetDatum);
            } 
}

ChangeData 定义如下:

public class ChangeData implements Delayed {

    /** data changed */
    private Datum              datum;

    /** change time */
    private Long               gmtCreate;

    /** timeout */
    private long               timeout;

    private DataSourceTypeEnum sourceType;

    private DataChangeTypeEnum changeType;
}

3.4.4 处理Datum

3.4.4.1 加入Datum

这里是处理真实ChangeData缓存,以及新加入的Datum。

  • 首先从 CHANGE_DATA_MAP_FOR_MERGE 获取之前存储的变更的ChangeData,如果没有,就生成一个加入,此时要为后续可能的归并做准备;
  • 拿到ChangeData之后
    • 如果变更类型是 COVER,则会覆盖原有的缓存。changeData.setDatum(targetDatum);
    • 否则是MERGE,则会更新缓存中需要更新的新Datum,并且更新版本号;

具体如下:

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
    lock.lock();
    try {
        //get changed datum
        ChangeData changeData = getChangeData(targetDatum.getDataCenter(),
            targetDatum.getDataInfoId(), sourceType, changeType);
        Datum cacheDatum = changeData.getDatum();
        if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
            changeData.setDatum(targetDatum);
        } else {
            Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
            Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();
            for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                Publisher cachePub = cachePubMap.get(registerId);
                if (cachePub != null) {
                    // if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means
                    // that pub is not the newest data, should be ignored
                    if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {
                        continue;
                    }
                    // if pub and cachePub both are publisher, and sourceAddress of both are equal,
                    // and version of cachePub is greater than version of pub, should be ignored
                    if (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher)
                        && pub.getSourceAddress().equals(cachePub.getSourceAddress())
                        && cachePub.getVersion() > pub.getVersion()) {
                        continue;
                    }
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        }
    } finally {
        lock.unlock();
    }
}

3.4.4.2 提出Datum

当提取时候,使用take函数,从CHANGE_QUEUE 和 CHANGE_DATA_MAP_FOR_MERGE 提出ChangeData。

public ChangeData take() throws InterruptedException {
    ChangeData changeData = CHANGE_QUEUE.take();
    lock.lock();
    try {
        removeMapForMerge(changeData);
        return changeData;
    } finally {
        lock.unlock();
    }
}

具体提取Datum会在DataChangeHandler。

3.5 DataChangeHandler

DataChangeHandler 会定期提取DataChangeEventCenter中的消息,然后进行处理 ,主要功能就是执行ChangeNotifier 来通知相关模块:hi,这里有新数据变化来到了,兄弟们走起来。

3.5.1 类定义

public class DataChangeHandler {

    @Autowired
    private DataServerConfig          dataServerConfig;

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;
}

3.5.2 执行引擎ChangeNotifier

DataChangeHandler 会遍历 DataChangeEventCenter 中所有 DataChangeEventQueue,然后从 DataChangeEventQueue 之中取出ChangeData,针对每一个ChangeData,生成一个ChangeNotifier。

每个ChangeNotifier都是一个处理线程。

每个 dataChangeEventQueue 生成了 5 个 ChangeNotifier。

@PostConstruct
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
  
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                 final ChangeData changeData = dataChangeEventQueue.take();
                 notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}

3.5.3 Notify

我们回顾下业务:

当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler ,Handler 会往 dataChangeEventCenter 中添加一个数据变更事件,用于异步地通知事件变更中心数据的变更。事件变更中心收到该事件之后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不同的事件类型异步地对上下线数据进行相应的处理。

对于 ChangeData,会生成 ChangeNotifier 进行处理。会把这个事件变更信息通过 ChangeNotifier 对外发布,通知其他节点进行数据同步

private class ChangeNotifier implements Runnable {

    private ChangeData changeData;
    private String     name;

    @Override
    public void run() {
        if (changeData instanceof SnapshotData) {
           ......
        } else {
            Datum datum = changeData.getDatum();

            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = changeData.getSourceType();
            DataChangeTypeEnum changeType = changeData.getChangeType();

            if (changeType == DataChangeTypeEnum.MERGE
                && sourceType != DataSourceTypeEnum.BACKUP
                && sourceType != DataSourceTypeEnum.SYNC) {
                //update version for pub or unPub merge to cache
                //if the version product before merge to cache,it may be cause small version override big one
                datum.updateVersion();
            }

            long version = datum.getVersion();

            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                      ......
                    }

                } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                } else {
                    MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                    Long lastVersion = mergeResult.getLastVersion();

                    if (lastVersion != null
                        && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                        return;
                    }

                    //lastVersion null means first add datum
                    if (lastVersion == null || version != lastVersion) {
                        if (mergeResult.isChangeFlag()) {
                            notify(datum, sourceType, lastVersion);
                        }
                    }
                }
            } 
        }

    }
}

notify函数会遍历dataChangeNotifiers

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}

对应的Bean是:

@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}

至于如何处理通知,我们后续会撰文处理。

至此,DataChangeEventCenter 整体逻辑如下图所示

+----------------------------+
                |   DataChangeEventCenter    |
                |                            |
                | +-----------------------+  |
                | | DataChangeEventQueue[]|  |
                | +-----------------------+  |
                +----------------------------+
                             |
                             |
                             v
          +------------------+------------------------+
          |          DataChangeEventQueue             |
          |                                           |
          | +---------------------------------------+ |
          | |                                       | |
          | |    BlockingQueue<IDataChangeEvent> +-------------+
          | |                                       | |        |
          | |                                       | |      +-v---------+
          | | Map<String, Map<String, ChangeData<>  | | <--> |           |
          | |                                       | |      | Executor  |
          | |                                       | |      |           |
          | |         start +------------------------------> |           |
          | |                                       | |      +-+---------+
          | |                                       | |        |
+----------------+ DelayQueue<ChangeData>  <-------------------+
|         | |                                       | |
|         | +---------------------------------------+ |
|         +-------------------------------------------+
|
|
|         +--------------------------+
|  take   |                          |    notify   +-------------------+
+-------> |    DataChangeHandler     | +---------> |dataChangeNotifiers|
          |                          |             +-------------------+
          +--------------------------+

手机如下图:

3IzQVfq.png!mobile

0x04 结论

因为独特的业务场景,所以阿里把 DataChangeEventCenter 单独分离出来,满足了以下业务需求。如果大家在实际工作中有类似的需求,可以参考借鉴,具体处理方式如下:

  • 需要提高处理能力,并行处理;
    • queue数组 实现,每一个Queue都可以处理消息,增加处理能力;
  • 异步处理消息;
    • 每一个Queue内部启动一个线程,用于异步处理;
  • 需要保证消息顺序;
    • eventQueue 用来存储投放的消息,所有消息block在queue上,这可以保证消息的顺序处理;
  • 有延迟操作;
    • 使用了DelayQueue来进行延迟操作;
  • 需要归并操作,即短期内会有多个通知来到,不需要逐一处理,只处理最后一个即可;
    • 使用putIfAbsent方法添加键值对,如果map集合中没有该key对应的值,则直接添加,并返回null,如果已经存在对应的值,则依旧为原来的值。 这样如果短期内向map中添加多个消息,这样就对多余的消息做了归并

0xFF 参考

Guava中EventBus分析


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK