
(4) ODL Openflowplugin: Source Code Analysis of Master Election and Context Service Instantiation

 5 years ago
source link: https://www.sdnlab.com/22309.html

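About the author: Chen Zhuowen (陈卓文) is a developer on the private cloud team of a game company in China, working mainly on SDN/NFV development.

For reasons of length, the broad topic "the Switch lifecycle in Openflowplugin" is split into several parts: creation of the Switch lifecycle object ContextChain; Master election among controller nodes and instantiation of the ContextChain/Context services; MastershipChangeService and ReconciliationFramework; a controller node becoming Slave; and the Switch going offline.
This article is the fourth part of the Openflowplugin (0.6.2) source code analysis. It covers Master election among controller nodes and instantiation of the ContextChain/Context services, which I consider the most central part of Openflowplugin.

See also:
Part 1: (1) ODL OpenflowPlugin: source code analysis of the startup flow
Part 2: (2) ODL Openflowplugin: source code analysis of the Handshake when a Switch connects to the controller
Part 3: (3) ODL Openflowplugin: source code analysis of creating the Switch lifecycle object ContextChain

Assumed background: the reader has a basic grasp of Opendaylight concepts and some hands-on experience, and wants to understand or modify the openflowplugin source code.

Recap of the previous article's key point: "After a Switch completes the Handshake, Openflowplugin creates the Context objects for it (Device/Rpc/Statistics/Role) as well as a ContextChain object." This leads to the core question of this article. With several controllers, when a Switch is configured with multiple controllers (for example via OVS set-controller), the Openflowplugin on every controller creates a ContextChain object for the switch. Which controller node then provides the Switch's services, such as pushing flows, to upper layers?

Conclusion: the OpenFlow protocol supports multiple controllers, and a controller can take one of three roles: master, slave, or equal. This article discusses only Master/Slave. When a controller node is the Master of a Switch, the Openflowplugin on that controller provides the Switch's services to upper-layer applications.

Generally, master/slave mode is used for control-plane high availability, and equal mode is used for load balancing across controllers.

1. Master election (Cluster Singleton Service)

The previous note, "ODL Openflowplugin: source code analysis of creating the Switch lifecycle object ContextChain", ended on a cliffhanger: the last line of code in ContextChainHolderImpl.createContextChain is the key to Master election: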

Java
contextChain.registerServices(singletonServiceProvider);

Let us now examine this line in detail.

The ContextChainImpl.registerServices method is as follows:

Java
@Override
public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
    // Register this ContextChain as a cluster singleton service and keep the registration handle
    registration = Objects.requireNonNull(clusterSingletonServiceProvider
            .registerClusterSingletonService(this));
    LOG.debug("Registered clustering services for node {}", deviceInfo);
}

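The method registers the current ContextChainImpl object as a ClusterSingletonService. This is an opendaylight/mdsal interface: it runs an election for the service among the controller cluster nodes, and the service ultimately runs only on the elected leader node, hence the name Singleton service. Note that ContextChain implements the ClusterSingletonService interface.

In a cluster, the controller nodes hold an election; without a cluster, the current node simply becomes master.

Note: Opendaylight provides Singleton Service and EntityOwnershipService for clustered environments; see the ODL website for details. I plan to cover both services in depth in later notes.

According to the Cluster Singleton Service implementation, when a node becomes the leader of a service, the service's own instantiateServiceInstance method is first called on that node. It follows that when Openflowplugin runs in a cluster, the underlying Switch must be configured with all cluster nodes as its controllers. Each controller then creates a ContextChain object and registers it as a Singleton Service, and the underlying election decides which node becomes the Leader of that switch's ContextChain service and runs its instantiateServiceInstance method to instantiate the services.

Summary: Openflowplugin creates a ContextChain for each Switch, and each ContextChain acts as a Cluster Singleton service. In essence, the Master election among a Switch's controllers is the Cluster Singleton Service election of its ContextChain.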
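To make the election idea concrete, here is a minimal sketch in plain Java. It does not use the MD-SAL API: SingletonCandidate, ChainCandidate and ToySingletonProvider are hypothetical names, and the "first registered candidate wins" rule is only an assumption standing in for the real cluster election.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the singleton-service callback; not the MD-SAL interface.
interface SingletonCandidate {
    void instantiateServiceInstance();
}

// One ContextChain-like candidate per controller node, all for the same switch.
final class ChainCandidate implements SingletonCandidate {
    final String controllerNode;
    boolean master;

    ChainCandidate(String controllerNode) {
        this.controllerNode = controllerNode;
    }

    @Override
    public void instantiateServiceInstance() {
        master = true; // reached only on the elected leader
    }
}

// Toy provider: the first registered candidate wins. This rule is an
// assumption of the sketch; a real cluster runs its own ownership election.
final class ToySingletonProvider {
    private final List<SingletonCandidate> candidates = new ArrayList<>();

    void register(SingletonCandidate candidate) {
        candidates.add(candidate);
    }

    void elect() {
        if (!candidates.isEmpty()) {
            candidates.get(0).instantiateServiceInstance();
        }
    }
}

final class SingletonElectionDemo {
    static List<ChainCandidate> run() {
        ToySingletonProvider provider = new ToySingletonProvider();
        List<ChainCandidate> chains = new ArrayList<>();
        for (String node : new String[] {"node-1", "node-2", "node-3"}) {
            ChainCandidate chain = new ChainCandidate(node);
            chains.add(chain);
            provider.register(chain);
        }
        provider.elect();
        return chains;
    }

    public static void main(String[] args) {
        for (ChainCandidate chain : run()) {
            System.out.println(chain.controllerNode + " master=" + chain.master);
        }
    }
}
```

Every node registers a candidate for the same switch, but only one of them ever has instantiateServiceInstance called, which is the property the article relies on.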

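2. Instantiating the ContextChain service instance

As described above, ContextChain takes part in the election as a Singleton service, and the node that becomes Leader runs its ContextChainImpl.instantiateServiceInstance method: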

Java
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void instantiateServiceInstance() {
    try {
        // Start every context (device, rpc, statistics, role) in turn
        contexts.forEach(OFPContext::instantiateServiceInstance);
        LOG.info("Started clustering services for node {}", deviceInfo);
    } catch (final Exception ex) {
        LOG.warn("Not able to start clustering services for node {}", deviceInfo);
        // Report the failure asynchronously so the chain can be torn down
        executorService.execute(() -> contextChainMastershipWatcher
                .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
    }
}

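As shown, the method calls instantiateServiceInstance on each of the Contexts passed in during the previous step. If any exception occurs, it calls ContextChainHolderImpl.onNotAbleToStartMastershipMandatory, which destroys the ContextChain and its Contexts.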
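The "start everything, report the first failure" pattern can be sketched in isolation. The names below (RecordingWatcher, ChainStartSketch) are hypothetical, contexts are reduced to plain Runnable objects, and the failure is reported synchronously rather than through an executor as in the real code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical watcher that records why mastership could not be started.
final class RecordingWatcher {
    final List<String> failures = new ArrayList<>();

    void onNotAbleToStartMastershipMandatory(String device, String reason) {
        failures.add(device + ": " + reason);
    }
}

final class ChainStartSketch {
    private final String device;
    private final List<Runnable> contexts;
    private final RecordingWatcher watcher;

    ChainStartSketch(String device, List<Runnable> contexts, RecordingWatcher watcher) {
        this.device = device;
        this.contexts = contexts;
        this.watcher = watcher;
    }

    // Start each context in order. The first exception stops the loop and is
    // reported to the watcher instead of propagating to the caller.
    void instantiateServiceInstance() {
        try {
            contexts.forEach(Runnable::run);
        } catch (RuntimeException ex) {
            watcher.onNotAbleToStartMastershipMandatory(device, ex.toString());
        }
    }

    public static void main(String[] args) {
        List<String> started = new ArrayList<>();
        List<Runnable> contexts = new ArrayList<>();
        contexts.add(() -> started.add("device"));
        contexts.add(() -> { throw new IllegalStateException("rpc registration failed"); });
        contexts.add(() -> started.add("statistics"));

        RecordingWatcher watcher = new RecordingWatcher();
        new ChainStartSketch("openflow:1", contexts, watcher).instantiateServiceInstance();
        System.out.println("started=" + started + " failures=" + watcher.failures);
    }
}
```

In the demo the second context throws, so the third one is never started and the watcher receives exactly one failure report.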

Calling a Context's instantiateServiceInstance method means calling GuardedContextImpl.instantiateServiceInstance:

Java
@Override
public void instantiateServiceInstance() {
    // Enter only if the context is still in a startable state
    if (monitor.enterIf(isStartable)) {
        try {
            LOG.info("Starting {} service for node {}", this, getDeviceInfo());
            state = STARTING;
            delegate.instantiateServiceInstance();
            state = RUNNING;
        } finally {
            monitor.leave();
        }
    } else {
        throw new IllegalStateException("Service " + this + " has already been started");
    }
}

This in turn calls the instantiateServiceInstance method of DeviceContext/RpcContext/StatisticsContext/RoleContext.
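The guard around the delegate is a small state machine. A rough equivalent using only the JDK is sketched below; it swaps Guava's Monitor for a ReentrantLock, and GuardedStartSketch with its NEW/STARTING/RUNNING states is a hypothetical simplification rather than the real class.

```java
import java.util.concurrent.locks.ReentrantLock;

final class GuardedStartSketch {
    enum State { NEW, STARTING, RUNNING }

    private final ReentrantLock lock = new ReentrantLock();
    private final Runnable delegate;
    private State state = State.NEW;

    GuardedStartSketch(Runnable delegate) {
        this.delegate = delegate;
    }

    // Runs the delegate at most once; any later call is rejected.
    void instantiateServiceInstance() {
        lock.lock();
        try {
            if (state != State.NEW) {
                throw new IllegalStateException("Service has already been started");
            }
            state = State.STARTING;
            delegate.run();
            state = State.RUNNING;
        } finally {
            lock.unlock();
        }
    }

    State state() {
        lock.lock();
        try {
            return state;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        GuardedStartSketch guard = new GuardedStartSketch(() -> System.out.println("delegate started"));
        guard.instantiateServiceInstance();
        try {
            guard.instantiateServiceInstance();
        } catch (IllegalStateException ex) {
            System.out.println("second start rejected: " + ex.getMessage());
        }
    }
}
```

As in the original, if the delegate throws, the state stays at STARTING, so a failed start cannot be retried through the same guard.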

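The conclusion first: once the instantiateServiceInstance methods of all four Contexts have completed successfully, the current controller officially becomes the Switch's Master and can trigger upper-layer northbound applications through "hooks".

2.1 DeviceContext service instantiation

The following expands on DeviceContextImpl.instantiateServiceInstance. Instantiating DeviceContextImpl involves a fair amount of logic, including:
(1) lazyTransactionManagerInitialization(), which creates the TransactionChainManager, DeviceFlowRegistryImpl, DeviceGroupRegistryImpl, and DeviceMeterRegistryImpl objects.

(2) Parsing portStatusMessages and writing the switch's nodeConnectors (ports) to operational YANG. Note: right after the Handshake there should be no ports yet; a later step requests port information from the switch.

(3) Selecting an initializer (OF10DeviceInitializer or OF13DeviceInitializer) according to the device's OpenFlow version and running the initialization, which:

  • 1. writes the switch node to operational YANG (inventory node);
  • 2. parses capabilities and writes them to deviceContext.deviceState;
  • 3. sends requests to the switch for its features, including table/group/meter/port description information, and writes the results to operational YANG.

(4) Calling DeviceFlowRegistryImpl.fill(), which reads the flows under the OVS node's FlowCapableNode (from YANG). Note that the DeviceFlowRegistryImpl object is a complete copy of the flow table cached inside ofp.

(5) After the FlowCapableNode read completes, the DeviceFlowRegistryCallback object is called back; it ultimately calls ContextChainHolderImpl.onMasterRoleAcquired with the state INITIAL_FLOW_REGISTRY_FILL.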

Java

@Override
@SuppressWarnings({"checkstyle:IllegalCatch"})
public void instantiateServiceInstance() {

    lazyTransactionManagerInitialization();

    try {
        final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
                .retrieveAndClearPortStatusMessages();
        // Parse portStatusMessages and write the switch's nodeConnectors (ports) to operational YANG
        portStatusMessages.forEach(this::writePortStatusMessage);
        submitTransaction();
    } catch (final Exception ex) {
        throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
                deviceInfo.toString(), ex.toString()), ex);
    }

    /*
     * Select an initializer (OF10DeviceInitializer or OF13DeviceInitializer) by the device's
     * OpenFlow version and initialize:
     * 1. write the ovs node to operational YANG
     * 2. parse capabilities and write them to deviceContext.deviceState
     * 3. request table features/group features/meter features/port description etc. from the
     *    switch and write them to operational YANG
     */
    final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
            .lookup(deviceInfo.getVersion());
    if (initializer.isPresent()) {
        final Future<Void> initialize = initializer
                .get()
                .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);

        try {
            initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            initialize.cancel(true);
            throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
                    deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
        } catch (ExecutionException | InterruptedException ex) {
            throw new RuntimeException(
                    String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
        }
    } else {
        throw new RuntimeException(String.format("Unsupported version %s for device %s",
                deviceInfo.getVersion(),
                deviceInfo.toString()));
    }

    // Call DeviceFlowRegistryImpl.fill() to read the ovs node's FlowCapableNode (from YANG)
    final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
            getDeviceFlowRegistry().fill();

    // Once the FlowCapableNode read completes, DeviceFlowRegistryCallback is called back; it ultimately
    // calls contextChainMastershipWatcher.onMasterRoleAcquired with state INITIAL_FLOW_REGISTRY_FILL
    Futures.addCallback(deviceFlowRegistryFill,
            new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
            MoreExecutors.directExecutor());
}
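The version lookup and the bounded wait in the method above can be tried out on their own. The sketch below is an illustration under assumptions: DeviceInitSketch is a hypothetical class, initializers are reduced to Supplier objects that return a Future, and failures surface as IllegalStateException. The version keys 0x01 and 0x04 are the OpenFlow 1.0 and 1.3 wire versions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class DeviceInitSketch {
    private final Map<Integer, Supplier<Future<Void>>> initializers;
    private final long timeoutMillis;

    DeviceInitSketch(Map<Integer, Supplier<Future<Void>>> initializers, long timeoutMillis) {
        this.initializers = initializers;
        this.timeoutMillis = timeoutMillis;
    }

    // Pick the initializer for the version, then wait for it with a timeout.
    void initialize(int version) {
        Supplier<Future<Void>> initializer = initializers.get(version);
        if (initializer == null) {
            throw new IllegalStateException("Unsupported version " + version);
        }
        Future<Void> pending = initializer.get();
        try {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            pending.cancel(true); // give up on the device, as the original does
            throw new IllegalStateException("Initialization timed out after " + timeoutMillis + " ms", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("Initialization failed", ex);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during initialization", ex);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Supplier<Future<Void>>> table = new HashMap<>();
        table.put(0x01, () -> CompletableFuture.<Void>completedFuture(null)); // OF 1.0
        table.put(0x04, () -> CompletableFuture.<Void>completedFuture(null)); // OF 1.3
        new DeviceInitSketch(table, 100).initialize(0x04);
        System.out.println("OF 1.3 device initialized");
    }
}
```

An unknown version fails fast, while a switch that never answers costs at most the timeout before the pending work is cancelled.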

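Note that once the DeviceContext object has finished initializing, it ultimately calls ContextChainHolderImpl.onMasterRoleAcquired. In fact, every Context calls this method at the end of instantiating its service instance. Once all Contexts have made the call, the Switch's ContextChain and Contexts are fully instantiated and can provide services.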
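One way to picture "mastership is complete once every Context has reported" is a set of reached states. The sketch below uses the state names that appear in this article; the bookkeeping itself (an EnumSet that must contain every state) is my assumption for illustration, not a copy of the Openflowplugin implementation.

```java
import java.util.EnumSet;
import java.util.Set;

final class MastershipTrackerSketch {
    // State names as used in the article; the tracking logic is an assumption.
    enum State {
        INITIAL_FLOW_REGISTRY_FILL,
        RPC_REGISTRATION,
        INITIAL_GATHERING,
        INITIAL_SUBMIT,
        MASTER_ON_DEVICE
    }

    private final Set<State> reached = EnumSet.noneOf(State.class);

    // Records the state and returns true once every state has been reported.
    synchronized boolean onMasterRoleAcquired(State state) {
        reached.add(state);
        return reached.containsAll(EnumSet.allOf(State.class));
    }

    public static void main(String[] args) {
        MastershipTrackerSketch tracker = new MastershipTrackerSketch();
        for (State state : State.values()) {
            boolean mastered = tracker.onMasterRoleAcquired(state);
            System.out.println(state + " -> mastered=" + mastered);
        }
    }
}
```

Because the Contexts report from different callbacks and possibly different threads, the order of arrival does not matter; only the last missing state flips the result to true.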

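2.2 RpcContext service instantiation

RpcContext's instantiateServiceInstance method mainly does the following:
(1) Registers the various rpc implementations with rpcContext for upper layers to call (for example SalFlowService for pushing flows). During RpcContext instantiation, all Rpc services are instantiated and registered. How RPC services are registered and used is covered in the next note.
(2) After registering the rpc implementations, calls ContextChainHolderImpl.onMasterRoleAcquired with the state RPC_REGISTRATION.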

Java

@Override
public void instantiateServiceInstance() {
    // Create the sal services (rpc operations on the device, etc.) and
    // registerRpcServiceImplementation them on this rpcContextImpl, e.g. SalFlowServiceImpl
    // registers all OF services for role
    MdSalRegistrationUtils.registerServices(this, deviceContext, extensionConverterProvider, convertorExecutor);

    if (isStatisticsRpcEnabled && !deviceContext.canUseSingleLayerSerialization()) {
        // Register the statistics-related services
        MdSalRegistrationUtils.registerStatCompatibilityServices(
                this,
                deviceContext,
                notificationPublishService,
                convertorExecutor);
    }

    contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.RPC_REGISTRATION);
}

Inside MdSalRegistrationUtils.registerServices, the various RPC Services are instantiated and registered:

Java

public static void registerServices(@Nonnull final RpcContext rpcContext,
                                    @Nonnull final DeviceContext deviceContext,
                                    // ExtensionConverterManagerImpl, created in openflowPluginProvider
                                    final ExtensionConverterProvider extensionConverterProvider,
                                    final ConvertorExecutor convertorExecutor) {
    // TODO: Use multipart writer provider from device context
    final MultipartWriterProvider multipartWriterProvider = MultipartWriterProviderFactory
            .createDefaultProvider(deviceContext);

    // create service instances
    final SalFlowServiceImpl salFlowService = new SalFlowServiceImpl(rpcContext, deviceContext, convertorExecutor);
    final FlowCapableTransactionServiceImpl flowCapableTransactionService =
            new FlowCapableTransactionServiceImpl(rpcContext, deviceContext);
    final SalAsyncConfigServiceImpl salAsyncConfigService =
            new SalAsyncConfigServiceImpl(rpcContext, deviceContext);
    final SalGroupServiceImpl salGroupService =
            new SalGroupServiceImpl(rpcContext, deviceContext, convertorExecutor);
    final SalMeterServiceImpl salMeterService =
            new SalMeterServiceImpl(rpcContext, deviceContext, convertorExecutor);

    // register routed service instances
    rpcContext.registerRpcServiceImplementation(SalEchoService.class,
            new SalEchoServiceImpl(rpcContext, deviceContext));
    rpcContext.registerRpcServiceImplementation(SalFlowService.class, salFlowService);
    rpcContext.registerRpcServiceImplementation(FlowCapableTransactionService.class, flowCapableTransactionService);
    rpcContext.registerRpcServiceImplementation(SalAsyncConfigService.class, salAsyncConfigService);
    rpcContext.registerRpcServiceImplementation(SalMeterService.class, salMeterService);
    rpcContext.registerRpcServiceImplementation(SalGroupService.class, salGroupService);
    rpcContext.registerRpcServiceImplementation(SalTableService.class,
            new SalTableServiceImpl(rpcContext, deviceContext, convertorExecutor, multipartWriterProvider));
    rpcContext.registerRpcServiceImplementation(SalPortService.class,
            new SalPortServiceImpl(rpcContext, deviceContext, convertorExecutor));
    rpcContext.registerRpcServiceImplementation(PacketProcessingService.class,
            new PacketProcessingServiceImpl(rpcContext, deviceContext, convertorExecutor));
    rpcContext.registerRpcServiceImplementation(NodeConfigService.class,
            new NodeConfigServiceImpl(rpcContext, deviceContext));
    rpcContext.registerRpcServiceImplementation(OpendaylightFlowStatisticsService.class,
            OpendaylightFlowStatisticsServiceImpl.createWithOook(rpcContext, deviceContext, convertorExecutor));

    // register direct statistics gathering services
    rpcContext.registerRpcServiceImplementation(OpendaylightDirectStatisticsService.class,
            new OpendaylightDirectStatisticsServiceImpl(deviceContext.canUseSingleLayerSerialization()
                    ? SingleLayerDirectStatisticsProviderInitializer
                        .createProvider(rpcContext, deviceContext, convertorExecutor, multipartWriterProvider)
                    : MultiLayerDirectStatisticsProviderInitializer
                        .createProvider(rpcContext, deviceContext, convertorExecutor, multipartWriterProvider)));

    // register flat batch services
    rpcContext.registerRpcServiceImplementation(SalFlatBatchService.class, new SalFlatBatchServiceImpl(
            new SalFlowsBatchServiceImpl(salFlowService, flowCapableTransactionService),
            new SalGroupsBatchServiceImpl(salGroupService, flowCapableTransactionService),
            new SalMetersBatchServiceImpl(salMeterService, flowCapableTransactionService)
    ));

    // register experimenter services
    rpcContext.registerRpcServiceImplementation(SalExperimenterMessageService.class,
            new SalExperimenterMessageServiceImpl(rpcContext, deviceContext, extensionConverterProvider));
    rpcContext.registerRpcServiceImplementation(SalExperimenterMpMessageService.class,
            new SalExperimenterMpMessageServiceImpl(rpcContext, deviceContext, extensionConverterProvider));

    // register onf extension bundles
    rpcContext.registerRpcServiceImplementation(SalBundleService.class,
            new SalBundleServiceImpl(new SalExperimenterMessageServiceImpl(
                    rpcContext, deviceContext, extensionConverterProvider)));
}
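The registration pattern used above, a service interface class mapped to one implementation, can be illustrated with a small type-safe registry. RpcRegistrySketch, lookupRpcService and EchoService are hypothetical names for this sketch; the real RpcContext additionally registers the implementations with MD-SAL as routed RPCs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical service interface, standing in for something like SalEchoService.
interface EchoService {
    String echo(String payload);
}

final class RpcRegistrySketch {
    private final Map<Class<?>, Object> implementations = new HashMap<>();

    // The generic parameter ties the implementation to its service interface.
    <S> void registerRpcServiceImplementation(Class<S> serviceClass, S implementation) {
        implementations.put(serviceClass, implementation);
    }

    // Returns the registered implementation, or null if none was registered.
    <S> S lookupRpcService(Class<S> serviceClass) {
        return serviceClass.cast(implementations.get(serviceClass));
    }

    public static void main(String[] args) {
        RpcRegistrySketch registry = new RpcRegistrySketch();
        registry.registerRpcServiceImplementation(EchoService.class, payload -> "echo:" + payload);
        System.out.println(registry.lookupRpcService(EchoService.class).echo("ping"));
    }
}
```

Keying the map by the interface class is what lets an upper-layer caller ask for a service by type without knowing which implementation the plugin chose for this device.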

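2.3 StatisticsContext service instantiation

Instantiating StatisticsContext provides the Statistics data collection service. This costs extra performance; collection can be turned off to improve performance, which a later note covers in detail.

Its instantiateServiceInstance method determines which data types to collect based on the configuration and the Switch's features, then calls gatherDynamicData to collect dynamic data. When collection succeeds, InitialSubmitCallback is called back.

The instantiateServiceInstance method is as follows: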

Java

@Override
public void instantiateServiceInstance() {
    final List<MultipartType> statListForCollecting = new ArrayList<>();

    // Add a data type only if the device supports it and polling for it is enabled in config
    if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) {
        statListForCollecting.add(MultipartType.OFPMPTABLE);
    }

    if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) {
        statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
        statListForCollecting.add(MultipartType.OFPMPGROUP);
    }

    if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) {
        statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
        statListForCollecting.add(MultipartType.OFPMPMETER);
    }

    if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) {
        statListForCollecting.add(MultipartType.OFPMPFLOW);
    }

    if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) {
        statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
    }

    if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) {
        statListForCollecting.add(MultipartType.OFPMPQUEUE);
    }

    collectingStatType = ImmutableList.copyOf(statListForCollecting);
    // Collect data for the selected types (sends requests to the switch);
    // InitialSubmitCallback is called back on success
    Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
}
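The selection rule in this method is simply "device supports it AND polling is enabled". A compact sketch of that rule follows; StatSelectionSketch and its Feature enum are hypothetical, while the string values are the MultipartType names from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class StatSelectionSketch {
    // Hypothetical feature groups; each maps to the multipart types it contributes.
    enum Feature { TABLE, GROUP, METER, FLOW, PORT, QUEUE }

    private static final Map<Feature, List<String>> TYPES = new EnumMap<>(Feature.class);

    static {
        TYPES.put(Feature.TABLE, Arrays.asList("OFPMPTABLE"));
        TYPES.put(Feature.GROUP, Arrays.asList("OFPMPGROUPDESC", "OFPMPGROUP"));
        TYPES.put(Feature.METER, Arrays.asList("OFPMPMETERCONFIG", "OFPMPMETER"));
        TYPES.put(Feature.FLOW, Arrays.asList("OFPMPFLOW"));
        TYPES.put(Feature.PORT, Arrays.asList("OFPMPPORTSTATS"));
        TYPES.put(Feature.QUEUE, Arrays.asList("OFPMPQUEUE"));
    }

    // A type is collected only if the device supports it and polling is switched on.
    static List<String> select(Set<Feature> supportedByDevice, Set<Feature> pollingEnabled) {
        List<String> selected = new ArrayList<>();
        for (Feature feature : Feature.values()) {
            if (supportedByDevice.contains(feature) && pollingEnabled.contains(feature)) {
                selected.addAll(TYPES.get(feature));
            }
        }
        return Collections.unmodifiableList(selected);
    }

    public static void main(String[] args) {
        Set<Feature> supported = EnumSet.of(Feature.GROUP, Feature.FLOW, Feature.QUEUE);
        Set<Feature> enabled = EnumSet.of(Feature.GROUP, Feature.FLOW, Feature.METER);
        System.out.println(select(supported, enabled));
    }
}
```

In the demo, QUEUE is supported but not enabled and METER is enabled but not supported, so neither ends up in the collection list.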

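The gatherDynamicData method collects dynamic data: it checks that the switch is still online (not disconnected), then collects data according to the list just built in instantiateServiceInstance by sending requests to the switch. One call performs a single collection pass: it sends a request to the Switch for each required data type and updates the collection timestamps in YANG. (The collection process itself is covered in another note.)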

Java

private ListenableFuture<Boolean> gatherDynamicData() {
    // Data collection switch
    if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
        LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
        return Futures.immediateFuture(Boolean.TRUE);
    }

    return this.lastDataGatheringRef.updateAndGet(future -> {
        // write start timestamp to state snapshot container
        // (FlowCapableStatisticsGatheringStatus in flow-node-inventory.yang)
        StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);

        // recreate gathering future if it should be recreated
        final ListenableFuture<Boolean> lastDataGathering =
                Objects.isNull(future) || future.isCancelled() || future.isDone()
                        ? Futures.immediateFuture(Boolean.TRUE)
                        : future;

        // build statistics gathering future
        // Check that the switch is online (not disconnected), then collect data
        // for the list of types set during initialization
        final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
                .reduce(lastDataGathering, this::statChainFuture,
                    (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
                            MoreExecutors.directExecutor()));

        // write end timestamp to state snapshot container
        Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(@Nonnull final Boolean result) {
                // Write the end time to FlowCapableStatisticsGatheringStatus (flow-node-inventory.yang)
                StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
            }

            @Override
            public void onFailure(final Throwable throwable) {
                if (!(throwable instanceof TransactionChainClosedException)) {
                    StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
                }
            }
        }, MoreExecutors.directExecutor());

        return newDataGathering;
    });
}
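The stream reduce above folds the list of statistics types into one chained future, so the requests go out one after another. The same idea is sketched here with the JDK's CompletableFuture instead of Guava. SequentialGatherSketch is a hypothetical name, and combining the per-type results with a logical AND is my assumption, not necessarily what statChainFuture does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SequentialGatherSketch {
    // Chains one request per type; each request is issued only after the
    // previous one has completed. Results are AND-ed (sketch assumption).
    static CompletableFuture<Boolean> gather(List<String> types,
                                             Function<String, CompletableFuture<Boolean>> request) {
        CompletableFuture<Boolean> chain = CompletableFuture.completedFuture(Boolean.TRUE);
        for (String type : types) {
            chain = chain.thenCompose(ok -> request.apply(type).thenApply(result -> ok && result));
        }
        return chain;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        CompletableFuture<Boolean> done = CompletableFuture.completedFuture(Boolean.TRUE);
        CompletableFuture<Boolean> all = gather(
                Arrays.asList("OFPMPTABLE", "OFPMPFLOW", "OFPMPPORTSTATS"),
                type -> {
                    sent.add(type); // stands in for sending a multipart request
                    return done;
                });
        System.out.println("requests in order: " + sent + ", success=" + all.join());
    }
}
```

Serializing the requests keeps at most one statistics request per switch in flight at a time, at the cost of a longer collection pass.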

In the InitialSubmitCallback callback object, when gatherDynamicData succeeds, ContextChainHolderImpl.onMasterRoleAcquired is called first with the state INITIAL_GATHERING, meaning that the initial (first) data collection is complete.

Java

private final class InitialSubmitCallback implements FutureCallback<Boolean> {
    @Override
    public void onSuccess(@Nullable final Boolean result) {
        contextChainMastershipWatcher
                .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);

        // With ReconciliationFramework in use, continuous data collection is not started here
        if (!isUsingReconciliationFramework) {
            continueInitializationAfterReconciliation();
        }
    }

    @Override
    public void onFailure(@Nonnull final Throwable throwable) {
        contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
                "Initial gathering statistics unsuccessful: " + throwable.getMessage());
    }
}

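This callback also checks whether ReconciliationFramework is in use. If it is not, continueInitializationAfterReconciliation is called right away to start the periodic data-collection service; if it is, that call is deferred. Inside continueInitializationAfterReconciliation, ContextChainHolderImpl.onMasterRoleAcquired is called again, this time with the state INITIAL_SUBMIT.

Note that ReconciliationFramework is an additional extension in Openflowplugin, an application for Switch reconciliation. Its main purpose is to let upper-layer services start in priority order, and it is implemented as a hook that is notified when the underlying ofp becomes the Switch's Master. A later note analyses it in depth.

So when ReconciliationFramework is in use, the InitialSubmitCallback callback does not start periodic collection immediately. Only after ReconciliationFramework has finished its processing is continueInitializationAfterReconciliation called to start the periodic data-collection service. The concrete logic appears later in this article.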
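The order in which INITIAL_GATHERING and INITIAL_SUBMIT are reported can be modelled in a few lines. This is a minimal sketch, not the plugin's code: `StatsInitSketch`, its `reported` list, and the boolean constructor argument are hypothetical stand-ins for StatisticsContextImpl, the mastership watcher, and the isUsingReconciliationFramework flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the initial-gathering callback of the statistics context. */
final class StatsInitSketch {
    enum State { INITIAL_GATHERING, INITIAL_SUBMIT }

    private final boolean usingReconciliationFramework;
    /** States reported to the modelled mastership watcher, in order. */
    final List<State> reported = new ArrayList<>();

    StatsInitSketch(boolean usingReconciliationFramework) {
        this.usingReconciliationFramework = usingReconciliationFramework;
    }

    /** Models InitialSubmitCallback.onSuccess. */
    void onInitialGatheringSuccess() {
        reported.add(State.INITIAL_GATHERING);
        if (!usingReconciliationFramework) {
            // No framework: continue straight away.
            continueInitializationAfterReconciliation();
        }
    }

    /** Models the step that starts periodic collection and reports INITIAL_SUBMIT. */
    void continueInitializationAfterReconciliation() {
        reported.add(State.INITIAL_SUBMIT);
    }

    public static void main(String[] args) {
        StatsInitSketch plain = new StatsInitSketch(false);
        plain.onInitialGatheringSuccess();
        System.out.println("without framework: " + plain.reported);

        StatsInitSketch deferred = new StatsInitSketch(true);
        deferred.onInitialGatheringSuccess();
        System.out.println("with framework, before reconciliation: " + deferred.reported);
        deferred.continueInitializationAfterReconciliation();
        System.out.println("with framework, after reconciliation: " + deferred.reported);
    }
}
```

In this model the framework path reports only INITIAL_GATHERING until something else calls continueInitializationAfterReconciliation, which is the deferral described above.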

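2.4 Instantiating the RoleContext service

Instantiating the RoleContext service means sending a message to the underlying Switch to tell it that the current controller node has become its Master. In the implementation, when the OpenFlow version is 1.3 or later, the set-role RPC is called directly at this point to notify the Switch that the current controller node is master (BECOMEMASTER).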

Java

@Override
public void instantiateServiceInstance() {
    // Tell the device that this node is becoming master by calling the set-role RPC
    final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
    changeLastRoleFuture(future);

    // The callback ends up in ContextChainHolderImpl.onMasterRoleAcquired()
    Futures.addCallback(future, new MasterRoleCallback(), MoreExecutors.directExecutor());
}

After the set-role RPC call (BECOMEMASTER) succeeds, MasterRoleCallback is invoked, which in turn calls ContextChainHolderImpl.onMasterRoleAcquired with the state MASTER_ON_DEVICE.

Java

private final class MasterRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
    @Override
    public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
        // Report the state MASTER_ON_DEVICE back to the watcher
        contextChainMastershipWatcher.onMasterRoleAcquired(
                deviceInfo,
                ContextChainMastershipState.MASTER_ON_DEVICE);
        LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
    }

    @Override
    public void onFailure(@Nonnull final Throwable throwable) {
        if (!(throwable instanceof CancellationException)) {
            contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
                    deviceInfo,
                    "Was not able to propagate MASTER role on device. Error: " + throwable.toString());
        }
    }
}
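The shape of this step is "send an asynchronous request, then report a mastership state from its callback". The sketch below reproduces that shape with the JDK's CompletableFuture instead of Guava's ListenableFuture and FutureCallback, so it runs without any dependency. `RoleCallbackSketch` and its `events` list are hypothetical, and no switch is contacted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of a role context reporting MASTER_ON_DEVICE from a future callback. */
final class RoleCallbackSketch {
    /** What the modelled mastership watcher was told, in order. */
    final List<String> events = new ArrayList<>();

    /** Registers success and failure handling on the pending role request. */
    void instantiateServiceInstance(CompletableFuture<Void> roleRequest) {
        roleRequest.whenComplete((ignored, error) -> {
            if (error == null) {
                events.add("MASTER_ON_DEVICE");
            } else if (!(error instanceof CancellationException)) {
                events.add("NOT_ABLE_TO_START: " + error);
            }
            // A cancelled request is ignored, mirroring MasterRoleCallback.onFailure.
        });
    }

    public static void main(String[] args) {
        RoleCallbackSketch accepted = new RoleCallbackSketch();
        CompletableFuture<Void> ok = new CompletableFuture<>();
        accepted.instantiateServiceInstance(ok);
        ok.complete(null); // the role request succeeded
        System.out.println("success:   " + accepted.events);

        RoleCallbackSketch rejected = new RoleCallbackSketch();
        CompletableFuture<Void> failed = new CompletableFuture<>();
        rejected.instantiateServiceInstance(failed);
        failed.completeExceptionally(new IllegalStateException("role request rejected"));
        System.out.println("failure:   " + rejected.events);

        RoleCallbackSketch cancelled = new RoleCallbackSketch();
        CompletableFuture<Void> dropped = new CompletableFuture<>();
        cancelled.instantiateServiceInstance(dropped);
        dropped.cancel(true); // a cancelled request reports nothing
        System.out.println("cancelled: " + cancelled.events);
    }
}
```

Because the callback is attached directly to the request future, it runs on whichever thread completes that future, which is comparable in spirit to passing MoreExecutors.directExecutor() in the original code.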

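3. The controller becomes the Switch's Master

Once all the services have been instantiated, the controller officially becomes the Switch's Master. As we have seen, each context (device/rpc/statistics/role) calls ContextChainHolderImpl.onMasterRoleAcquired when its initialization completes, each passing a different state. Let us now look inside onMasterRoleAcquired. This method is the core of deciding whether all of a Switch's services have been instantiated on the controller node, and it is also the core method through which Openflowplugin triggers hooks for upper-layer applications.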

Java
@Override
public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
                                 @Nonnull final ContextChainMastershipState mastershipState) {
    Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
        if (ownershipChangeListener.isReconciliationFrameworkRegistered()
                && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState)) {
            if (contextChain.isMastered(mastershipState, true)) {
                Futures.addCallback(ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
                                    reconciliationFrameworkCallback(deviceInfo, contextChain),
                                    MoreExecutors.directExecutor());
            }
        } else if (contextChain.isMastered(mastershipState, false)) {
            LOG.info("Role MASTER was granted to device {}", deviceInfo);
            ownershipChangeListener.becomeMaster(deviceInfo);
            deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
        }
    });
}

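Depending on whether ReconciliationFramework is in use, there are two ways of deciding whether the ContextChain has become master. Recall from the StatisticsContext service instantiation that when ReconciliationFramework is in use, periodic data collection is not started immediately, so onMasterRoleAcquired is not called again with the state INITIAL_SUBMIT at that point.

This shows up in onMasterRoleAcquired: it calls contextChain.isMastered to decide whether the node has become Master (that is, whether every service has been instantiated), and besides the current state it passes a second argument that indicates whether we are inside the ReconciliationFramework step.

When ReconciliationFramework is in use and the state passed in is not INITIAL_SUBMIT, we are still inside the ReconciliationFramework step. The state INITIAL_SUBMIT is passed only from StatisticsContextImpl.continueInitializationAfterReconciliation, which runs either when ReconciliationFramework is not enabled (in StatisticsContext) or after ReconciliationFramework has finished its processing.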

Java
if (ownershipChangeListener.isReconciliationFrameworkRegistered()
        && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState))

Inside the ReconciliationFramework step the second argument is true; otherwise it is false.
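To make the branch selection concrete, the condition can be evaluated for every combination of "framework registered" and incoming state. This is a small sketch with hypothetical names (`ReconciliationStepSketch`, `inReconciliationFrameworkStep`); only the boolean condition itself is taken from the code above.

```java
/** Hypothetical helper that evaluates the branch condition of onMasterRoleAcquired. */
final class ReconciliationStepSketch {
    enum State { INITIAL_SUBMIT, MASTER_ON_DEVICE, INITIAL_GATHERING, RPC_REGISTRATION }

    /** True when isMastered would be called with its second argument set to true. */
    static boolean inReconciliationFrameworkStep(boolean frameworkRegistered, State state) {
        return frameworkRegistered && !State.INITIAL_SUBMIT.equals(state);
    }

    public static void main(String[] args) {
        for (boolean registered : new boolean[] {false, true}) {
            for (State state : State.values()) {
                System.out.println("registered=" + registered + ", state=" + state
                        + " -> isMastered(state, "
                        + inReconciliationFrameworkStep(registered, state) + ")");
            }
        }
    }
}
```

The only way to reach the first branch is to have the framework registered and a state other than INITIAL_SUBMIT; every other combination falls through to the else branch.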

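3.1 Deciding whether the controller node is Master

contextChain.isMastered decides whether the controller node has become the Switch's Master. Note one small detail below: in the ReconciliationFramework step (second argument true), even if the node already is Master, the state of the ContextChain and its Contexts is not switched to WORKING_MASTER right away. The method simply returns true, meaning "is master" but not yet WORKING_MASTER. The reason is that ReconciliationFramework performs extra processing and finally calls back into this method, and only then is WORKING_MASTER set. This ties in with a later note, so consider it foreshadowing.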

Java

@Override
public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState,
                          boolean inReconciliationFrameworkStep) {
    switch (mastershipState) {
        case INITIAL_SUBMIT: // statisticsContext
            LOG.debug("Device {}, initial submit OK.", deviceInfo);
            this.initialSubmitting.set(true);
            break;
        case MASTER_ON_DEVICE: // roleContext
            LOG.debug("Device {}, master state OK.", deviceInfo);
            this.masterStateOnDevice.set(true);
            break;
        case INITIAL_GATHERING: // statisticsContext
            LOG.debug("Device {}, initial gathering OK.", deviceInfo);
            this.initialGathering.set(true);
            break;
        case RPC_REGISTRATION: // rpcContext
            LOG.debug("Device {}, RPC registration OK.", deviceInfo);
            this.rpcRegistration.set(true);
            break;
        case INITIAL_FLOW_REGISTRY_FILL: // passed in once deviceContext.instantiateServiceInstance has finished
            // Flow registry fill is not mandatory to work as a master
            LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
            this.registryFilling.set(true);
            break;
        case CHECK:
            // no operation
            break;
        default:
            // no operation
            break;
    }

    // result becomes true only once the contexts (deviceContext, rpcContext, statisticContext,
    // roleContext) have finished initializing, i.e. their instantiateServiceInstance has completed.
    // ReconciliationFramework and initialSubmitting are mutually exclusive: while ReconciliationFramework
    // is in use, initialSubmitting stays false (see the logic in statisticsContextImpl).
    final boolean result = initialGathering.get() && masterStateOnDevice.get() && rpcRegistration.get()
            && inReconciliationFrameworkStep || initialSubmitting.get();

    if (!inReconciliationFrameworkStep && result && mastershipState != ContextChainMastershipState.CHECK) {
        // Without ReconciliationFramework, this branch is entered once deviceContext, rpcContext,
        // statisticContext and roleContext have all finished initializing
        LOG.info("Device {} is able to work as master{}", deviceInfo,
                 registryFilling.get() ? "." : " WITHOUT flow registry !!!");
        // 1. Set contextChainState to WORKING_MASTER
        // 2. Set the state of each context (deviceContext, rpcContext, statisticContext, roleContext)
        //    to WORKING_MASTER
        changeMastershipState(ContextChainState.WORKING_MASTER);
    }

    return result;
}

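isMastered sets the class field that corresponds to the state passed in, and at the end evaluates all the flags together. When every context (device/rpc/statistics/role) has finished initializing, service instantiation in Openflowplugin is complete, meaning the node has become the Switch's Master, and the method returns true.

Look closely at the Master decision logic: inReconciliationFrameworkStep and initialSubmitting are mutually exclusive (recall the StatisticsContext flow above).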

Java
final boolean result = initialGathering.get() && masterStateOnDevice.get() && rpcRegistration.get()
        && inReconciliationFrameworkStep || initialSubmitting.get();
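Since the expression mixes && and || without parentheses, it helps to spell out how Java groups it: && binds tighter than ||, so the four-way conjunction is evaluated first and the result is then OR-ed with initialSubmitting. The sketch below checks this exhaustively. The class and parameter names are hypothetical, and plain booleans stand in for the flag fields read with get().

```java
/** Hypothetical check of how the isMastered result expression is grouped. */
final class MasteredExpressionSketch {
    /** The expression in the same form as in isMastered. */
    static boolean asWritten(boolean gathering, boolean masterOnDevice, boolean rpc,
                             boolean inStep, boolean submitting) {
        return gathering && masterOnDevice && rpc && inStep || submitting;
    }

    /** The same expression with the implicit grouping made explicit. */
    static boolean grouped(boolean gathering, boolean masterOnDevice, boolean rpc,
                           boolean inStep, boolean submitting) {
        return (gathering && masterOnDevice && rpc && inStep) || submitting;
    }

    public static void main(String[] args) {
        boolean identical = true;
        for (int bits = 0; bits < 32; bits++) {
            boolean g = (bits & 1) != 0;
            boolean m = (bits & 2) != 0;
            boolean r = (bits & 4) != 0;
            boolean s = (bits & 8) != 0;
            boolean i = (bits & 16) != 0;
            identical &= asWritten(g, m, r, s, i) == grouped(g, m, r, s, i);
        }
        System.out.println("identical for all 32 inputs: " + identical);
        System.out.println("in step, three flags set:     " + asWritten(true, true, true, true, false));
        System.out.println("not in step, three flags set: " + asWritten(true, true, true, false, false));
        System.out.println("not in step, submit only:     " + asWritten(false, false, false, false, true));
    }
}
```

One consequence of this grouping: when the second argument is false, the conjunction is always false, so the result reduces to the value of initialSubmitting alone.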

Finally, when not in the ReconciliationFramework step (the second argument), the state of the contextChain and of every context is changed to WORKING_MASTER.

Back in ContextChainHolderImpl.onMasterRoleAcquired:
1) With ReconciliationFramework in use and isMastered returning true, ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo) is called.
An extra callback, reconciliationFrameworkCallback, is attached; it calls statisticsContext.continueInitializationAfterReconciliation to start periodic data collection in statisticsContext. Recall that statisticsContext skips starting periodic collection during initialization when ReconciliationFramework is in use, precisely so that it can be started here after the ReconciliationFramework call completes.

2) Without ReconciliationFramework and with isMastered returning true, ownershipChangeListener.becomeMaster is called, and then deviceManager notifies inventory that the device node has been added.
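Both paths can be traced end to end with a toy model that keeps only the flags and the branching. Everything here is a simplification under stated assumptions: `MastershipFlowSketch` is hypothetical, hook calls are recorded as strings instead of being dispatched, the flow-registry state is omitted, and the framework's future is assumed to complete successfully at once so that the deferred INITIAL_SUBMIT step runs immediately.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified model of the holder plus the context chain. */
final class MastershipFlowSketch {
    enum State { INITIAL_SUBMIT, MASTER_ON_DEVICE, INITIAL_GATHERING, RPC_REGISTRATION, CHECK }

    private final boolean frameworkRegistered;
    private boolean gathering;
    private boolean masterOnDevice;
    private boolean rpc;
    private boolean submitting;
    private boolean workingMaster;
    /** Hook methods that would have been invoked, in order. */
    final List<String> hooks = new ArrayList<>();

    MastershipFlowSketch(boolean frameworkRegistered) {
        this.frameworkRegistered = frameworkRegistered;
    }

    boolean isWorkingMaster() {
        return workingMaster;
    }

    /** Mirrors the flag bookkeeping and result expression of isMastered. */
    private boolean isMastered(State state, boolean inFrameworkStep) {
        switch (state) {
            case INITIAL_SUBMIT: submitting = true; break;
            case MASTER_ON_DEVICE: masterOnDevice = true; break;
            case INITIAL_GATHERING: gathering = true; break;
            case RPC_REGISTRATION: rpc = true; break;
            default: break;
        }
        boolean result = gathering && masterOnDevice && rpc && inFrameworkStep || submitting;
        if (!inFrameworkStep && result && state != State.CHECK) {
            workingMaster = true;
        }
        return result;
    }

    /** Mirrors the branching of onMasterRoleAcquired. */
    void onMasterRoleAcquired(State state) {
        if (frameworkRegistered && state != State.INITIAL_SUBMIT) {
            if (isMastered(state, true)) {
                hooks.add("becomeMasterBeforeSubmittedDS");
                // Assumption: the framework finishes successfully right away.
                onMasterRoleAcquired(State.INITIAL_SUBMIT);
            }
        } else if (isMastered(state, false)) {
            hooks.add("becomeMaster");
        }
    }

    /** Feeds the three mandatory states, then INITIAL_SUBMIT when nothing defers it. */
    static MastershipFlowSketch run(boolean frameworkRegistered) {
        MastershipFlowSketch chain = new MastershipFlowSketch(frameworkRegistered);
        chain.onMasterRoleAcquired(State.RPC_REGISTRATION);
        chain.onMasterRoleAcquired(State.MASTER_ON_DEVICE);
        chain.onMasterRoleAcquired(State.INITIAL_GATHERING);
        if (!frameworkRegistered) {
            chain.onMasterRoleAcquired(State.INITIAL_SUBMIT);
        }
        return chain;
    }

    public static void main(String[] args) {
        MastershipFlowSketch plain = run(false);
        System.out.println("without framework: " + plain.hooks + ", working master: " + plain.isWorkingMaster());
        MastershipFlowSketch framework = run(true);
        System.out.println("with framework:    " + framework.hooks + ", working master: " + framework.isWorkingMaster());
    }
}
```

In this model the framework path records becomeMasterBeforeSubmittedDS first and then becomeMaster, because the deferred INITIAL_SUBMIT state always takes the else branch. In both paths the chain reaches the working-master state only when INITIAL_SUBMIT arrives.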

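To state the conclusion up front: once the Switch's ContextChain has been instantiated on the leader node, a method of MastershipChangeServiceManager is called. This is the hook-triggering service that the ofp layer offers to upper-layer applications: when instantiation completes (isMastered), the upper-layer applications are triggered. With ReconciliationFramework the method called is becomeMasterBeforeSubmittedDS; otherwise it is becomeMaster.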
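The general idea of such a hook is the observer pattern: applications register a listener, and the lower layer notifies every listener when mastership is gained. The sketch below only illustrates that idea. `MastershipHookSketch`, `MastershipListener`, and `onBecomeMaster` are hypothetical names, not the API of MastershipChangeServiceManager, which the next article covers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical observer-style hook; not the real MastershipChangeServiceManager API. */
final class MastershipHookSketch {
    /** Hypothetical listener an upper-layer application would implement. */
    interface MastershipListener {
        void onBecomeMaster(String deviceId);
    }

    private final List<MastershipListener> listeners = new ArrayList<>();

    /** Upper-layer applications register here to be notified. */
    void register(MastershipListener listener) {
        listeners.add(listener);
    }

    /** Called by the modelled lower layer once instantiation has completed. */
    void becomeMaster(String deviceId) {
        for (MastershipListener listener : listeners) {
            listener.onBecomeMaster(deviceId);
        }
    }

    public static void main(String[] args) {
        MastershipHookSketch manager = new MastershipHookSketch();
        manager.register(id -> System.out.println("app A: now master of " + id));
        manager.register(id -> System.out.println("app B: now master of " + id));
        manager.becomeMaster("openflow:1"); // illustrative device id
    }
}
```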

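For reasons of length, MastershipChangeServiceManager is examined in the next note. One question to leave open: how does MastershipChangeServiceManager act as a hook that lets the upper layer learn that a node has become a Switch's Master?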

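4. Summary

In this article we have seen that the election of a controller node as a Switch's Master is implemented directly with the Cluster Singleton Service provided by the underlying ODL controller.

We also looked closely at what happens after a controller node is elected Master: the services of each Context are instantiated, namely Device, Rpc, Statistics, and Role. Only when all Contexts have been instantiated can Openflowplugin offer this Switch's services to northbound applications.

Finally, one point to carry forward: MastershipChangeServiceManager provides the hook through which the upper layer learns that a node has become a Switch's Master. See you in the next article!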

