(5) ODL Openflowplugin Mastership and ReconciliationFramework Source Code Analysis

 6 years ago
source link: https://www.sdnlab.com/22391.html

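About the author: Chen Zhuowen is a developer on the private cloud team of a game company in China, working mainly on SDN/NFV development.

For reasons of length, the broad topic "the Switch lifecycle in Openflowplugin" is split into several parts: creation of the ContextChain (the Switch lifecycle object); Master election on the controller node and instantiation of the ContextChain/Context services; MastershipChangeService and ReconciliationFramework; the controller node becoming Slave; and the Switch going offline.
This is the fifth article in the Openflowplugin (0.6.2) source code analysis series. It looks at how Mastership and ReconciliationFramework in OFP provide services to northbound applications.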

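Earlier articles in this series:
Part 1: (1) ODL OpenflowPlugin Startup Flow Source Code Analysis
Part 2: (2) ODL Openflowplugin Switch-to-Controller Handshake Source Code Analysis
Part 3: (3) ODL Openflowplugin Switch Lifecycle Object ContextChain Creation Source Code Analysis
Part 4: (4) ODL Openflowplugin Master Election and Context Service Instantiation Source Code Analysis

Intended readers: those who have a basic grasp of the ideas behind Opendaylight or some hands-on experience, and who want to understand the openflowplugin source code in depth or modify it.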

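Recall from the previous article: "Once the Switch's ContextChain has been instantiated on the leader node, a method of MastershipChangeServiceManager is called. This is the hook that the lower layers of ofp provide for triggering upper-layer applications. After instantiation completes (isMastered), the upper-layer applications are triggered. If ReconciliationFramework is in use, becomeMasterBeforeSubmittedDS is called; otherwise becomeMaster is called."

As stated above, whether or not ReconciliationFramework is in use, becoming Master always calls a method on the ownershipChangeListener object: either becomeMasterBeforeSubmittedDS or becomeMaster. And the ownershipChangeListener object is the MastershipChangeServiceManager.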

1. What is MastershipChangeServiceManager

So what is MastershipChangeServiceManager?

We start with the ownershipChangeListener field in ContextChainHolderImpl. When OpenFlowPluginProviderImpl creates the ContextChainHolderImpl object, it passes in mastershipChangeServiceManager, so the ownershipChangeListener field is the MastershipChangeServiceManager.

Java
contextChainHolder = new ContextChainHolderImpl(
        executorService,
        singletonServicesProvider,
        entityOwnershipService,
        mastershipChangeServiceManager);

Going one level deeper, mastershipChangeServiceManager is passed in when OpenFlowPluginProviderImpl is created in openflowplugin.xml. It is a service referenced in blueprint:

XML
<reference id="mastershipChangeServiceManager"
           interface="org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager"/>

How does mastershipChangeServiceManager get published into the OSGi environment? Tracing it step by step, we find that it is instantiated in openflowplugin/openflowplugin-impl/src/main/resources/org/opendaylight/blueprint/openflowplugin-impl.xml and exported to OSGi as a service:

XML
<bean id="mastershipChangeServiceManager"
      class="org.opendaylight.openflowplugin.impl.mastership.MastershipChangeServiceManagerImpl"/>
<service ref="mastershipChangeServiceManager"
         interface="org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager"/>

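At this point we have the complete call chain for MastershipChangeServiceManager. Next we look more closely at its two methods, becomeMasterBeforeSubmittedDS and becomeMaster.

Recall from the previous article: after the Switch contextChain has won the election and every Context has finished initializing, the code checks whether reconciliationFramework is in use and calls either MastershipChangeServiceManager.becomeMasterBeforeSubmittedDS or MastershipChangeServiceManager.becomeMaster. These two methods are the hooks OpenflowPlugin leaves for us. They let upper-layer applications learn that a device has come online and that election and initialization are complete, and so trigger our northbound applications.

2. The original design of MastershipChangeServiceManager

First, what happens when ReconciliationFramework is not in use? ReconciliationFramework itself is covered further below.

Once the Switch ContextChain has won the election and been instantiated, becomeMaster is called if ReconciliationFramework is not in use. My understanding is that this is the hook Openflowplugin provided in its original design, before ReconciliationFramework existed. Our applications can still use it to learn that "the Switch has been elected and fully initialized (isMastered)". Details follow.

The core logic of MastershipChangeServiceManager.becomeMaster is to call onBecomeOwner on every service in the serviceGroup field: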

Java
@Override
public void becomeMaster(@Nonnull final DeviceInfo deviceInfo) {
    serviceGroup.forEach(mastershipChangeService -> mastershipChangeService.onBecomeOwner(deviceInfo));
}

The services in serviceGroup are registered by calling MastershipChangeServiceManager.register:

Java
@Nonnull
@Override
public MastershipChangeRegistration register(@Nonnull MastershipChangeService service) {
    final MastershipServiceDelegate registration =
            new MastershipServiceDelegate(service, () -> serviceGroup.remove(service));
    serviceGroup.add(service);
    // Replay devices that are already mastered to the newly registered service
    if (masterChecker != null && masterChecker.isAnyDeviceMastered()) {
        masterChecker.listOfMasteredDevices().forEach(service::onBecomeOwner);
    }
    return registration;
}

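So, without ReconciliationFramework, an upper-layer application can simply obtain a reference to MastershipChangeServiceManager from OSGi and call its register method to register a service. When ofp becomes Master of a Switch, the registered service (which implements a specific interface with methods such as onBecomeOwner) is triggered.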
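The register-and-notify behaviour described above can be sketched with plain Java types. This is a simplified model, not the openflowplugin API: OwnerListener, Registration, MiniMastershipManager and the string device ids are hypothetical stand-ins, and the internal masteredDevices list only approximates what masterChecker provides in the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for MastershipChangeService; devices are plain strings here. */
interface OwnerListener {
    void onBecomeOwner(String deviceId);
}

/** Hypothetical stand-in for the registration handle returned by register(). */
interface Registration {
    void close();
}

/** Simplified model of the manager: register() plus becomeMaster(). */
class MiniMastershipManager {
    private final List<OwnerListener> serviceGroup = new CopyOnWriteArrayList<>();
    // Assumption: stands in for masterChecker.listOfMasteredDevices()
    private final List<String> masteredDevices = new CopyOnWriteArrayList<>();

    /** Adds the listener and replays devices that are already mastered. */
    Registration register(OwnerListener service) {
        serviceGroup.add(service);
        masteredDevices.forEach(service::onBecomeOwner);
        return () -> serviceGroup.remove(service);
    }

    /** Notifies every registered listener, in registration order. */
    void becomeMaster(String deviceId) {
        masteredDevices.add(deviceId);
        serviceGroup.forEach(service -> service.onBecomeOwner(deviceId));
    }
}

class MastershipDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniMastershipManager manager = new MiniMastershipManager();
        Registration early = manager.register(d -> log.add("early:" + d));
        manager.becomeMaster("openflow:1");
        // A late registrant still learns about the already mastered device
        manager.register(d -> log.add("late:" + d));
        early.close();
        manager.becomeMaster("openflow:2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model a listener that registers late is still told about devices that were mastered earlier, and a closed registration receives nothing further.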

Clearly becomeMaster is the most direct, brute-force approach. What are its drawbacks? Why was ReconciliationFramework introduced later, and what problem does it solve?

Official explanation: When the switch is connected, all the applications including FRM(Forwarding Rules Manager) will receive the node added DTCN(Data Tree Change Listener) and starts pushing the flows for the openflow switch. FRM reconciliation will read the data from the config and starts pushing the flows one by one. In the meantime, applications can react to the node added DTCN change and will start pushing the flows through the config DS. With this, there is a high chance the application flow can be overwritten by the old flows by FRM via reconciliation.

My understanding: when there are many upper-layer applications, becomeMaster is called on becoming Master and every application starts work immediately, so they may conflict and leave the flow tables in a wrong state. The only control over the order in which they start is the order in which the applications registered.

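3. ReconciliationFramework

As shown above, without ReconciliationFramework the original MastershipChangeServiceManager way of triggering upper-layer applications can cause problems. So what does ReconciliationFramework solve?

Recall from the previous article: after the Switch contextChain has won the election and every context has finished initializing, if ReconciliationFramework is in use, the onMasterRoleAcquired method of ContextChainHolderImpl calls MastershipChangeServiceManager.becomeMasterBeforeSubmittedDS. Before going into this method, let us first get to know ReconciliationFramework.

3.1 What is ReconciliationFramework

My understanding is that ReconciliationFramework is a framework for coordinating the various applications with a switch coming online. It makes it possible to trigger the registered northbound applications in priority order when a switch comes online.

3.2 ReconciliationFramework initialization

We first look at how ReconciliationFramework is connected to MastershipChangeServiceManager, and then at what becomeMasterBeforeSubmittedDS triggers.

In reconciliation-framework.xml we can see: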

XML
<reference id="mastershipChangeServiceManager"
           interface="org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager"/>
<bean id="reconciliationManagerImpl"
      class="org.opendaylight.openflowplugin.applications.reconciliation.impl.ReconciliationManagerImpl"
      init-method="start"
      destroy-method="close">
  <argument ref="mastershipChangeServiceManager"/>
</bean>

This blueprint references mastershipChangeServiceManager, instantiates ReconciliationManagerImpl with it as the constructor argument, and calls the start method of ReconciliationManagerImpl.

The start method of ReconciliationManagerImpl calls the reconciliationFrameworkRegistration method of MastershipChangeServiceManager:

Java
public ReconciliationManagerImpl(MastershipChangeServiceManager mastershipChangeServiceManager) {
    this.mastershipChangeServiceManager = Preconditions
            .checkNotNull(mastershipChangeServiceManager, "MastershipChangeServiceManager can not be null!");
}

public void start() throws MastershipChangeException {
    mastershipChangeServiceManager.reconciliationFrameworkRegistration(this);
    LOG.info("ReconciliationManager has started successfully.");
}

The purpose of MastershipChangeServiceManager.reconciliationFrameworkRegistration is to set the rfService field of the MastershipChangeServiceManager object:

Java
@Override
public ReconciliationFrameworkRegistration reconciliationFrameworkRegistration(
        @Nonnull ReconciliationFrameworkEvent reconciliationFrameworkEvent) throws MastershipChangeException {
    if (rfService != null) {
        throw new MastershipChangeException("Reconciliation framework already registered.");
    } else {
        // reconciliationFrameworkEvent is the ReconciliationManagerImpl instance
        rfService = reconciliationFrameworkEvent;
        return new ReconciliationFrameworkServiceDelegate(reconciliationFrameworkEvent, () -> rfService = null);
    }
}
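The single-slot registration pattern in this method can be illustrated in isolation. The sketch below is a simplified model under stated assumptions: FrameworkEvent and SingleSlotRegistry are hypothetical names, a Runnable stands in for the registration delegate, and an unchecked IllegalStateException replaces the checked MastershipChangeException.

```java
/** Hypothetical stand-in for ReconciliationFrameworkEvent. */
interface FrameworkEvent {
    String onDevicePrepared(String deviceId);
}

/** Holds at most one framework instance, like the rfService field. */
class SingleSlotRegistry {
    private volatile FrameworkEvent rfService;

    /** Rejects a second registration until the first one has been released. */
    synchronized Runnable register(FrameworkEvent event) {
        if (rfService != null) {
            throw new IllegalStateException("Reconciliation framework already registered.");
        }
        rfService = event;
        // The returned handle clears the slot, mirroring () -> rfService = null
        return () -> rfService = null;
    }

    /** Plays the role of isReconciliationFrameworkRegistered in this model. */
    boolean isRegistered() {
        return rfService != null;
    }
}

class SingleSlotDemo {
    static String run() {
        SingleSlotRegistry registry = new SingleSlotRegistry();
        Runnable unregister = registry.register(d -> "prepared " + d);
        String second;
        try {
            registry.register(d -> "other " + d);
            second = "accepted";
        } catch (IllegalStateException e) {
            second = "rejected";
        }
        unregister.run();
        return second + "," + registry.isRegistered();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the pattern is that only one reconciliation framework can be attached at a time, and releasing the registration empties the slot again.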

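As we can see, when ReconciliationManagerImpl is instantiated and start is called, it hands itself to MastershipChangeServiceManager, where it is stored in the rfService field. Next we look at how becomeMasterBeforeSubmittedDS ties into it.

3.3 How ReconciliationFramework triggers services

Recall from the previous article: after the Switch contextChain has won the election and every context has finished initializing, MastershipChangeServiceManager.becomeMasterBeforeSubmittedDS is called if ReconciliationFramework is in use.

Inside MastershipChangeServiceManager.becomeMasterBeforeSubmittedDS, rfService.onDevicePrepared(deviceInfo) is called: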

Java
@Override
public ListenableFuture<ResultState> becomeMasterBeforeSubmittedDS(@Nonnull DeviceInfo deviceInfo) {
    // This calls ReconciliationManagerImpl.onDevicePrepared, which eventually calls
    // startReconciliation on every service registered with the framework
    return rfService == null ? null : rfService.onDevicePrepared(deviceInfo);
}

That is, ReconciliationManagerImpl.onDevicePrepared(deviceInfo) is called:

Java
@Override
public ListenableFuture<ResultState> onDevicePrepared(@Nonnull DeviceInfo node) {
    LOG.debug("Triggering reconciliation for node : {}", node.getNodeId());
    return futureMap.computeIfAbsent(node, value -> reconcileNode(node));
}
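The computeIfAbsent call means that a node already present in futureMap gets the existing future back instead of starting a second reconciliation. A minimal sketch of that behaviour, assuming string node ids and using CompletableFuture from the JDK in place of Guava's ListenableFuture (ReconcileOnce and the started counter are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class ReconcileOnce {
    private final ConcurrentMap<String, CompletableFuture<String>> futureMap = new ConcurrentHashMap<>();
    // Counts how many reconciliations were actually started (for illustration only)
    final AtomicInteger started = new AtomicInteger();

    /** Returns the cached future for the node, starting reconciliation only on first use. */
    CompletableFuture<String> onDevicePrepared(String node) {
        return futureMap.computeIfAbsent(node, key -> reconcileNode(key));
    }

    private CompletableFuture<String> reconcileNode(String node) {
        started.incrementAndGet();
        return CompletableFuture.completedFuture("reconciled " + node);
    }

    public static void main(String[] args) {
        ReconcileOnce manager = new ReconcileOnce();
        CompletableFuture<String> first = manager.onDevicePrepared("openflow:1");
        CompletableFuture<String> second = manager.onDevicePrepared("openflow:1");
        System.out.println((first == second) + " " + manager.started.get());
    }
}
```

How and when an entry leaves the map is outside the excerpt shown in this article, so the sketch does not model it.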

onDevicePrepared calls reconcileNode, passing the node (the switch) as its argument:

Java
private ListenableFuture<ResultState> reconcileNode(DeviceInfo node) {
    ListenableFuture<ResultState> lastFuture = Futures.immediateFuture(null);
    for (List<ReconciliationNotificationListener> services : registeredServices.values()) {
        lastFuture = reconcileServices(lastFuture, services, node);
    }
    return lastFuture;
}

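reconcileNode iterates over the service groups in registeredServices and calls reconcileServices for each, whose logic is to call startReconciliation on every service in the group. Note that the argument is a List. To give the conclusion in advance: each List is a group of services with the same priority.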

Java
private ListenableFuture<ResultState> reconcileServices(ListenableFuture<ResultState> prevFuture,
                                                        List<ReconciliationNotificationListener>
                                                                servicesForPriority,
                                                        DeviceInfo node) {
    return Futures.transformAsync(prevFuture, prevResult -> Futures.transform(Futures.allAsList(
            // Calls startReconciliation on every service in this priority group
            servicesForPriority.stream().map(service -> service.startReconciliation(node))
                    .collect(Collectors.toList())), results -> decidedResultState.get(),
            MoreExecutors.directExecutor()),
            MoreExecutors.directExecutor());
}

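To recap ReconciliationFramework: every service registered with the framework must implement a specific interface, and therefore implements the startReconciliation method used above.

In summary, when the node becomes Master of a Switch and ReconciliationFramework is in use, the call eventually reaches the startReconciliation method of each service registered in ReconciliationManagerImpl. Those services are the applications registered with ReconciliationFramework, that is, the northbound applications we implement. So we can register a service with ReconciliationFramework to be notified when a device comes online after election and full initialization.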
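The chaining in reconcileNode and reconcileServices can be sketched with JDK types only. This is an illustration under stated assumptions, not the framework's code: CompletableFuture replaces Guava's ListenableFuture, ReconcileTask is a hypothetical stand-in for ReconciliationNotificationListener, the names "frm" and "stats" are made up, and the aggregated result (decidedResultState in the original) is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical stand-in for ReconciliationNotificationListener. */
interface ReconcileTask {
    int priority();
    CompletableFuture<Void> startReconciliation(String node);
}

class PriorityChain {
    private final Map<Integer, List<ReconcileTask>> registered = new ConcurrentSkipListMap<>();

    void register(ReconcileTask task) {
        registered.computeIfAbsent(task.priority(), p -> new ArrayList<>()).add(task);
    }

    /** Starts each priority group only after the previous group has completed. */
    CompletableFuture<Void> reconcileNode(String node) {
        CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
        for (List<ReconcileTask> group : registered.values()) {
            last = last.thenCompose(prev -> CompletableFuture.allOf(
                    group.stream().map(task -> task.startReconciliation(node))
                            .toArray(CompletableFuture[]::new)));
        }
        return last;
    }
}

class PriorityChainDemo {
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> gate = new CompletableFuture<>();
        PriorityChain chain = new PriorityChain();
        chain.register(task(2, "stats", log, CompletableFuture.completedFuture(null)));
        chain.register(task(1, "frm", log, gate)); // finishes only when gate completes
        CompletableFuture<Void> done = chain.reconcileNode("openflow:1");
        log.add("gate-open");
        gate.complete(null);
        done.join();
        return new ArrayList<>(log);
    }

    private static ReconcileTask task(int priority, String name, List<String> log,
                                      CompletableFuture<Void> result) {
        return new ReconcileTask() {
            public int priority() { return priority; }
            public CompletableFuture<Void> startReconciliation(String node) {
                log.add(name + ":" + node);
                return result;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the demo the priority-2 task is registered first, yet it starts only after the priority-1 task's future has completed, because the map iterates in ascending key order and each group is chained onto the previous one.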

3.4 ReconciliationFramework callback

In onMasterRoleAcquired, becomeMasterBeforeSubmittedDS runs asynchronously (that is, the startReconciliation methods of the services registered with reconciliationFramework run asynchronously). When it completes, reconciliationFrameworkCallback is invoked:

Java
Futures.addCallback(ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
                    reconciliationFrameworkCallback(deviceInfo, contextChain),
                    MoreExecutors.directExecutor());

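One point deserves attention here: contextChain.isMastered is what changes the state of contextChainImpl and of each context to WORKING_MASTER. When reconciliationFramework is in use, the state is not changed at this point. This depends on the second argument passed to isMastered: the state is set only when we are not in the reconciliationFramework step (see the isMastered method discussed earlier).

So when reconciliationFramework is in use, when is the state of contextChainImpl and of each context set?

The answer is: in the reconciliationFrameworkCallback callback: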

Java
private FutureCallback<ResultState> reconciliationFrameworkCallback(@Nonnull DeviceInfo deviceInfo,
                                                                    ContextChain contextChain) {
    return new FutureCallback<ResultState>() {
        @Override
        public void onSuccess(@Nullable ResultState result) {
            if (ResultState.DONOTHING == result) {
                LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo);
                // Note: this runs after reconciliationFramework has finished. It leads back into
                // onMasterRoleAcquired with state INITIAL_SUBMIT, which takes the else-if branch
                // and sets the state to WORKING_MASTER
                contextChain.continueInitializationAfterReconciliation();
            } else {
                LOG.warn("Reconciliation framework failure for device {}", deviceInfo);
                destroyContextChain(deviceInfo);
            }
        }

        @Override
        public void onFailure(@Nonnull Throwable throwable) {
            LOG.warn("Reconciliation framework failure.");
            destroyContextChain(deviceInfo);
        }
    };
}
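The decision made by this callback can be reduced to a small, self-contained sketch. It is only an approximation: CompletableFuture.handle stands in for Guava's Futures.addCallback, the returned strings name the action instead of performing it, and OTHER is a hypothetical placeholder for any result that is not DONOTHING.

```java
import java.util.concurrent.CompletableFuture;

class CallbackSketch {
    /** DONOTHING appears in the article's code; OTHER is a hypothetical placeholder. */
    enum ResultState { DONOTHING, OTHER }

    /** Returns the name of the action the callback would take once reconciliation settles. */
    static String decide(CompletableFuture<ResultState> reconciliation) {
        return reconciliation
                .handle((result, failure) ->
                        failure == null && result == ResultState.DONOTHING
                                ? "continueInitialization"
                                : "destroyContextChain")
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<ResultState> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("reconciliation failed"));
        System.out.println(decide(CompletableFuture.completedFuture(ResultState.DONOTHING)));
        System.out.println(decide(CompletableFuture.completedFuture(ResultState.OTHER)));
        System.out.println(decide(failed));
    }
}
```

As in the original comparison `ResultState.DONOTHING == result`, anything other than a successful DONOTHING, including a null result, ends in the destroy branch.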

Its logic: after startReconciliation of every service registered with reconciliationFramework has completed successfully, the callback invokes statisticsContext.continueInitializationAfterReconciliation, as mentioned above:

Java
@Override
public void continueInitializationAfterReconciliation() {
    if (deviceContext.initialSubmitTransaction()) {
        contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);

        // Start gathering data
        startGatheringData();
    } else {
        contextChainMastershipWatcher
                .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
    }
}

At this point ContextChainHolderImpl.onMasterRoleAcquired runs again, this time with state INITIAL_SUBMIT. This is the key point. Here is onMasterRoleAcquired once more:

Java
@Override
public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
                                 @Nonnull final ContextChainMastershipState mastershipState) {
    Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {

        if (ownershipChangeListener.isReconciliationFrameworkRegistered()
                && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState)) {
            if (contextChain.isMastered(mastershipState, true)) {
                Futures.addCallback(ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
                                    reconciliationFrameworkCallback(deviceInfo, contextChain),
                                    MoreExecutors.directExecutor());
            }
        } else if (contextChain.isMastered(mastershipState, false)) {

            LOG.info("Role MASTER was granted to device {}", deviceInfo);
            // Hook that triggers upper-layer applications (the hook used without ReconciliationFramework)
            ownershipChangeListener.becomeMaster(deviceInfo);

            // TODO:
            deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
        }
    });
}

Focus on the if condition: although isReconciliationFrameworkRegistered is true, the state this time is INITIAL_SUBMIT, so the if block (which triggers ReconciliationFramework) is not entered:

Java
if (ownershipChangeListener.isReconciliationFrameworkRegistered()
        && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState)) {

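Instead, the else if branch is taken. It sets the state of the contextChain and of each context to WORKING_MASTER (this is where the second argument of isMastered matters) and calls ownershipChangeListener.becomeMaster(deviceInfo). In other words, it invokes the services registered natively with MastershipChangeServiceManager (not the services registered with ReconciliationFramework):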

Java
else if (contextChain.isMastered(mastershipState, false)) {
    LOG.info("Role MASTER was granted to device {}", deviceInfo);
    ownershipChangeListener.becomeMaster(deviceInfo);
    deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
}

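To sum up: with ReconciliationFramework, continuous data gathering (startGatheringData) begins and the WORKING_MASTER state is set only after the services registered with the framework have been invoked. The services registered natively with MastershipChangeServiceManagerImpl are executed as well.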
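The two passes through onMasterRoleAcquired can be traced with a small model. Everything here is a simplification for illustration: TwoPassFlow is hypothetical, CONTEXTS_READY is a made-up placeholder for any state other than INITIAL_SUBMIT, isMastered is assumed to return true, and reconciliation is assumed to succeed immediately with DONOTHING.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class TwoPassFlow {
    /** INITIAL_SUBMIT comes from the article; CONTEXTS_READY is a hypothetical placeholder. */
    enum State { CONTEXTS_READY, INITIAL_SUBMIT }

    final List<String> log = new ArrayList<>();
    private final boolean frameworkRegistered;

    TwoPassFlow(boolean frameworkRegistered) {
        this.frameworkRegistered = frameworkRegistered;
    }

    void onMasterRoleAcquired(String device, State state) {
        if (frameworkRegistered && state != State.INITIAL_SUBMIT) {
            // First pass: run the framework services, then continue in the callback
            log.add("reconcile " + device);
            CompletableFuture.completedFuture("DONOTHING")
                    .thenAccept(result -> continueInitializationAfterReconciliation(device));
        } else {
            // Second pass (or no framework): set WORKING_MASTER and fire the plain hook
            log.add("WORKING_MASTER " + device);
            log.add("becomeMaster " + device);
        }
    }

    private void continueInitializationAfterReconciliation(String device) {
        onMasterRoleAcquired(device, State.INITIAL_SUBMIT);
        log.add("startGatheringData " + device);
    }

    public static void main(String[] args) {
        TwoPassFlow withFramework = new TwoPassFlow(true);
        withFramework.onMasterRoleAcquired("openflow:1", State.CONTEXTS_READY);
        System.out.println(withFramework.log);
    }
}
```

With the framework registered, the log shows reconciliation first, then WORKING_MASTER and becomeMaster, and data gathering last; without it, only the WORKING_MASTER and becomeMaster steps appear.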

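3.5 Advantages of ReconciliationFramework

  1. Northbound application services can be registered with a priority, and when a device comes online the services are invoked in priority order;
  2. Northbound applications run first, and data gathering starts afterwards;
  3. With reconciliationFramework, the services registered natively with MastershipChangeServiceManagerImpl are still executed.

    Priority based/Ordered Coordination of Reconciliation across multiple applications.

3.6 How to register a service with ReconciliationFramework

A service must implement the ReconciliationNotificationListener interface and call ReconciliationManagerImpl.registerService to register itself with ReconciliationFramework. The registration method is worth a look: its core logic is to put services with the same priority into one list, keyed by that priority, and to store the key and the service list in a Map. How, then, are services triggered in priority order as described above? The key is the type of this Map.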

Java
@Override
public NotificationRegistration registerService(ReconciliationNotificationListener reconciliationTask) {
    LOG.debug("Registered service {} with priority {} and intent {}", reconciliationTask.getName(),
              reconciliationTask.getPriority(), reconciliationTask.getResultState());
    registeredServices.computeIfAbsent(reconciliationTask.getPriority(), services -> new ArrayList<>())
            .add(reconciliationTask);
    ReconciliationServiceDelegate registration = new ReconciliationServiceDelegate(reconciliationTask, () -> {
        LOG.debug("Service un-registered from Reconciliation framework {}", reconciliationTask.getName());
        registeredServices.computeIfPresent(reconciliationTask.getPriority(), (priority, services) -> services)
                .remove(reconciliationTask);
        decideResultState(reconciliationTask.getResultState());
    });
    decideResultState(reconciliationTask.getResultState());
    return registration;
}

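Now look at registeredServices, the field that stores the priorities and the references to each group of services. It is a ConcurrentSkipListMap, a Map that keeps its entries sorted by key.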

Java
private final Map<Integer, List<ReconciliationNotificationListener>> registeredServices =
        new ConcurrentSkipListMap<>();
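The effect of choosing ConcurrentSkipListMap can be seen in a short, self-contained sketch. It assumes services are plain names and priorities are integers. PriorityRegistry is hypothetical, and its unregister step deliberately differs from the original by also dropping a priority key once its list is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

class PriorityRegistry {
    // Keys are kept in ascending order regardless of insertion order
    private final Map<Integer, List<String>> registeredServices = new ConcurrentSkipListMap<>();

    /** Adds the service to its priority group and returns a handle that removes it again. */
    Runnable registerService(int priority, String name) {
        registeredServices.computeIfAbsent(priority, p -> new ArrayList<>()).add(name);
        return () -> registeredServices.computeIfPresent(priority, (p, services) -> {
            services.remove(name);
            // Variation on the original: drop the key when its group becomes empty
            return services.isEmpty() ? null : services;
        });
    }

    /** Flattens the groups in the order in which they would be invoked. */
    List<String> invocationOrder() {
        List<String> order = new ArrayList<>();
        registeredServices.values().forEach(order::addAll);
        return order;
    }

    public static void main(String[] args) {
        PriorityRegistry registry = new PriorityRegistry();
        registry.registerService(3, "c");
        registry.registerService(1, "a");
        registry.registerService(3, "d");
        Runnable unregisterB = registry.registerService(2, "b");
        System.out.println(registry.invocationOrder());
        unregisterB.run();
        System.out.println(registry.invocationOrder());
    }
}
```

Because the map sorts by key, the order in which applications happen to register no longer decides the order in which their groups are visited.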

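Services registered with ReconciliationManagerImpl must implement several methods, such as startReconciliation; see the documentation for details.

4. Summary

This article has shown how, after a node becomes Master of a Switch, northbound applications are triggered through MastershipChangeServiceManager and ReconciliationFramework, and how we as developers can register services with MastershipChangeServiceManager and ReconciliationFramework.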

