(1) ODL OpenflowPlugin Startup Flow: Source Code Analysis
source link: https://www.sdnlab.com/21199.html
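About the author: Chen Zhuowen (陈卓文), a developer on the private cloud team of a Chinese game company, mainly working on SDN/NFV development. Personal email: [email protected]
This is the first article in a source code analysis series on OpenflowPlugin. Later articles dig deeper into openflowplugin, covering the handshake when a device comes online, master election, invoking the RPC services that ofp provides, and statistics gathering. This article starts with how OpenflowPlugin starts up.
Intended readers: those who have a basic grasp of OpenDaylight's concepts or some hands-on experience, and who want to understand the openflowplugin source code in depth or modify it.
Startup process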
1. Create SwitchConnectionProviderFactoryImpl and register it as an OSGi service
In openflow-protocol-impl/src/main/resources/org/opendaylight/blueprint/openflow-protocol-impl.xml, SwitchConnectionProviderFactoryImpl.java is instantiated and registered as an OSGi service:
<bean id="switchConnFactory" class="org.opendaylight.openflowjava.protocol.impl.core.SwitchConnectionProviderFactoryImpl"/>
<service ref="switchConnFactory" interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderFactory" odl:type="default"/>
The role of SwitchConnectionProviderFactoryImpl is to provide the newInstance() factory method, which creates SwitchConnectionProviderImpl objects.
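The factory arrangement described above can be sketched in plain Java. This is a minimal illustration, not the openflowjava code: ConnectionConfig, SimpleSwitchConnectionProvider and SimpleSwitchConnectionProviderFactory are hypothetical, simplified stand-ins, and the real newInstance() takes a generated SwitchConnectionConfig binding object rather than this one-method interface.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins for the openflowjava types named in the text.
interface ConnectionConfig {
    int port();
}

interface SwitchConnectionProvider {
    ConnectionConfig configuration();
}

interface SwitchConnectionProviderFactory {
    SwitchConnectionProvider newInstance(ConnectionConfig config);
}

// The provider only stores its configuration here; nothing is listening yet.
final class SimpleSwitchConnectionProvider implements SwitchConnectionProvider {
    private final ConnectionConfig config;

    SimpleSwitchConnectionProvider(ConnectionConfig config) {
        this.config = Objects.requireNonNull(config);
    }

    @Override
    public ConnectionConfig configuration() {
        return config;
    }
}

// Only the factory is published as a service; callers obtain providers through it
// instead of invoking the provider constructor themselves.
final class SimpleSwitchConnectionProviderFactory implements SwitchConnectionProviderFactory {
    @Override
    public SwitchConnectionProvider newInstance(ConnectionConfig config) {
        return new SimpleSwitchConnectionProvider(config);
    }
}
```

Publishing the factory rather than a ready-made provider lets a consumer create one provider per configuration, which is how two providers with different ports can come from the same service.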
2. Create SwitchConnectionProvider objects and register them as OSGi services
openflowjava/openflowjava-blueprint-config/src/main/resources/org/opendaylight/blueprint/openflowjava.xml does two things:
1. References the switchConnectionProviderFactory service registered above
2. Reads the configuration and calls the newInstance() factory method of switchConnectionProviderFactory to create each switchConnectionProvider:
<!-- Create OF switch connection provider on port 6653 (default) -->
<odl:clustered-app-config id="defaultSwitchConnConfig"
    default-config-file-name="default-openflow-connection-config.xml"
    binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow._switch.connection.config.rev160506.SwitchConnectionConfig"
    list-key-value="openflow-switch-connection-provider-default-impl">
</odl:clustered-app-config>
<!-- Call newInstance() on the referenced SwitchConnectionProviderFactoryImpl service -->
<bean id="defaultSwitchConnProvider" factory-ref="switchConnectionProviderFactory" factory-method="newInstance">
  <argument ref="defaultSwitchConnConfig"/>
</bean>
<!-- Export defaultSwitchConnProvider as a service -->
<service ref="defaultSwitchConnProvider"
    interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider"
    odl:type="openflow-switch-connection-provider-default-impl"/>

<!-- Create OF switch connection provider on port 6633 (legacy) -->
<odl:clustered-app-config id="legacySwitchConnConfig"
    default-config-file-name="legacy-openflow-connection-config.xml"
    binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow._switch.connection.config.rev160506.SwitchConnectionConfig"
    list-key-value="openflow-switch-connection-provider-legacy-impl">
</odl:clustered-app-config>
<!-- Call newInstance() on the referenced SwitchConnectionProviderFactoryImpl service -->
<bean id="legacySwitchConnProvider" factory-ref="switchConnectionProviderFactory" factory-method="newInstance">
  <argument ref="legacySwitchConnConfig"/>
</bean>
<!-- Export legacySwitchConnProvider as a service -->
<service ref="legacySwitchConnProvider"
    interface="org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider"
    odl:type="openflow-switch-connection-provider-legacy-impl"/>
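Notes:
1) The blueprint above reads two configuration files and creates two switchConnectionProviders: defaultSwitchConnProvider and legacySwitchConnProvider.
2) The two providers listen on ports 6653 (default) and 6633 (legacy) respectively; these are the ports conventionally used by OpenFlow 1.3 and OpenFlow 1.0 switches.
3) The blueprint registers defaultSwitchConnProvider and legacySwitchConnProvider as OSGi services so that other bundles can reference them later.
Calling newInstance() on switchConnectionProviderFactory is really just new SwitchConnectionProviderImpl(). Construction also creates the serializationFactory and deserializationFactory.
Note: the SwitchConnectionProviderImpl object provides the TCP/UDP listener on port 6633/6653 that switches use to connect to the controller.
At this point the SwitchConnectionProviderImpl objects have been created, along with their serializationFactory and deserializationFactory. However, nothing has yet started the server that listens on the port. As shown later, another bundle references SwitchConnectionProviderImpl and then starts the listener.
That "other bundle" is the openflowPlugin bundle, covered next.
Besides being referenced by OpenflowPlugin, SwitchConnectionProviderImpl is also extended by onf-extension and nicira-extension in the openflowplugin project, which register additional Serializers/Deserializers with it.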
3. Create OpenFlowPluginProviderFactoryImpl and register it as an OSGi service
In openflowplugin-impl/src/main/resources/org/opendaylight/blueprint/openflowplugin-impl.xml, OpenFlowPluginProviderFactoryImpl.java is instantiated and registered as an OSGi service.
The role of OpenFlowPluginProviderFactoryImpl is to provide the newInstance() factory method, which creates the OpenFlowPluginProviderImpl object.
The newInstance() method creates the OpenFlowPluginProviderImpl object and calls its initialize() method:
final OpenFlowPluginProvider openflowPluginProvider = new OpenFlowPluginProviderImpl(
        configurationService,
        switchConnectionProviders,
        dataBroker,
        rpcRegistry,
        notificationPublishService,
        singletonServiceProvider,
        entityOwnershipService,
        mastershipChangeServiceManager,
        ofPluginDiagstatusProvider,
        systemReadyMonitor);

openflowPluginProvider.initialize();
return openflowPluginProvider;
4. Create the OpenFlowPluginProvider object and register it as an OSGi service
openflowplugin-blueprint-config/src/main/resources/org/opendaylight/blueprint/openflowplugin.xml references the OpenFlowPluginProviderFactoryImpl service described above, calls its factory method newInstance(), and registers the resulting openflowPluginProvider as an OSGi service.
<reference id="openflowPluginProviderFactory" interface="org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProviderFactory"/>
<bean id="openflowPluginProvider" factory-ref="openflowPluginProviderFactory" factory-method="newInstance" destroy-method="close">
  <argument ref="configurationService"/>
  <argument ref="dataBroker"/>
  <argument ref="rpcRegistry"/>
  <argument ref="notificationPublishService"/>
  <argument ref="entityOwnershipService"/>
  <argument>
    <list>
      <ref component-id="defaultSwitchConnProvider"/>
      <ref component-id="legacySwitchConnProvider"/>
    </list>
  </argument>
  <argument ref="clusterSingletonServiceProvider"/>
  <argument ref="mastershipChangeServiceManager"/>
  <argument ref="ofPluginDiagstatusProvider"/>
  <argument ref="systemReadyMonitor"/>
</bean>
<!-- Export openflowPluginProvider as a service -->
<service ref="openflowPluginProvider" odl:type="openflow-plugin-provider-impl">
  <interfaces>
    <value>org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider</value>
    <value>org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider</value>
  </interfaces>
</service>
The factory's newInstance() calls OpenFlowPluginProviderImpl.initialize(), which does the following:
- Registers the MXBean
- Creates the thread pool
- Creates DeviceManagerImpl
- Creates RpcManagerImpl
- Creates StatisticsManagerImpl
- Creates RoleManagerImpl
- Creates ContextChainHolderImpl
- Creates ConnectionManagerImpl
- Calls DeviceManagerImpl.initialize()
@Override
public void initialize() {
    // Register the MXBean
    registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);

    // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
    // TODO: rewrite later!
    OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);

    // Creates a thread pool that creates new threads as needed, but will reuse previously
    // constructed threads when they are available.
    // Threads that have not been used for x seconds are terminated and removed from the cache.
    // Create the thread pool
    executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
            config.getThreadPoolMinThreads(),
            config.getThreadPoolMaxThreads().getValue(),
            config.getThreadPoolTimeout(),
            TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));

    // Create the device manager
    deviceManager = new DeviceManagerImpl(
            config,
            dataBroker,
            getMessageIntelligenceAgency(), // MessageSpy: MessageIntelligenceAgencyImpl, the message monitor implementation
            notificationPublishService,
            hashedWheelTimer,
            convertorManager,
            deviceInitializerProvider);

    TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
    ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);

    rpcManager = new RpcManagerImpl(
            config,
            rpcProviderRegistry,
            extensionConverterManager,
            convertorManager,
            notificationPublishService);

    statisticsManager = new StatisticsManagerImpl(
            config,
            rpcProviderRegistry,
            convertorManager,
            executorService);

    roleManager = new RoleManagerImpl(hashedWheelTimer, config);

    // Create the context chain holder, which wraps all the services
    contextChainHolder = new ContextChainHolderImpl(
            executorService,
            singletonServicesProvider,
            entityOwnershipService,
            mastershipChangeServiceManager);

    contextChainHolder.addManager(deviceManager);
    contextChainHolder.addManager(statisticsManager);
    contextChainHolder.addManager(rpcManager);
    contextChainHolder.addManager(roleManager);

    // Create the connection manager and hand it the contextChainHolder
    connectionManager = new ConnectionManagerImpl(config, executorService);
    connectionManager.setDeviceConnectedHandler(contextChainHolder);
    connectionManager.setDeviceDisconnectedHandler(contextChainHolder);

    // Give the device manager the context chain holder
    deviceManager.setContextChainHolder(contextChainHolder);
    // Effect: schedules a thread that runs the device manager's message monitor, MessageIntelligenceAgencyImpl
    deviceManager.initialize();
}
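The thread pool comment in initialize() describes a pool that grows on demand and reuses idle threads. A minimal JDK-only sketch of that configuration follows, assuming a plain ThreadPoolExecutor in place of ODL's ThreadPoolLoggingExecutor; CachedStylePool and its parameter names are hypothetical and stand in for the values read from config.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CachedStylePool {
    // minThreads, maxThreads and timeoutSeconds stand in for the configured values.
    static ThreadPoolExecutor create(int minThreads, int maxThreads, long timeoutSeconds) {
        // A SynchronousQueue holds no tasks: each submitted task is handed straight to an
        // idle thread, or a new thread is started while the pool is below maxThreads.
        // Once maxThreads are busy, further submissions are rejected by the default policy.
        return new ThreadPoolExecutor(
                minThreads, maxThreads, timeoutSeconds, TimeUnit.SECONDS, new SynchronousQueue<>());
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = create(1, 4, 60);
        Callable<String> task = () -> Thread.currentThread().getName();
        Future<String> result = pool.submit(task);
        System.out.println("task ran on " + result.get());
        pool.shutdown();
    }
}
```

With this queue choice, threads above the minimum that stay idle longer than the timeout are terminated, which matches the behavior the source comment describes.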
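OpenflowPluginProvider initialization
1. Four managers are created: DeviceManagerImpl, RpcManagerImpl, StatisticsManagerImpl and RoleManagerImpl. They handle each connection when a switch connects to the controller.
2. A ContextChainHolderImpl object is created and the four managers are added to it via addManager(). When a switch connects to the controller, ContextChainHolderImpl creates a ContextChainImpl for each connection.
3. A ConnectionManagerImpl is created. It is the key bridge from the underlying openflowjava up to openflowplugin: when a switch connects to the controller, ConnectionManagerImpl.onSwitchConnected is triggered. It manages connections and is described in detail later.
4. DeviceManagerImpl.initialize() is called, which schedules a thread that monitors messages (for example, for statistics); the actual handling is in MessageIntelligenceAgencyImpl.java.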
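The relationship between the managers and the per-connection context chain can be sketched as follows. This is a heavily simplified model under stated assumptions: Manager, ContextChain and ContextChainHolder here are hypothetical classes, contexts are represented as plain strings, and the method names onDeviceConnected and onDeviceDisconnected are illustrative rather than the real openflowplugin signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: each manager contributes one per-switch context.
interface Manager {
    String name();

    default String createContext(String switchId) {
        return name() + "Context[" + switchId + "]";
    }
}

final class ContextChain {
    final String switchId;
    final List<String> contexts = new ArrayList<>();

    ContextChain(String switchId) {
        this.switchId = switchId;
    }
}

final class ContextChainHolder {
    private final List<Manager> managers = new ArrayList<>();
    private final Map<String, ContextChain> chains = new ConcurrentHashMap<>();

    void addManager(Manager manager) {
        managers.add(manager);
    }

    // On connect: build one chain that holds a context from every registered manager.
    ContextChain onDeviceConnected(String switchId) {
        return chains.computeIfAbsent(switchId, id -> {
            ContextChain chain = new ContextChain(id);
            for (Manager manager : managers) {
                chain.contexts.add(manager.createContext(id));
            }
            return chain;
        });
    }

    // On disconnect: drop the chain for that switch.
    void onDeviceDisconnected(String switchId) {
        chains.remove(switchId);
    }

    int activeChains() {
        return chains.size();
    }
}
```

Keeping the managers in one holder means the connection layer needs only a single connected/disconnected callback target, while each manager stays responsible for its own context type.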
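Note that the biggest difference between OpenflowPlugin 0.6.2 and 0.5.4-SNAPSHOT is how OpenflowPluginProvider.startSwitchConnections gets called.
In 0.5.4-SNAPSHOT, this method is called at the end of initialize(). In 0.6.2, OpenflowPluginProvider implements the SystemReadyListener interface, which lets it act once the system is ready, and what the 0.6.2 OpenflowPluginProvider does at that point is call startSwitchConnections: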
@Override
public void onSystemBootReady() {
    LOG.debug("onSystemBootReady() received, starting the switch connections");
    startSwitchConnections();
}

private void startSwitchConnections() {
    Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
        // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
        if (config.isUseSingleLayerSerialization()) {
            SerializerInjector.injectSerializers(switchConnectionProvider,
                    switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
            DeserializerInjector.injectDeserializers(switchConnectionProvider);
        } else {
            DeserializerInjector.revertDeserializers(switchConnectionProvider);
        }

        // Give the switch connection provider the ConnectionManagerImpl.
        // The switch connection provider was created by the openflowjava.xml blueprint
        // in openflowjava's openflowjava-blueprint-config.
        // Set handler of incoming connections and start switch connection provider
        switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
        /*
         * startup() creates the TCP/UDP server listening on port 6633 or 6653.
         * When a switch connects, TcpChannelInitializer.initChannel() is called, which in turn
         * calls onSwitchConnected() on openflowplugin's ConnectionManagerImpl.
         */
        return switchConnectionProvider.startup();
    }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
        @Override
        public void onSuccess(@Nonnull final List<Boolean> result) {
            LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
            openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started");
        }

        @Override
        public void onFailure(@Nonnull final Throwable throwable) {
            LOG.warn("Some switchConnectionProviders failed to start.", throwable);
            openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, "some switch connections failed to start");
        }
    }, MoreExecutors.directExecutor());
}
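The pattern in startSwitchConnections (start every provider, combine the resulting futures, report one overall status) can be sketched with the JDK's CompletableFuture instead of Guava's Futures.allAsList. This is an illustration under assumptions: StartableProvider and DeferredStarter are hypothetical names, and the status strings merely mirror the ServiceState values seen in the source.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical stand-in: startup() completes with true once the listener is bound.
interface StartableProvider {
    CompletableFuture<Boolean> startup();
}

final class DeferredStarter {
    private final List<StartableProvider> providers;
    private volatile String status = "INITIALIZING";

    DeferredStarter(List<StartableProvider> providers) {
        this.providers = providers;
    }

    // Analogous to onSystemBootReady(): no provider is started before this callback fires.
    CompletableFuture<Void> onSystemBootReady() {
        List<CompletableFuture<Boolean>> startups = providers.stream()
                .map(StartableProvider::startup)
                .collect(Collectors.toList());
        // allOf completes once every startup future has completed, and completes
        // exceptionally if any of them failed.
        return CompletableFuture.allOf(startups.toArray(new CompletableFuture<?>[0]))
                .handle((ignored, failure) -> {
                    status = (failure == null) ? "OPERATIONAL" : "ERROR";
                    return null;
                });
    }

    String status() {
        return status;
    }
}
```

Deferring the start to a readiness callback means the ports open only after the rest of the controller has finished booting, which is the behavioral difference the text attributes to 0.6.2.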
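In startSwitchConnections, the most important logic is the pair of calls switchConnectionProvider.setSwitchConnectionHandler(connectionManager) and switchConnectionProvider.startup().
Their purpose is to pass the connectionManager created in OpenflowPluginProvider into the SwitchConnectionProviderImpl object from Openflowjava mentioned earlier, and then call its startup method.
Note that SwitchConnectionProviderImpl is created in the openflowjava blueprint, while its startup method is actually called from OpenflowPlugin. This is a good illustration of OSGi's modular design.
Next, look at SwitchConnectionProviderImpl.startup(). Its net effect is to create the TCP/UDP server listening on port 6633 or 6653. When a switch connects to that port (that is, to the controller), connectionManager's onSwitchConnected method is called, which leads into the rest of the OpenflowPlugin processing (handshake, creation of the various contexts, mastership election, and so on).
Taking TCP as the example, the core of the startup method is serverFacade = createAndConfigureServer();: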
private ServerFacade createAndConfigureServer() {
    LOG.debug("Configuring ..");
    ServerFacade server = null;
    // Create and configure the channel initializer factory
    final ChannelInitializerFactory factory = new ChannelInitializerFactory();
    // Remember: switchConnectionHandler is the ConnectionManagerImpl passed in from OpenflowPlugin
    factory.setSwitchConnectionHandler(switchConnectionHandler);
    factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
    factory.setTlsConfig(connConfig.getTlsConfiguration());
    factory.setSerializationFactory(serializationFactory);
    factory.setDeserializationFactory(deserializationFactory);
    factory.setUseBarrier(connConfig.useBarrier());
    factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
    final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();

    // Check if Epoll native transport is available.
    // TODO : Add option to disable Epoll.
    boolean isEpollEnabled = Epoll.isAvailable();

    if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
        // Create the TCP handler that listens on the configured address and port
        server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
        // Create the TCP publishing channel initializer; it holds a reference to ConnectionManagerImpl
        final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
        ((TcpHandler) server).setChannelInitializer(channelInitializer);
        // Creates the underlying Netty event loop groups
        ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);

        // Fetch the Netty EventLoopGroup created in the previous step
        final EventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup();
        // Create the TcpConnectionInitializer
        connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled);
        // Give it the channelInitializer above (which carries the ConnectionManagerImpl reference)
        connectionInitializer.setChannelInitializer(channelInitializer);
        /*
         * connectionInitializer.run() prepares the connection initializer with the same channelInitializer.
         * The listening side, TcpHandler.run(), is started from startup() (see below); it installs
         * TcpChannelInitializer so that initChannel() runs when a remote switch connects,
         * and initChannel() calls into openflowplugin's ConnectionManagerImpl.
         */
        connectionInitializer.run();
    } else if (TransportProtocol.UDP.equals(transportProtocol)) {
        server = new UdpHandler(connConfig.getAddress(), connConfig.getPort());
        ((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
        ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
    } else {
        throw new IllegalStateException("Unknown transport protocol received: " + transportProtocol);
    }
    server.setThreadConfig(connConfig.getThreadConfiguration());
    return server;
}
Several key points in this method explain how a switch connecting in Openflowjava triggers ConnectionManagerImpl's onSwitchConnected method:
1. switchConnectionHandler is the ConnectionManagerImpl created in OpenflowPlugin. After the ChannelInitializerFactory object is created, setSwitchConnectionHandler(switchConnectionHandler) hands it the ConnectionManagerImpl object.
2. The TcpChannelInitializer object created by the ChannelInitializerFactory factory object holds the ConnectionManagerImpl object, and is set on the TCP server object.
3. TcpConnectionInitializer, the object used to initialize TCP connections, is given (via setChannelInitializer) the TcpChannelInitializer channelInitializer, which carries the reference to the ConnectionManagerImpl object.
Back in the startup method, new Thread(serverFacade).start(); then runs the TCP server (the TcpHandler object).
In TcpHandler.run, .childHandler(channelInitializer) installs the TcpChannelInitializer object created above (which carries the ConnectionManagerImpl reference).
@Override
public void run() {
    /*
     * We generally do not perform IO-unrelated tasks, so we want to have
     * all outstanding tasks completed before the executing thread goes
     * back into select.
     *
     * Any other setting means netty will measure the time it spent selecting
     * and spend roughly proportional time executing tasks.
     */
    //workerGroup.setIoRatio(100);

    final ChannelFuture f;
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(socketChannelClass)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(channelInitializer) // TcpChannelInitializer.initChannel() should be what runs for each accepted connection
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY , true)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                        new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
                .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);

        if (startupAddress != null) {
            f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
        } else {
            f = bootstrap.bind(port).sync();
        }
    } catch (InterruptedException e) {
        LOG.error("Interrupted while binding port {}", port, e);
        return;
    }
    // (the remainder of run() is not shown in the original excerpt)
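I am not familiar with the underlying Netty calls, so this is an educated guess: when a switch opens a TCP connection to the listening port 6633/6653 (TcpHandler), the initChannel method of the TcpChannelInitializer object is called. This is consistent with how Netty treats a ChannelInitializer passed to childHandler: its initChannel runs once for each accepted child channel. Inside initChannel, ConnectionManagerImpl's onSwitchConnected method is invoked.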
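For readers who, like the author, are less familiar with Netty, the "accept a connection, then run a per-connection initializer" idea can be shown with the JDK's blocking sockets. This is only an analogy under assumptions: AcceptLoopSketch is a hypothetical class, it uses java.net rather than Netty, it binds an ephemeral loopback port instead of 6633/6653, and the Consumer callback stands in for initChannel().

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class AcceptLoopSketch {
    // Binds an ephemeral loopback port, connects to it once, and reports whether the
    // per-connection callback ran within five seconds.
    static boolean acceptOnce(Consumer<Socket> perConnectionInit) {
        CountDownLatch initialized = new CountDownLatch(1);
        try (ServerSocket server = new ServerSocket(0, 128, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> {
                try (Socket accepted = server.accept()) {
                    // One callback invocation per accepted connection.
                    perConnectionInit.accept(accepted);
                    initialized.countDown();
                } catch (IOException e) {
                    // The server socket was closed before a client arrived.
                }
            });
            acceptor.start();
            // The client plays the role of a switch connecting to the controller.
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
                return initialized.await(5, TimeUnit.SECONDS);
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException("loopback demo failed", e);
        }
    }

    public static void main(String[] args) {
        boolean ran = acceptOnce(socket ->
                System.out.println("initializing channel for " + socket.getRemoteSocketAddress()));
        System.out.println("callback ran: " + ran);
    }
}
```

Netty replaces the dedicated acceptor thread with event loop groups, but the division of labor is the same: the listener owns the port, and the initializer decides what happens to each new connection.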
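The next article describes what happens after a switch connects to the controller, starting with the handshake.
Summary
This completes the walk through the Openflowjava/Openflowplugin startup flow: how ODL listens on the OpenFlow ports, and how a switch connecting to the controller leads into methods in OpenflowPlugin. The openflowplugin project makes full use of OSGi: the individual bundles call into each other through OSGi services.
This article covers only the startup of Openflowjava and OpenflowPlugin; what happens after onSwitchConnected is called is covered in the next article.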
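Key classes/objects
Summarized from my own understanding:
Class/object | Role/description
SwitchConnectionProviderImpl | Listens on the port; when a device connects, notifies OpenflowPlugin so it can continue processing
TcpChannelInitializer | Called when a switch connects, to initialize the channel for each TCP connection
ConnectionManagerImpl | Manages connections; when a switch connects to the controller through the underlying openflowjava, it calls up into OpenflowPlugin
OpenflowPluginProvider | Provides the openflowPlugin services (handshake/device/rpc/statistics/role)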