101

(一)ODL OpenflowPlugin启动流程源码分析

 5 years ago
source link: https://www.sdnlab.com/21199.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

作者简介:陈卓文,国内某游戏公司私有云团队开发者,主要从事SDN/NFV开发,个人邮箱:[email protected]

本文为OpenflowPlugin源码分析第一篇,会有一系列文章深入openflowplugin,包括device上线handshake、master选举、调用ofp提供rpc服务、statistics gather。本文首先从OpenflowPlugin是如何启动展开。

读者约定:基本掌握Opendaylight的思想/有一定实践经验,想要深入理解openflowplugin源码/想对openflowplugin源码修改。

ODL-OpenflowPlugin1%28668x400%29.jpg

启动过程

1.创建SwitchConnectionProviderFactoryImpl作为service注入OSGI

openflow-protocol-impl/src/main/resources/org/opendaylight/blueprint/openflow-protocol-impl.xml中实例化SwitchConnectionProviderFactoryImpl.java作为service注入OSGI

Java

<bean id="switchConnFactory" class="org.opendaylight.openflowjava.protocol.impl.core.SwitchConnectionProviderFactoryImpl"/>

<service ref="switchConnFactory" interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderFactory" odl:type="default"/>

1
2
3
4
<bean id="switchConnFactory"class="org.opendaylight.openflowjava.protocol.impl.core.SwitchConnectionProviderFactoryImpl"/>
 
  <service ref="switchConnFactory"interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderFactory"
          odl:type="default"/>

SwitchConnectionProviderFactoryImpl作用是提供newInstance()工厂方法,创建SwitchConnectionProviderImpl对象。

2.创建SwitchConnectionProvider对象作为service注入OSGI

openflowjava/openflowjava-blueprint-config/src/main/resources/org/opendaylight/blueprint/openflowjava.xml中:

1.引用上面注入OSGI的switchConnectionProviderFactory服务

Java
<reference id="switchConnectionProviderFactory" interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderFactory" odl:type="default"/>
1
2
<reference id="switchConnectionProviderFactory"interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderFactory"
          odl:type="default"/>

2.读取配置,调用switchConnectionProviderFactory工厂方法newInstance()创建switchConnectionProvider:

Java

<!-- Create OF switch connection provider on port 6653 (default) --> <odl:clustered-app-config id="defaultSwitchConnConfig" default-config-file-name="default-openflow-connection-config.xml" binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow._switch.connection.config.rev160506.SwitchConnectionConfig" list-key-value="openflow-switch-connection-provider-default-impl"> </odl:clustered-app-config>

<!-- 调用引用service SwitchConnectionProviderFactoryImpl的 newInstance() --> <bean id="defaultSwitchConnProvider" factory-ref="switchConnectionProviderFactory" factory-method="newInstance"> <argument ref="defaultSwitchConnConfig"/> </bean>

<!-- 定义了defaultSwitchConnProvider作为一个service --> <service ref="defaultSwitchConnProvider" interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider" odl:type="openflow-switch-connection-provider-default-impl"/>

<!-- Create OF switch connection provider on port 6633 (legacy) --> <odl:clustered-app-config id="legacySwitchConnConfig" default-config-file-name="legacy-openflow-connection-config.xml" binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow._switch.connection.config.rev160506.SwitchConnectionConfig" list-key-value="openflow-switch-connection-provider-legacy-impl"> </odl:clustered-app-config>

<!-- 调用引用service SwitchConnectionProviderFactoryImpl的 newInstance() --> <bean id="legacySwitchConnProvider" factory-ref="switchConnectionProviderFactory" factory-method="newInstance"> <argument ref="legacySwitchConnConfig"/> </bean>

<!-- 定义了legacySwitchConnProvider作为一个service --> <service ref="legacySwitchConnProvider" interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider" odl:type="openflow-switch-connection-provider-legacy-impl"/>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!--Create OF switchconnection provider on port6653(default)-->
  <odl:clustered-app-config id="defaultSwitchConnConfig"default-config-file-name="default-openflow-connection-config.xml"
      binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow._switch.connection.config.rev160506.SwitchConnectionConfig"
      list-key-value="openflow-switch-connection-provider-default-impl">
  </odl:clustered-app-config>
 
  <!--调用引用service SwitchConnectionProviderFactoryImplnewInstance()-->
  <bean id="defaultSwitchConnProvider"factory-ref="switchConnectionProviderFactory"factory-method="newInstance">
    <argument ref="defaultSwitchConnConfig"/>
  </bean>
 
  <!--定义了defaultSwitchConnProvider作为一个service-->
  <service ref="defaultSwitchConnProvider"interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider"
          odl:type="openflow-switch-connection-provider-default-impl"/>
          
 
  <!--Create OF switchconnection provider on port6633(legacy)-->
  <odl:clustered-app-config id="legacySwitchConnConfig"default-config-file-name="legacy-openflow-connection-config.xml"
      binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow._switch.connection.config.rev160506.SwitchConnectionConfig"
      list-key-value="openflow-switch-connection-provider-legacy-impl">
  </odl:clustered-app-config>
 
  <!--调用引用service SwitchConnectionProviderFactoryImplnewInstance()-->
  <bean id="legacySwitchConnProvider"factory-ref="switchConnectionProviderFactory"factory-method="newInstance">
    <argument ref="legacySwitchConnConfig"/>
  </bean>
 
  <!--定义了legacySwitchConnProvider作为一个service-->
  <service ref="legacySwitchConnProvider"interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider"
          odl:type="openflow-switch-connection-provider-legacy-impl"/>

特别说明:
1)上面分别读取了配置文件,创建了两个switchConnectionProvider:defaultSwitchConnProvider及legacySwitchConnConfig
2)两个provider分别会监听6653、6633端口,分别是支持openflow1.3、openflow1.0
3)在blueprint中,将defaultSwitchConnProvider及legacySwitchConnProvider作为服务注入到OSGI,供后续其他bundle引用

调用switchConnectionProviderFactory的newInstance()方法,实际就是new SwitchConnectionProviderImpl()。过程会创建serializationFactory、deserializationFactory:

Java
public SwitchConnectionProviderImpl(ConnectionConfiguration connConfig) { this.connConfig = connConfig; serializerRegistry = new SerializerRegistryImpl(); if (connConfig != null) { serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled()); } serializerRegistry.init(); serializationFactory = new SerializationFactory(serializerRegistry); deserializerRegistry = new DeserializerRegistryImpl(); deserializerRegistry.init(); deserializationFactory = new DeserializationFactory(deserializerRegistry); }
1
2
3
4
5
6
7
8
9
10
11
12
  publicSwitchConnectionProviderImpl(ConnectionConfiguration connConfig){
        this.connConfig=connConfig;
        serializerRegistry=newSerializerRegistryImpl();
        if(connConfig!=null){
            serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
        }
        serializerRegistry.init();
        serializationFactory=newSerializationFactory(serializerRegistry);
        deserializerRegistry=newDeserializerRegistryImpl();
        deserializerRegistry.init();
        deserializationFactory=newDeserializationFactory(deserializerRegistry);
    }

说明:
SwitchConnectionProviderImpl对象,为switch连上控制器提供端口监听tcp/udp 6633/6653。

至此,SwitchConnectionProviderImpl对象已经被创建,且创建了serializationFactory、deserializationFactory。但是,还没有看到启动服务监听端口的调用,后续我们会看到其他bundle引用SwitchConnectionProviderImpl,然后启动服务监听端口。

而上述“其他bundle”正是openflowPlugin的bundle,在下面展开。

SwitchConnectionProviderImpl除了OpenflowPlugin中会引用,在openflow项目下的onf-extension及niciria-extension会对其进行扩展register Serializer/Deserializer.

3.创建OpenFlowPluginProviderFactoryImpl作为service注入OSGI

openflowplugin-impl/src/main/resources/org/opendaylight/blueprint/openflowplugin-impl.xml中,实例化OpenFlowPluginProviderFactoryImpl.java作为service注入OSGI

Java
<bean id="ofPluginProviderFactory" class="org.opendaylight.openflowplugin.impl.OpenFlowPluginProviderFactoryImpl"/> <service ref="ofPluginProviderFactory" interface="org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProviderFactory"/>
1
2
<bean id="ofPluginProviderFactory"class="org.opendaylight.openflowplugin.impl.OpenFlowPluginProviderFactoryImpl"/>
  <service ref="ofPluginProviderFactory"interface="org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProviderFactory"/>

OpenFlowPluginProviderFactoryImpl作用是提供newInstance()工厂方法,创建OpenFlowPluginProviderImpl对象:

  • newInstance()方法创建OpenFlowPluginProviderImpl对象,并调用其initialize()
    方法
Java

final OpenFlowPluginProvider openflowPluginProvider = new OpenFlowPluginProviderImpl( configurationService, switchConnectionProviders, dataBroker, rpcRegistry, notificationPublishService, singletonServiceProvider, entityOwnershipService, mastershipChangeServiceManager, ofPluginDiagstatusProvider, systemReadyMonitor);

openflowPluginProvider.initialize(); return openflowPluginProvider;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
finalOpenFlowPluginProvider openflowPluginProvider=newOpenFlowPluginProviderImpl(
                configurationService,
                switchConnectionProviders,
                dataBroker,
                rpcRegistry,
                notificationPublishService,
                singletonServiceProvider,
                entityOwnershipService,
                mastershipChangeServiceManager,
                ofPluginDiagstatusProvider,
                systemReadyMonitor);
 
        openflowPluginProvider.initialize();
        returnopenflowPluginProvider;

4.创建OpenFlowPluginProvider对象作为service注入OSGI

openflowplugin-blueprint-config/src/main/resources/org/opendaylight/blueprint/openflowplugin.xml中,引用reference 上描述在OGSI中作为serviceOpenFlowPluginProviderFactoryImpl,并调用其工厂方法newInstance();并将其创建的openflowPluginProvider作为service注入到OSGI中。

Java

<reference id="openflowPluginProviderFactory" interface="org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProviderFactory"/>

<bean id="openflowPluginProvider" factory-ref="openflowPluginProviderFactory" factory-method="newInstance" destroy-method="close"> <argument ref="configurationService"/> <argument ref="dataBroker"/> <argument ref="rpcRegistry"/> <argument ref="notificationPublishService"/> <argument ref="entityOwnershipService"/> <argument> <list> <ref component-id="defaultSwitchConnProvider"/> <ref component-id="legacySwitchConnProvider"/> </list> </argument> <argument ref="clusterSingletonServiceProvider"/> <argument ref="mastershipChangeServiceManager"/> <argument ref="ofPluginDiagstatusProvider"/> <argument ref="systemReadyMonitor"/> </bean>

<!-- openflowpluginProvider作为service --> <service ref="openflowPluginProvider" odl:type="openflow-plugin-provider-impl"> <interfaces> <value>org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider</value> <value>org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider</value> </interfaces> </service>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<reference id="openflowPluginProviderFactory"
               interface="org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProviderFactory"/>
 
    <bean id="openflowPluginProvider"
          factory-ref="openflowPluginProviderFactory"
          factory-method="newInstance"
          destroy-method="close">
        <argument ref="configurationService"/>
        <argument ref="dataBroker"/>
        <argument ref="rpcRegistry"/>
        <argument ref="notificationPublishService"/>
        <argument ref="entityOwnershipService"/>
        <argument>
            <list>
                <ref component-id="defaultSwitchConnProvider"/>
                <ref component-id="legacySwitchConnProvider"/>
            </list>
        </argument>
        <argument ref="clusterSingletonServiceProvider"/>
        <argument ref="mastershipChangeServiceManager"/>
        <argument ref="ofPluginDiagstatusProvider"/>
        <argument ref="systemReadyMonitor"/>
    </bean>
 
    <!--openflowpluginProvider作为service-->
    <service ref="openflowPluginProvider"odl:type="openflow-plugin-provider-impl">
        <interfaces>
            <value>org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider</value>
            <value>org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider</value>
        </interfaces>
    </service>

OpenflowPluginProviderImplnewInstance()方法调用:

  • 注册MXBean
  • 创建线程池对象
  • 创建DeviceManagerImpl
  • 创建RpcManagerImpl
  • 创建StatisticsManagerImpl
  • 创建RoleManagerImpl
  • 创建ContextChainHolderImpl
  • 创建ConnectionManagerImpl
  • 调用DeviceManagerImpl.initialize()
Java

@Override public void initialize() { // 注册MXBean监听 registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);

// TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters // TODO: rewrite later! OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);

// Creates a thread pool that creates new threads as needed, but will reuse previously // constructed threads when they are available. // Threads that have not been used for x seconds are terminated and removed from the cache. // 创建线程池 executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor( config.getThreadPoolMinThreads(), config.getThreadPoolMaxThreads().getValue(), config.getThreadPoolTimeout(), TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));

// 创建Device manager deviceManager = new DeviceManagerImpl( config, dataBroker, getMessageIntelligenceAgency(), // MessageSpy: MessageIntelligenceAgencyImpl 消息监听Impl notificationPublishService, hashedWheelTimer, convertorManager, deviceInitializerProvider);

TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager); ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);

rpcManager = new RpcManagerImpl( config, rpcProviderRegistry, extensionConverterManager, convertorManager, notificationPublishService);

statisticsManager = new StatisticsManagerImpl( config, rpcProviderRegistry, convertorManager, executorService);

roleManager = new RoleManagerImpl(hashedWheelTimer, config);

// 创建context chain holder封装所有服务 contextChainHolder = new ContextChainHolderImpl( executorService, singletonServicesProvider, entityOwnershipService, mastershipChangeServiceManager);

contextChainHolder.addManager(deviceManager); contextChainHolder.addManager(statisticsManager); contextChainHolder.addManager(rpcManager); contextChainHolder.addManager(roleManager);

// 创建connection Manger // 传入contextChainHolder connectionManager = new ConnectionManagerImpl(config, executorService); connectionManager.setDeviceConnectedHandler(contextChainHolder); connectionManager.setDeviceDisconnectedHandler(contextChainHolder);

// 给device manager设置contextChain deviceManager.setContextChainHolder(contextChainHolder); // 效果是给deviceManage中的消息监听器MessageIntelligenceAgencyImpl调度线程运行 deviceManager.initialize(); }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Override
    publicvoidinitialize(){
        // 注册MXBean监听
        registerMXBean(MESSAGE_INTELLIGENCE_AGENCY,MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
 
        // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
        // TODO: rewrite later!
        OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
 
        // Creates a thread pool that creates new threads as needed, but will reuse previously
        // constructed threads when they are available.
        // Threads that have not been used for x seconds are terminated and removed from the cache.
        // 创建线程池
        executorService=MoreExecutors.listeningDecorator(newThreadPoolLoggingExecutor(
                config.getThreadPoolMinThreads(),
                config.getThreadPoolMaxThreads().getValue(),
                config.getThreadPoolTimeout(),
                TimeUnit.SECONDS,newSynchronousQueue<>(),POOL_NAME));
 
        // 创建Device manager
        deviceManager=newDeviceManagerImpl(
                config,
                dataBroker,
                getMessageIntelligenceAgency(),// MessageSpy: MessageIntelligenceAgencyImpl 消息监听Impl
                notificationPublishService,
                hashedWheelTimer,
                convertorManager,
                deviceInitializerProvider);
 
        TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager,convertorManager);
        ((ExtensionConverterProviderKeeper)deviceManager).setExtensionConverterProvider(extensionConverterManager);
 
        rpcManager=newRpcManagerImpl(
                config,
                rpcProviderRegistry,
                extensionConverterManager,
                convertorManager,
                notificationPublishService);
 
        statisticsManager=newStatisticsManagerImpl(
                config,
                rpcProviderRegistry,
                convertorManager,
                executorService);
 
        roleManager=newRoleManagerImpl(hashedWheelTimer,config);
 
        // 创建context chain holder封装所有服务
        contextChainHolder=newContextChainHolderImpl(
                executorService,
                singletonServicesProvider,
                entityOwnershipService,
                mastershipChangeServiceManager);
 
        contextChainHolder.addManager(deviceManager);
        contextChainHolder.addManager(statisticsManager);
        contextChainHolder.addManager(rpcManager);
        contextChainHolder.addManager(roleManager);
 
        // 创建connection Manger
        //  传入contextChainHolder
        connectionManager=newConnectionManagerImpl(config,executorService);
        connectionManager.setDeviceConnectedHandler(contextChainHolder);
        connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
 
        // 给device manager设置contextChain
        deviceManager.setContextChainHolder(contextChainHolder);
        // 效果是给deviceManage中的消息监听器MessageIntelligenceAgencyImpl调度线程运行
        deviceManager.initialize();
    }

OpenflowPluginProvider初始化过程

1.创建了四个managerDeviceManagerImpl,RpcManagerImpl,StatisticsManagerImpl,RoleManagerImpl,用于当有switch连上控制器时用于为connection处理。

2.创建ContextChainHolderImpl对象,将四个manager addManager()进去。而ContextChainHolderImpl会在switch连上控制器时,为每个connection创建ContextChainImpl

3.创建ConnectionManagerImpl,用于底层openflowjava到openflowplugin的关键桥梁对象,当switch连上控制器,会触发ConnectionManagerImpl.onSwitchConnected方法。用于manager connection,后面会详细阐述。

4.调用DeviceManagerImpl.initialize(),创建线程监听消息,比如统计,具体处理在MessageIntelligenceAgencyImpl.java

注意,OpenflowPlugin0.6.2与0.5.4-SNATSHOP版本有一个最大的区别,OpenflowPluginProvider.startSwitchConnections方法被调用方式。
在0.5.4-SNATSHOP中,此方法是在initialize()方法最后会被调用。而0.6.2 OpenflowPluginProvider实现了接口SystemReadyListener,用于当系统ready时候可以做某些操作,而在OpenflowPluginProvider 0.6.2的处理是调用startSwitchConnections方法:

Java

@Override public void onSystemBootReady() { LOG.debug("onSystemBootReady() received, starting the switch connections"); startSwitchConnections(); }

private void startSwitchConnections() { Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> { // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava if (config.isUseSingleLayerSerialization()) { SerializerInjector.injectSerializers(switchConnectionProvider, switchConnectionProvider.getConfiguration().isGroupAddModEnabled()); DeserializerInjector.injectDeserializers(switchConnectionProvider); } else { DeserializerInjector.revertDeserializers(switchConnectionProvider); }

// 给switch connection provider设置ConnectionManagerImpl() // switch connection provider是在openflowjava中的openflowjava-blueprint-config的openflowjava.xml blueprint创建的 // Set handler of incoming connections and start switch connection provider switchConnectionProvider.setSwitchConnectionHandler(connectionManager); /* 调用startup, 效果: 创建tcp/udp的server监听6633、6653端口 当有sw连接时,调用TcpChannelInitializer的initChannel()方法,会调用调用openflowplugin的ConnectionManagerImpl onSwitchConnected()方法 */ return switchConnectionProvider.startup(); }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() { @Override public void onSuccess(@Nonnull final List<Boolean> result) { LOG.info("All switchConnectionProviders are up and running ({}).", result.size()); openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started"); }

@Override public void onFailure(@Nonnull final Throwable throwable) { LOG.warn("Some switchConnectionProviders failed to start.", throwable); openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, "some switch connections failed to start"); } }, MoreExecutors.directExecutor()); }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
    publicvoidonSystemBootReady(){
        LOG.debug("onSystemBootReady() received, starting the switch connections");
        startSwitchConnections();
    }
 
    privatevoidstartSwitchConnections(){
        Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider->{
            // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
            if(config.isUseSingleLayerSerialization()){
                SerializerInjector.injectSerializers(switchConnectionProvider,
                        switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
                DeserializerInjector.injectDeserializers(switchConnectionProvider);
            }else{
                DeserializerInjector.revertDeserializers(switchConnectionProvider);
            }
 
            // 给switch connection provider设置ConnectionManagerImpl()
            //    switch connection provider是在openflowjava中的openflowjava-blueprint-config的openflowjava.xml blueprint创建的
            // Set handler of incoming connections and start switch connection provider
            switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
            /*
                调用startup, 效果: 创建tcp/udp的server监听6633、6653端口
                    当有sw连接时,调用TcpChannelInitializer的initChannel()方法,会调用调用openflowplugin的ConnectionManagerImpl onSwitchConnected()方法
             */
            returnswitchConnectionProvider.startup();
        }).collect(Collectors.toSet())),newFutureCallback<List<Boolean>>(){
            @Override
            publicvoidonSuccess(@NonnullfinalList<Boolean>result){
                LOG.info("All switchConnectionProviders are up and running ({}).",result.size());
                openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL,"switch connections started");
            }
 
            @Override
            publicvoidonFailure(@NonnullfinalThrowable throwable){
                LOG.warn("Some switchConnectionProviders failed to start.",throwable);
                openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR,"some switch connections failed to start");
            }
        },MoreExecutors.directExecutor());
    }

可以看到startSwitchConnections方法,最重要的逻辑是:

Java
switchConnectionProvider.setSwitchConnectionHandler(connectionManager); return switchConnectionProvider.startup();
1
2
switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
            returnswitchConnectionProvider.startup();

其目的是,将OpenflowPluginProvider中创建的connectionManager传入到上面提及Openflowjava中的SwitchConnectionProviderImpl对象,并调用其startup方法。

可以注意到这里SwitchConnectionProviderImpl是在openflowjava的blueprint中创建的,而实际调用其startup方法是在OpenflowPlugin中,这里很能体现OSGI模块化的思想。

接下来可以展开SwitchConnectionProviderImpl.startup()方法:其最终作用是创建tcp/udp的server监听6633、6653端口,当switch连上端口(控制器)时,调用connectionManager的onSwitchConnected方法,进而进入接下来OpenflowPlugin的处理(比如Handshake,创建各个context,mastership选举等)。

Java
@Override @SuppressWarnings("checkstyle:IllegalCatch") public ListenableFuture<Boolean> startup() { LOG.debug("Startup summoned"); ListenableFuture<Boolean> result = null; try { /* 效果: 创建tcp/udp的server监听6633、6653端口 1.创建TcpServer,后面最终会调用其run()方法 2.当有连接时,调用TcpChannelInitializer的initChannel()方法,会调用调用openflowplugin的ConnectionManagerImpl */ serverFacade = createAndConfigureServer(); if (switchConnectionHandler == null) { throw new IllegalStateException("SwitchConnectionHandler is not set"); } // 创建一个线程运行ServerFacade new Thread(serverFacade).start(); result = serverFacade.getIsOnlineFuture(); } catch (RuntimeException e) { final SettableFuture<Boolean> exResult = SettableFuture.create(); exResult.setException(e); result = exResult; } return result; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
    @SuppressWarnings("checkstyle:IllegalCatch")
    publicListenableFuture<Boolean>startup(){
        LOG.debug("Startup summoned");
        ListenableFuture<Boolean>result=null;
        try{
            /*
                效果: 创建tcp/udp的server监听6633、6653端口
                    1.创建TcpServer,后面最终会调用其run()方法
                    2.当有连接时,调用TcpChannelInitializer的initChannel()方法,会调用调用openflowplugin的ConnectionManagerImpl
             */
            serverFacade=createAndConfigureServer();
            if(switchConnectionHandler==null){
                thrownewIllegalStateException("SwitchConnectionHandler is not set");
            }
            // 创建一个线程运行ServerFacade
            newThread(serverFacade).start();
            result=serverFacade.getIsOnlineFuture();
        }catch(RuntimeExceptione){
            finalSettableFuture<Boolean>exResult=SettableFuture.create();
            exResult.setException(e);
            result=exResult;
        }
        returnresult;
    }

以TCP为例,展开。startup方法的核心是serverFacade = createAndConfigureServer();

Java

private ServerFacade createAndConfigureServer() { LOG.debug("Configuring .."); ServerFacade server = null; // 创建channel factory并配置 final ChannelInitializerFactory factory = new ChannelInitializerFactory(); // 记住switchConnectionHandler是OpenflowPlugin中传入的ConnectionManagerImpl factory.setSwitchConnectionHandler(switchConnectionHandler); factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout()); factory.setTlsConfig(connConfig.getTlsConfiguration()); factory.setSerializationFactory(serializationFactory); factory.setDeserializationFactory(deserializationFactory); factory.setUseBarrier(connConfig.useBarrier()); factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize()); final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();

// Check if Epoll native transport is available. // TODO : Add option to disable Epoll. boolean isEpollEnabled = Epoll.isAvailable();

if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) { // 创建TCP handler用于监听地址及端口 server = new TcpHandler(connConfig.getAddress(), connConfig.getPort()); // 创建TCP publishing channel initializer: 这里创建的initializer是有ConnectionManagerImpl的的引用 final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer(); ((TcpHandler) server).setChannelInitializer(channelInitializer); // 会创建底层netty相关的channel对象 ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);

// 获取上一步创建的netty底层channel相关对象:EventLoopGroup final EventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup(); // 创建TcpConnectionInitializer connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled); // 设置上面的channelInitializer(带有ConnectionManagerImpl引用) connectionInitializer.setChannelInitializer(channelInitializer); /* 会调用TcpHandler run()方法, 最终效果会监听tcp端口 TcpHandler run()中设置了再调用到TcpChannelInitializer的initChannel()方法; 当远程sw连上就会调用initChannel(),其会调用openflowplugin的ConnectionManagerImpl */ connectionInitializer.run(); } else if (TransportProtocol.UDP.equals(transportProtocol)) { server = new UdpHandler(connConfig.getAddress(), connConfig.getPort()); ((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled); ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer()); } else { throw new IllegalStateException("Unknown transport protocol received: " + transportProtocol); } server.setThreadConfig(connConfig.getThreadConfiguration()); return server; }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
privateServerFacade createAndConfigureServer(){
        LOG.debug("Configuring ..");
        ServerFacade server=null;
        // 创建channel factory并配置
        finalChannelInitializerFactory factory=newChannelInitializerFactory();
        // 记住switchConnectionHandler是OpenflowPlugin中传入的ConnectionManagerImpl
        factory.setSwitchConnectionHandler(switchConnectionHandler);
        factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
        factory.setTlsConfig(connConfig.getTlsConfiguration());
        factory.setSerializationFactory(serializationFactory);
        factory.setDeserializationFactory(deserializationFactory);
        factory.setUseBarrier(connConfig.useBarrier());
        factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
        finalTransportProtocol transportProtocol=(TransportProtocol)connConfig.getTransferProtocol();
 
        // Check if Epoll native transport is available.
        // TODO : Add option to disable Epoll.
        booleanisEpollEnabled=Epoll.isAvailable();
 
        if(TransportProtocol.TCP.equals(transportProtocol)||TransportProtocol.TLS.equals(transportProtocol)){
            // 创建TCP handler用于监听地址及端口
            server=newTcpHandler(connConfig.getAddress(),connConfig.getPort());
            // 创建TCP publishing channel initializer: 这里创建的initializer是有ConnectionManagerImpl的的引用
            finalTcpChannelInitializer channelInitializer=factory.createPublishingChannelInitializer();
            ((TcpHandler)server).setChannelInitializer(channelInitializer);
            // 会创建底层netty相关的channel对象
            ((TcpHandler)server).initiateEventLoopGroups(connConfig.getThreadConfiguration(),isEpollEnabled);
 
            // 获取上一步创建的netty底层channel相关对象:EventLoopGroup
            finalEventLoopGroup workerGroupFromTcpHandler=((TcpHandler)server).getWorkerGroup();
            // 创建TcpConnectionInitializer
            connectionInitializer=newTcpConnectionInitializer(workerGroupFromTcpHandler,isEpollEnabled);
            // 设置上面的channelInitializer(带有ConnectionManagerImpl引用)
            connectionInitializer.setChannelInitializer(channelInitializer);
            /*
             会调用TcpHandler run()方法, 最终效果会监听tcp端口
                TcpHandler run()中设置了再调用到TcpChannelInitializer的initChannel()方法;
                当远程sw连上就会调用initChannel(),其会调用openflowplugin的ConnectionManagerImpl
             */
            connectionInitializer.run();
        }elseif(TransportProtocol.UDP.equals(transportProtocol)){
            server=newUdpHandler(connConfig.getAddress(),connConfig.getPort());
            ((UdpHandler)server).initiateEventLoopGroups(connConfig.getThreadConfiguration(),isEpollEnabled);
            ((UdpHandler)server).setChannelInitializer(factory.createUdpChannelInitializer());
        }else{
            thrownewIllegalStateException("Unknown transport protocol received: "+transportProtocol);
        }
        server.setThreadConfig(connConfig.getThreadConfiguration());
        returnserver;
    }

在这个方法内,几个关键地方,可以说明Openflowjava中有switch连上后,触发ConnectionManagerImpl的onSwitchConnected方法的调用:
1.switchConnectionHandler是在OpenflowPlugin创建的ConnectionManagerImplChannelInitializerFactory对象创建后是通过setSwitchConnectionHandler(switchConnectionHandler)方法设置了ConnectionManagerImpl对象;

Java
final ChannelInitializerFactory factory = new ChannelInitializerFactory(); factory.setSwitchConnectionHandler(switchConnectionHandler);
1
2
finalChannelInitializerFactory factory=newChannelInitializerFactory();
factory.setSwitchConnectionHandler(switchConnectionHandler);

2.通过ChannelInitializerFactory factory对象创建的TcpChannelInitializer对象是有ConnectionManagerImpl对象,并set到tcp servier对象。

Java
final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer(); ((TcpHandler) server).setChannelInitializer(channelInitializer);
1
2
finalTcpChannelInitializer channelInitializer=factory.createPublishingChannelInitializer();
((TcpHandler)server).setChannelInitializer(channelInitializer);

3.用于初始化tcp connection的对象TcpConnectionInitializer中,是有传入(setChannelInitializer)TcpChannelInitializer channelInitializer(TcpChannelInitializer带有ConnectionManagerImpl对象引用)

Java
connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled); connectionInitializer.setChannelInitializer(channelInitializer);
1
2
connectionInitializer=newTcpConnectionInitializer(workerGroupFromTcpHandler,isEpollEnabled);
connectionInitializer.setChannelInitializer(channelInitializer);

然后回到startup方法,new Thread(serverFacade).start();将tcp server(TcpHandler对象)运行。

TcpHandler.run方法中,.childHandler(channelInitializer)调用上述创建的TcpChannelInitializer对象(带有ConnectionManagerImpl对象引用)。

Java

@Override public void run() { /* * We generally do not perform IO-unrelated tasks, so we want to have * all outstanding tasks completed before the executing thread goes * back into select. * * Any other setting means netty will measure the time it spent selecting * and spend roughly proportional time executing tasks. */ //workerGroup.setIoRatio(100);

final ChannelFuture f; try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(socketChannelClass) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(channelInitializer) // 这里应该调用的是TcpChannelInitializer的initChannel方法 .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY , true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK)) .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);

if (startupAddress != null) { f = bootstrap.bind(startupAddress.getHostAddress(), port).sync(); } else { f = bootstrap.bind(port).sync(); } } catch (InterruptedException e) { LOG.error("Interrupted while binding port {}", port, e); return; }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
    publicvoidrun(){
        /*
         * We generally do not perform IO-unrelated tasks, so we want to have
         * all outstanding tasks completed before the executing thread goes
         * back into select.
         *
         * Any other setting means netty will measure the time it spent selecting
         * and spend roughly proportional time executing tasks.
         */
        //workerGroup.setIoRatio(100);
 
        finalChannelFuturef;
        try{
            ServerBootstrap bootstrap=newServerBootstrap();
            bootstrap.group(bossGroup,workerGroup)
                    .channel(socketChannelClass)
                    .handler(newLoggingHandler(LogLevel.DEBUG))
                    .childHandler(channelInitializer)// 这里应该调用的是TcpChannelInitializer的initChannel方法
                    .option(ChannelOption.SO_BACKLOG,128)
                    .option(ChannelOption.SO_REUSEADDR,true)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                            newWriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK,DEFAULT_WRITE_HIGH_WATERMARK))
                    .childOption(ChannelOption.WRITE_SPIN_COUNT,DEFAULT_WRITE_SPIN_COUNT);
 
            if(startupAddress!=null){
                f=bootstrap.bind(startupAddress.getHostAddress(),port).sync();
            }else{
                f=bootstrap.bind(port).sync();
            }
        }catch(InterruptedExceptione){
            LOG.error("Interrupted while binding port {}",port,e);
            return;
        }

对netty底层的调用不熟悉,大胆猜测。当有switch通过tcp connect到6633/6653监听端口(TcpHandler),就会调用TcpChannelInitializer对象的initChannel方法。在initChannel方法中可以看到调用ConnectionManagerImpl的onSwitchConnected方法:

Java
getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
1
getSwitchConnectionHandler().onSwitchConnected(connectionFacade);

下篇笔记再讲述switch连上控制器后,怎么进行握手handshake等动作。

总结

至此,可以完整看Openflowjava/Openflowplugin启动流程,ODL是如何监听openflow端口,并且当switch连上控制器后如何开始调用OpenflowPlugin中的方法。openflowplugin项目充分利用了OSGI的特性,各个“bundle”通过OSGI实现调用。

本文,只描述Openflowjava和OpenflowPlugin启动过程,而onSwitchConnected方法的后续调用,下篇笔记展开。

关键类/对象

个人理解整理:

类/对象

作用/描述

SwitchConnectionProviderImpl

提供端口监听, 底层device连上通知OpenflowPlugin进行下一步处理

TcpChannelInitializer

当switch连上,就会调用其为每个TCP连接initializer channel

ConnectionManagerImpl

负责connection,从switch连上控制器底层openfowjava,往上调用OpenflowPlugin

OpenflowPluginProvider

提供openflowPlugin的service(handshake/device/rpc/statistics/role)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK