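About the author: Chen Zhuowen, a developer on the private cloud team of a game company in China, working mainly on SDN/NFV development.
For reasons of length, the broad topic "Switch lifecycle in Openflowplugin" is split into several parts: creation of the Switch lifecycle object ContextChain; Master election on the controller node and instantiation of the ContextChain/Context services; MastershipChangeService and ReconciliationFramework; the controller node becoming Slave; and the Switch going offline.
This article is the fifth part of the Openflowplugin (0.6.2) source code analysis. It looks in depth at how Mastership and ReconciliationFramework in OFP provide services to northbound applications.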
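Previous parts:
Part 1: (1) ODL OpenflowPlugin startup flow source code analysis
Part 2: (2) ODL Openflowplugin source code analysis of the Handshake process when a Switch connects to the controller
Part 3: (3) ODL Openflowplugin source code analysis of the creation of the Switch lifecycle object ContextChain
Part 4: (4) ODL Openflowplugin source code analysis of Master election and Context service instantiation
Intended readers: those who have a basic grasp of Opendaylight's concepts or some hands-on experience, and who want to understand the openflowplugin source code in depth or modify it.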
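Recalling the previous note: "Once the Switch's ContextChain has been instantiated on the leader node, a method of MastershipChangeServiceManager is called; this is the hook service that the lower layer of ofp exposes to upper-layer applications. When instantiation is complete (isMastered), the upper-layer applications are triggered. If ReconciliationFramework is in use, becomeMasterBeforeSubmittedDS is called; otherwise becomeMaster is called."
As stated above, whether or not ReconciliationFramework is used, becoming Master always ends in a call on the ownershipChangeListener object: either becomeMasterBeforeSubmittedDS or becomeMaster. And the ownershipChangeListener object is the MastershipChangeServiceManager.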
1. What is MastershipChangeServiceManager
So what is MastershipChangeServiceManager?
Start with the ownershipChangeListener field in ContextChainHolderImpl. Its value is the mastershipChangeServiceManager passed in when OpenFlowPluginProviderImpl creates the ContextChainHolderImpl object, so the ownershipChangeListener field is the MastershipChangeServiceManager.
contextChainHolder = new ContextChainHolderImpl(
        executorService,
        singletonServicesProvider,
        entityOwnershipService,
        mastershipChangeServiceManager);
Digging further, mastershipChangeServiceManager is passed in when OpenFlowPluginProviderImpl is created in openflowplugin.xml. It is a service referenced in blueprint:
<reference id="mastershipChangeServiceManager"
           interface="org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager"/>
We still need to find out how mastershipChangeServiceManager gets published into the OSGi environment. Tracing step by step, we find that it is instantiated and exported to OSGi in openflowplugin/openflowplugin-impl/src/main/resources/org/opendaylight/blueprint/openflowplugin-impl.xml:
<bean id="mastershipChangeServiceManager"
      class="org.opendaylight.openflowplugin.impl.mastership.MastershipChangeServiceManagerImpl"/>
<service ref="mastershipChangeServiceManager"
         interface="org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager"/>
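At this point we have the complete call chain for MastershipChangeServiceManager. Next we look more closely at its two methods, becomeMasterBeforeSubmittedDS and becomeMaster.
Recalling the previous note: once the Switch contextChain election succeeds and every Context has finished initializing, whether reconciliationFramework is in use decides which of MastershipChangeServiceManager.becomeMasterBeforeSubmittedDS and MastershipChangeServiceManager.becomeMaster is called. These two methods are also the hooks that OpenflowPlugin leaves for us: they let upper-layer applications learn that a device has come online and that election and initialization are complete, and they trigger our northbound applications.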
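2. The original design of MastershipChangeServiceManager
First, what happens when ReconciliationFramework is not used? ReconciliationFramework itself is covered further below.
When the Switch ContextChain election succeeds and instantiation is complete, and ReconciliationFramework is not in use, becomeMaster is called. My own understanding is that this is the hook Openflowplugin provided in its earliest design, before ReconciliationFramework existed: even then, our applications could learn that "the Switch has been elected and fully initialized (isMastered)". The details follow.
The core logic of MastershipChangeServiceManager.becomeMaster is to call onBecomeOwner on every service held in the serviceGroup field:
@Override
public void becomeMaster(@Nonnull final DeviceInfo deviceInfo) {
    serviceGroup.forEach(mastershipChangeService -> mastershipChangeService.onBecomeOwner(deviceInfo));
}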
The services in serviceGroup are registered by calling MastershipChangeServiceManager.register. Note that a service registering late is immediately notified of every device that is already mastered:
@Nonnull
@Override
public MastershipChangeRegistration register(@Nonnull MastershipChangeService service) {
    final MastershipServiceDelegate registration =
            new MastershipServiceDelegate(service, () -> serviceGroup.remove(service));
    serviceGroup.add(service);
    if (masterChecker != null && masterChecker.isAnyDeviceMastered()) {
        masterChecker.listOfMasteredDevices().forEach(service::onBecomeOwner);
    }
    return registration;
}
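So when ReconciliationFramework is not used, an upper-layer application can reference MastershipChangeServiceManager directly from OSGi and call its register method to register a service. Then, whenever ofp becomes Master of a Switch, the registered service is triggered (the service implements a specific interface, with methods such as onBecomeOwner).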
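The register/dispatch behaviour described above can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions, not the openflowplugin implementation: the names SketchMastershipListener, SketchRegistration, SketchMastershipManager and SketchMastershipDemo are hypothetical, devices are represented by plain strings instead of DeviceInfo, and the manager tracks mastered devices itself instead of asking a masterChecker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for MastershipChangeService (device = plain String).
interface SketchMastershipListener {
    void onBecomeOwner(String deviceId);
}

// Hypothetical stand-in for MastershipChangeRegistration.
interface SketchRegistration {
    void close();
}

// Simplified model of the manager; NOT the openflowplugin class.
class SketchMastershipManager {
    private final List<SketchMastershipListener> serviceGroup = new CopyOnWriteArrayList<>();
    // Assumption: the sketch remembers mastered devices itself (the real code asks masterChecker).
    private final List<String> masteredDevices = new CopyOnWriteArrayList<>();

    SketchRegistration register(SketchMastershipListener service) {
        serviceGroup.add(service);
        // A late registrant is told about every device that is already mastered.
        masteredDevices.forEach(service::onBecomeOwner);
        // Closing the registration removes the service from the group again.
        return () -> serviceGroup.remove(service);
    }

    void becomeMaster(String deviceId) {
        masteredDevices.add(deviceId);
        // Every registered service is notified, in registration order.
        serviceGroup.forEach(service -> service.onBecomeOwner(deviceId));
    }
}

class SketchMastershipDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SketchMastershipManager manager = new SketchMastershipManager();
        SketchRegistration first = manager.register(d -> events.add("app1:" + d));
        manager.becomeMaster("openflow:1");
        manager.register(d -> events.add("app2:" + d)); // replays openflow:1 to app2
        first.close();                                  // app1 no longer listens
        manager.becomeMaster("openflow:2");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [app1:openflow:1, app2:openflow:1, app2:openflow:2]
    }
}
```

In this model the only ordering control an application has is the order in which it registered, which is the limitation the next section turns to.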
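becomeMaster is clearly the most direct, brute-force approach. What are its drawbacks? Why did ReconciliationFramework appear later, and what problem does it solve?
Official explanation: When the switch is connected, all the applications including FRM(Forwarding Rules Manager) will receive the node added DTCN(Data Tree Change Listener) and starts pushing the flows for the openflow switch. FRM reconciliation will read the data from the config and starts pushing the flows one by one. In the meantime, applications can react to the node added DTCN change and will start pushing the flows through the config DS. With this, there is a high chance the application flow can be overwritten by the old flows by FRM via reconciliation.
My own understanding: when there are many upper-layer applications, becomeMaster triggers all of them as soon as the node becomes Master. They all start working immediately and may conflict with one another, leading to incorrect flow tables and similar problems. The only way to influence the order in which they start is the order in which the applications registered.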
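3. ReconciliationFramework
We have seen that, without ReconciliationFramework, the original MastershipChangeServiceManager can cause problems when it triggers upper-layer applications. So what does ReconciliationFramework solve?
Recalling the previous note: once the Switch contextChain election succeeds and every context has finished initializing, and ReconciliationFramework is in use, the onMasterRoleAcquired method of ContextChainHolderImpl calls MastershipChangeServiceManager.becomeMasterBeforeSubmittedDS. Before looking at that method, let us first get to know ReconciliationFramework.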
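3.1 What is ReconciliationFramework
As I understand it, ReconciliationFramework is a framework for coordinating applications with a switch coming online. When a switch comes online, it triggers the registered northbound applications in priority order.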
3.2 ReconciliationFramework initialization
We first look at how ReconciliationFramework is connected to MastershipChangeServiceManager, and then at what becomeMasterBeforeSubmittedDS triggers.
In reconciliation-framework.xml we can see:
<reference id="mastershipChangeServiceManager"
           interface="org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager"/>
<bean id="reconciliationManagerImpl"
      class="org.opendaylight.openflowplugin.applications.reconciliation.impl.ReconciliationManagerImpl"
      init-method="start"
      destroy-method="close">
    <argument ref="mastershipChangeServiceManager"/>
</bean>
This blueprint references mastershipChangeServiceManager, instantiates ReconciliationManagerImpl with it as the constructor argument, and calls the start method of ReconciliationManagerImpl.
The start method of ReconciliationManagerImpl calls the reconciliationFrameworkRegistration method of MastershipChangeServiceManager.
public ReconciliationManagerImpl(MastershipChangeServiceManager mastershipChangeServiceManager) {
    this.mastershipChangeServiceManager = Preconditions
            .checkNotNull(mastershipChangeServiceManager, "MastershipChangeServiceManager can not be null!");
}

public void start() throws MastershipChangeException {
    mastershipChangeServiceManager.reconciliationFrameworkRegistration(this);
    LOG.info("ReconciliationManager has started successfully.");
}
The purpose of MastershipChangeServiceManager.reconciliationFrameworkRegistration is to set the rfService field of the MastershipChangeServiceManager object. Only one framework can be registered at a time; a second registration throws MastershipChangeException.
@Override
public ReconciliationFrameworkRegistration reconciliationFrameworkRegistration(
        @Nonnull ReconciliationFrameworkEvent reconciliationFrameworkEvent) throws MastershipChangeException {
    if (rfService != null) {
        throw new MastershipChangeException("Reconciliation framework already registered.");
    } else {
        // reconciliationFrameworkEvent is the ReconciliationManagerImpl instance
        rfService = reconciliationFrameworkEvent;
        return new ReconciliationFrameworkServiceDelegate(reconciliationFrameworkEvent, () -> rfService = null);
    }
}
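The single-slot registration above can be illustrated with a small self-contained sketch. It is a simplified model, not the openflowplugin code: SketchFrameworkSlot is a hypothetical name, the framework is modelled as a plain function from a device id to a result string, the registration handle is a Runnable, and the unchecked IllegalStateException is swapped in for MastershipChangeException to keep the example short.

```java
import java.util.function.Function;

// Simplified model of the rfService slot; NOT the openflowplugin class.
class SketchFrameworkSlot {
    // Hypothetical stand-in for the rfService field.
    private volatile Function<String, String> rfService;

    // Returns a handle; running it clears the slot again (like closing the registration).
    Runnable registerFramework(Function<String, String> framework) {
        if (rfService != null) {
            // Assumption: unchecked exception used here instead of MastershipChangeException.
            throw new IllegalStateException("Reconciliation framework already registered.");
        }
        rfService = framework;
        return () -> rfService = null;
    }

    boolean isFrameworkRegistered() {
        return rfService != null;
    }

    // Mirrors the null-when-unregistered behaviour of becomeMasterBeforeSubmittedDS.
    String becomeMasterBeforeSubmittedDS(String deviceId) {
        Function<String, String> framework = rfService;
        return framework == null ? null : framework.apply(deviceId);
    }

    public static void main(String[] args) {
        SketchFrameworkSlot slot = new SketchFrameworkSlot();
        Runnable registration = slot.registerFramework(device -> "reconciled " + device);
        System.out.println(slot.becomeMasterBeforeSubmittedDS("openflow:1"));
        registration.run(); // un-register
        System.out.println(slot.isFrameworkRegistered());
    }
}
```

Because the slot holds at most one framework, callers can use a simple "is a framework registered" check to choose between the two hooks.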
So while ReconciliationManagerImpl is being instantiated and started, it hands itself to MastershipChangeServiceManager, which stores it in rfService. Next we look at how becomeMasterBeforeSubmittedDS ties into it.
3.3 How ReconciliationFramework triggers services
Recalling the previous note: once the Switch contextChain election succeeds and every context has finished initializing, and ReconciliationFramework is in use, MastershipChangeServiceManager.becomeMasterBeforeSubmittedDS is called.
MastershipChangeServiceManager.becomeMasterBeforeSubmittedDS simply calls rfService.onDevicePrepared(deviceInfo).
@Override
public ListenableFuture<ResultState> becomeMasterBeforeSubmittedDS(@Nonnull DeviceInfo deviceInfo) {
    // Calls ReconciliationManagerImpl.onDevicePrepared, which eventually calls
    // startReconciliation on the services registered with the framework
    return rfService == null ? null : rfService.onDevicePrepared(deviceInfo);
}
That is, it calls ReconciliationManagerImpl.onDevicePrepared(deviceInfo):
@Override
public ListenableFuture<ResultState> onDevicePrepared(@Nonnull DeviceInfo node) {
    LOG.debug("Triggering reconciliation for node : {}", node.getNodeId());
    return futureMap.computeIfAbsent(node, value -> reconcileNode(node));
}
onDevicePrepared calls reconcileNode, passing in node (the switch):
private ListenableFuture<ResultState> reconcileNode(DeviceInfo node) {
    ListenableFuture<ResultState> lastFuture = Futures.immediateFuture(null);
    for (List<ReconciliationNotificationListener> services : registeredServices.values()) {
        lastFuture = reconcileServices(lastFuture, services, node);
    }
    return lastFuture;
}
reconcileNode iterates over the service groups in the registeredServices field and calls reconcileServices for each group, which in turn calls startReconciliation on every service in the group. Each group is chained onto the future of the previous group, so a group starts only after the previous one has completed. Note that what is passed in is a List: to give the conclusion in advance, it is a set of services that share the same priority.
private ListenableFuture<ResultState> reconcileServices(ListenableFuture<ResultState> prevFuture,
                                                        List<ReconciliationNotificationListener> servicesForPriority,
                                                        DeviceInfo node) {
    return Futures.transformAsync(prevFuture, prevResult -> Futures.transform(Futures.allAsList(
            // calls startReconciliation on each service
            servicesForPriority.stream().map(service -> service.startReconciliation(node))
                    .collect(Collectors.toList())), results -> decidedResultState.get(),
            MoreExecutors.directExecutor()),
            MoreExecutors.directExecutor());
}
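The chaining pattern used by reconcileNode and reconcileServices can be reproduced with the JDK alone. The sketch below is an illustration under stated assumptions rather than the real implementation: it uses CompletableFuture in place of Guava's ListenableFuture, SketchReconciler and its nested Listener interface are hypothetical stand-ins, nodes are plain strings, and the final result is hard-coded to "DONOTHING" where the real code reads decidedResultState.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

// Simplified model of the priority chaining; NOT ReconciliationManagerImpl.
class SketchReconciler {
    // Hypothetical stand-in for ReconciliationNotificationListener.
    interface Listener {
        int getPriority();
        CompletableFuture<Boolean> startReconciliation(String node);
    }

    // Sorted by key, so values() yields the groups in ascending priority-key order.
    private final Map<Integer, List<Listener>> registeredServices = new ConcurrentSkipListMap<>();

    void registerService(Listener listener) {
        registeredServices.computeIfAbsent(listener.getPriority(), p -> new ArrayList<>()).add(listener);
    }

    CompletableFuture<String> reconcileNode(String node) {
        CompletableFuture<String> lastFuture = CompletableFuture.completedFuture(null);
        for (List<Listener> services : registeredServices.values()) {
            lastFuture = reconcileServices(lastFuture, services, node);
        }
        return lastFuture;
    }

    // A group starts only after the previous group's future has completed;
    // services inside one group are all started before any of them is awaited.
    private CompletableFuture<String> reconcileServices(CompletableFuture<String> prevFuture,
                                                        List<Listener> servicesForPriority,
                                                        String node) {
        return prevFuture.thenCompose(prevResult -> {
            List<CompletableFuture<Boolean>> started = servicesForPriority.stream()
                    .map(service -> service.startReconciliation(node))
                    .collect(Collectors.toList());
            // Assumption: fixed result instead of decidedResultState.get().
            return CompletableFuture.allOf(started.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> "DONOTHING");
        });
    }

    static Listener listener(String name, int priority, List<String> calls) {
        return new Listener() {
            @Override
            public int getPriority() {
                return priority;
            }

            @Override
            public CompletableFuture<Boolean> startReconciliation(String node) {
                calls.add(name + "@" + node);
                return CompletableFuture.completedFuture(true);
            }
        };
    }

    static List<String> demo() {
        List<String> calls = Collections.synchronizedList(new ArrayList<>());
        SketchReconciler reconciler = new SketchReconciler();
        reconciler.registerService(listener("late", 2, calls)); // registered first, key 2
        reconciler.registerService(listener("a", 1, calls));
        reconciler.registerService(listener("b", 1, calls));
        calls.add(reconciler.reconcileNode("openflow:1").join());
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a@openflow:1, b@openflow:1, late@openflow:1, DONOTHING]
    }
}
```

Although "late" is registered first, it is invoked last because its priority key is larger; "a" and "b" share a key and therefore run in the same step.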
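Back to ReconciliationFramework: every service registered with the framework must implement a specific interface, which includes the startReconciliation method seen above.
To sum up: when a node becomes Master of a Switch and ReconciliationFramework is in use, the call eventually reaches the startReconciliation method of the services registered in ReconciliationManagerImpl. Those services are the applications registered with ReconciliationFramework, that is, the northbound applications we implement. So we can register a service with ReconciliationFramework in order to learn that a device has come online after election and full initialization.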
3.4 The ReconciliationFramework callback
In onMasterRoleAcquired, becomeMasterBeforeSubmittedDS is executed asynchronously (in other words, the startReconciliation methods of the services registered with reconciliationFramework run asynchronously). When it completes, reconciliationFrameworkCallback is invoked:
Futures.addCallback(ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
        reconciliationFrameworkCallback(deviceInfo, contextChain),
        MoreExecutors.directExecutor());
One point deserves attention here. contextChain.isMastered is what changes the state of contextChainImpl and of each context to WORKING_MASTER. When reconciliationFramework is in use, however, that state is not changed at this point: the second argument passed to isMastered indicates whether we are in the reconciliationFramework step, and the state is set only when we are not (see the isMastered method discussed earlier).
So when reconciliationFramework is in use, when is the state of contextChainImpl and of each context set?
The answer: in the reconciliationFrameworkCallback callback:
private FutureCallback<ResultState> reconciliationFrameworkCallback(@Nonnull DeviceInfo deviceInfo,
                                                                    ContextChain contextChain) {
    return new FutureCallback<ResultState>() {
        @Override
        public void onSuccess(@Nullable ResultState result) {
            if (ResultState.DONOTHING == result) {
                LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo);
                // Note: this runs after reconciliationFramework has finished. It later leads to
                // onMasterRoleAcquired being called again with state INITIAL_SUBMIT, in which
                // case the else-if branch is taken and the state is set to WORKING_MASTER
                contextChain.continueInitializationAfterReconciliation();
            } else {
                LOG.warn("Reconciliation framework failure for device {}", deviceInfo);
                destroyContextChain(deviceInfo);
            }
        }

        @Override
        public void onFailure(@Nonnull Throwable throwable) {
            LOG.warn("Reconciliation framework failure.");
            destroyContextChain(deviceInfo);
        }
    };
}
The logic is as follows: once startReconciliation of every service registered with reconciliationFramework has completed successfully, the callback invokes continueInitializationAfterReconciliation (as noted in the comment above), which is carried out by statisticsContext.continueInitializationAfterReconciliation:
@Override
public void continueInitializationAfterReconciliation() {
    if (deviceContext.initialSubmitTransaction()) {
        contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);
        // start gathering data
        startGatheringData();
    } else {
        contextChainMastershipWatcher
                .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
    }
}
At this point ContextChainHolderImpl.onMasterRoleAcquired is executed again, this time with state INITIAL_SUBMIT. This is the key point. Here is onMasterRoleAcquired once more:
@Override
public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
                                 @Nonnull final ContextChainMastershipState mastershipState) {
    Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
        if (ownershipChangeListener.isReconciliationFrameworkRegistered()
                && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState)) {
            if (contextChain.isMastered(mastershipState, true)) {
                Futures.addCallback(ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
                        reconciliationFrameworkCallback(deviceInfo, contextChain),
                        MoreExecutors.directExecutor());
            }
        } else if (contextChain.isMastered(mastershipState, false)) {
            LOG.info("Role MASTER was granted to device {}", deviceInfo);
            // Hook that triggers upper-layer applications (the hook used without ReconciliationFramework)
            ownershipChangeListener.becomeMaster(deviceInfo);
            // TODO:
            deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
        }
    });
}
Look at the if condition: although isReconciliationFrameworkRegistered is still true, the state this time is INITIAL_SUBMIT, so the if block (which triggers ReconciliationFramework) is not entered:
if (ownershipChangeListener.isReconciliationFrameworkRegistered()
        && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState)) {
Instead, the else-if branch is taken. It sets the state of the contextChain and of each context to WORKING_MASTER (this is where the second argument of isMastered matters) and calls ownershipChangeListener.becomeMaster(deviceInfo). In other words, it invokes the services registered natively with MastershipChangeServiceManager (not the services registered with ReconciliationFramework):
} else if (contextChain.isMastered(mastershipState, false)) {
    LOG.info("Role MASTER was granted to device {}", deviceInfo);
    ownershipChangeListener.becomeMaster(deviceInfo);
    deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
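To sum up: when ReconciliationFramework is used, continuous gathering of statistics data starts and the WORKING_MASTER state is set only after the services registered with the framework have been invoked. The services registered natively with MastershipChangeServiceManagerImpl are executed as well.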
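The two passes through onMasterRoleAcquired can be traced with a toy model. This is a deliberately simplified sketch, not the openflowplugin code: SketchMasterFlow is a hypothetical class, its State enum contains only two illustrative values instead of the full ContextChainMastershipState, the asynchronous future and callback are collapsed into a direct method call, and the isMastered checks are assumed to succeed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two-pass flow; NOT ContextChainHolderImpl.
class SketchMasterFlow {
    // Hypothetical two-value subset of ContextChainMastershipState.
    enum State { MASTER_ON_DEVICE, INITIAL_SUBMIT }

    final boolean frameworkRegistered;
    final List<String> trace = new ArrayList<>();
    boolean workingMaster;

    SketchMasterFlow(boolean frameworkRegistered) {
        this.frameworkRegistered = frameworkRegistered;
    }

    void onMasterRoleAcquired(State state) {
        if (frameworkRegistered && state != State.INITIAL_SUBMIT) {
            // First pass: framework services run, the state is NOT promoted yet.
            trace.add("startReconciliation");
            // Assumption: the success callback is modelled as a direct call.
            continueInitializationAfterReconciliation();
        } else {
            // Second pass (or the only pass when no framework is registered).
            workingMaster = true;
            trace.add("WORKING_MASTER");
            trace.add("becomeMaster");
        }
    }

    void continueInitializationAfterReconciliation() {
        trace.add("initialSubmit");
        onMasterRoleAcquired(State.INITIAL_SUBMIT);
        trace.add("startGatheringData");
    }

    public static void main(String[] args) {
        SketchMasterFlow withFramework = new SketchMasterFlow(true);
        withFramework.onMasterRoleAcquired(State.MASTER_ON_DEVICE);
        System.out.println(withFramework.trace);

        SketchMasterFlow withoutFramework = new SketchMasterFlow(false);
        withoutFramework.onMasterRoleAcquired(State.MASTER_ON_DEVICE);
        System.out.println(withoutFramework.trace);
    }
}
```

With a framework registered the trace is startReconciliation, initialSubmit, WORKING_MASTER, becomeMaster, startGatheringData; without one it is just WORKING_MASTER, becomeMaster.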
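3.5 Advantages of ReconciliationFramework
- Northbound application services register with a priority, and when a device comes online the services are invoked in priority order;
- Northbound applications run first, and only then does data gathering start;
- With reconciliationFramework in use, the services registered natively with MastershipChangeServiceManagerImpl are still executed.
Priority based/Ordered Coordination of Reconciliation across multiple applications.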
3.6 How to register a service with ReconciliationFramework
A service must implement the ReconciliationNotificationListener interface and call ReconciliationManagerImpl.registerService to register itself with ReconciliationFramework. Look at the registration method: its core logic puts services with the same priority into one list, uses the priority as the key, and stores the key and the service list in a Map. How the priority-ordered triggering described above is achieved comes down to the type of that Map.
@Override
public NotificationRegistration registerService(ReconciliationNotificationListener reconciliationTask) {
    LOG.debug("Registered service {} with priority {} and intent {}", reconciliationTask.getName(),
            reconciliationTask.getPriority(), reconciliationTask.getResultState());
    registeredServices.computeIfAbsent(reconciliationTask.getPriority(), services -> new ArrayList<>())
            .add(reconciliationTask);
    ReconciliationServiceDelegate registration = new ReconciliationServiceDelegate(reconciliationTask, () -> {
        LOG.debug("Service un-registered from Reconciliation framework {}", reconciliationTask.getName());
        registeredServices.computeIfPresent(reconciliationTask.getPriority(), (priority, services) -> services)
                .remove(reconciliationTask);
        decideResultState(reconciliationTask.getResultState());
    });
    decideResultState(reconciliationTask.getResultState());
    return registration;
}
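Note the registeredServices field, which stores each priority together with its group of service references. It is a ConcurrentSkipListMap, a Map sorted by key, so iterating over values() in reconcileNode visits the groups in ascending key order:
private final Map<Integer, List<ReconciliationNotificationListener>> registeredServices =
        new ConcurrentSkipListMap<>();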
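Services registered with ReconciliationManagerImpl must implement several methods, such as startReconciliation; see the documentation for details.
4. Summary
This article has shown how, after a node becomes Master of a Switch, northbound applications are triggered through MastershipChangeServiceManager and ReconciliationFramework, and how we as developers register services with MastershipChangeServiceManager and ReconciliationFramework.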