![](/style/images/good.png)
![](/style/images/bad.png)
ODL Openflowplugin Switch连上控制器Handshake过程源码分析
source link: https://www.sdnlab.com/21257.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
作者简介:陈卓文,国内某游戏公司私有云团队开发者,主要从事SDN/NFV开发。
本文为Openflowplugin源码分析第二篇,紧跟第一篇:(一)ODL OpenflowPlugin启动流程源码分析
补充说明:Openflowplugin版本0.6.2读者约定:基本掌握Opendaylight的思想/有一定实践经验,想要深入理解openflowplugin源码/想对openflowplugin源码修改。
Handshake过程
首先回顾第一篇笔记,在Openflowplugin启动过程中,SwitchConnectionProviderImpl.startup
会启动tcp server监听端口。而tcp server是基于netty实现,在TcpHandler.java
会创建Bootstrap/EventLoopGroup等,同样会设置channelInitialize。
try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(socketChannelClass) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(channelInitializer) // 这里应该调用的是TcpChannelInitializer的initChannel方法 .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY , true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK)) .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
if (startupAddress != null) { f = bootstrap.bind(startupAddress.getHostAddress(), port).sync(); } else { f = bootstrap.bind(port).sync(); }
当switch底层连上控制器tcp server监听的端口6633/6653,netty在接受channel后,会调用channelInitialize的initChannel方法,即TcpChannelInitializer.initChannel
。
关于netty,可以参考《netty in action》,推荐阅读。
1.初始化Channel
当switch通过tcp连接上控制器,会触发TcpChannelInitializer.initChannel
方法初始化channel。
@Override @SuppressWarnings("checkstyle:IllegalCatch") protected void initChannel(final SocketChannel ch) { if (ch.remoteAddress() != null) { final InetAddress switchAddress = ch.remoteAddress().getAddress(); final int port = ch.localAddress().getPort(); final int remotePort = ch.remoteAddress().getPort(); LOG.debug("Incoming connection from (remote address): {}:{} --> :{}", switchAddress.toString(), remotePort, port);
if (!getSwitchConnectionHandler().accept(switchAddress)) { ch.disconnect(); LOG.debug("Incoming connection rejected"); return; } } LOG.debug("Incoming connection accepted - building pipeline"); allChannels.add(ch); ConnectionFacade connectionFacade = null; connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, useBarrier(), getChannelOutboundQueueSize()); try { LOG.debug("Calling OF plugin: {}", getSwitchConnectionHandler()); // 当channel建立,调用ConnectionManageImpl的onSwitchConnected方法 getSwitchConnectionHandler().onSwitchConnected(connectionFacade); // 检查上一步设置的3个listener是否存在 connectionFacade.checkListeners(); ch.pipeline().addLast(PipelineHandlers.IDLE_HANDLER.name(), new IdleHandler(getSwitchIdleTimeout(), TimeUnit.MILLISECONDS)); boolean tlsPresent = false;
// If this channel is configured to support SSL it will only support SSL if (getTlsConfiguration() != null) { tlsPresent = true; final SslContextFactory sslFactory = new SslContextFactory(getTlsConfiguration()); final SSLEngine engine = sslFactory.getServerContext().createSSLEngine(); engine.setNeedClientAuth(true); engine.setUseClientMode(false); List<String> suitesList = getTlsConfiguration().getCipherSuites(); if (suitesList != null && !suitesList.isEmpty()) { LOG.debug("Requested Cipher Suites are: {}", suitesList); String[] suites = suitesList.toArray(new String[suitesList.size()]); engine.setEnabledCipherSuites(suites); LOG.debug("Cipher suites enabled in SSLEngine are: {}", Arrays.toString(engine.getEnabledCipherSuites())); } final SslHandler ssl = new SslHandler(engine); final Future<Channel> handshakeFuture = ssl.handshakeFuture(); final ConnectionFacade finalConnectionFacade = connectionFacade; // 会调用ConnectionReadyListenerImpl.onConnectionReady()发起handshake handshakeFuture.addListener(future -> finalConnectionFacade.fireConnectionReadyNotification()); ch.pipeline().addLast(PipelineHandlers.SSL_HANDLER.name(), ssl); } // Decodes incoming messages into message frames. ch.pipeline().addLast(PipelineHandlers.OF_FRAME_DECODER.name(), new OFFrameDecoder(connectionFacade, tlsPresent)); ch.pipeline().addLast(PipelineHandlers.OF_VERSION_DETECTOR.name(), new OFVersionDetector()); final OFDecoder ofDecoder = new OFDecoder(); ofDecoder.setDeserializationFactory(getDeserializationFactory()); ch.pipeline().addLast(PipelineHandlers.OF_DECODER.name(), ofDecoder); final OFEncoder ofEncoder = new OFEncoder(); ofEncoder.setSerializationFactory(getSerializationFactory()); ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofEncoder); // Delegates translated POJOs into MessageConsumer. ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade)); if (!tlsPresent) { // 如果没有配置tls加密 // 会调用ConnectionReadyListenerImpl.onConnectionReady()发起handshake connectionFacade.fireConnectionReadyNotification(); } } catch (RuntimeException e) { LOG.warn("Failed to initialize channel", e); ch.close(); } }
在initchannel方法中主要逻辑:
1.创建ConnectionAdapterImpl
对象,封装SocketChannel
channel对象。
会为每个connection(switch)创建一个ConnectionAdapterImpl
对象,此对象是封装底层switch的关键对象,上层通过此对象与switch通信。从变量名Facade也能推敲出此对象的作用。
2.调用ConnectionManagerImpl.onSwitchConnected
方法,传参传入的是ConnectionAdapterImpl
对象
而在ConnectionManagerImpl.onSwitchConnected
的处理是给ConnectionAdapterImpl
对象设置3个listener,用于处理底层各个事件。
- 1.创建
ConnectionReadyListenerImpl
对象给ConnectionAdapterImpl
对象传入引用(setConnectionReadyListener
);ConnectionReadyListenerImpl
对象封装ConnectionContextImpl
和HandshakeContextImpl
;ConnectionReadyListenerImpl
对象提供onConnectionReady()
方法,该方法处理是调用HandshakeManagerImpl.shake()
;
- 2.创建
OpenflowProtocolListenerInitialImpl
对象,给ConnectionAdapterImpl
对象传入引用(setMessageListener
);OpenflowProtocolListenerInitialImpl
对象用于处理底层switch发给控制器的消息,比如提供onHelloMessage
方法。- 注意:该对象仅用于处理handshake过程中涉及的基本消息,在handshake后会被另一对象
OpenflowProtocolListenerFullImpl
替换。
- 3.创建
SystemNotificationsListenerImpl
对象,给ConnectionAdapterImpl
对象传入引用(setSystemListener
SystemNotificationsListenerImpl
对象用于处理SwitchIdleEvent和DisconnectEvent事件。提供onSwitchIdleEvent()
方法, 当swich idle发送echo心跳消息;提供onDisconnectEvent
方法处理disconnectJava@Override public void onSwitchConnected(final ConnectionAdapter connectionAdapter) { LOG.trace("prepare connection context"); // connectionAdapter是:ConnectionAdapterImpl: 包含基本的socket channel信息
// 创建ConnectionContext final ConnectionContext connectionContext = new ConnectionContextImpl(connectionAdapter); connectionContext.setDeviceDisconnectedHandler(this.deviceDisconnectedHandler);
// 创建HandshakeListenerImpl, 当handshake成功,会调用 deviceConnectedHandler.deviceConnected() // deviceConnectedHandlerContext是在OpenflowPluginProvider中设置的ContextChainHolderImpl HandshakeListener handshakeListener = new HandshakeListenerImpl(connectionContext, deviceConnectedHandler);
// 创建HandshakeManagerImpl: 提供shake()等方法实现握手 final HandshakeManager handshakeManager = createHandshakeManager(connectionAdapter, handshakeListener);
LOG.trace("prepare handshake context"); // manager传入context HandshakeContext handshakeContext = new HandshakeContextImpl(executorService, handshakeManager); // 设置handshakeContext,用于当handshake失败时调用close()方法 handshakeListener.setHandshakeContext(handshakeContext); connectionContext.setHandshakeContext(handshakeContext);
// 至此connectionContext封装了 connectionAdapter, handshakeContext
LOG.trace("prepare connection listeners"); /* 传入handshakeContext, 提供onConnectionReady()方法. onConnectionReady()效果: 1.会修改 connectionContext的状态为HANDSHAKING 2.创建 HandshakeStepWrapper(), 最终调用HandshakeManagerImpl.shake() */ final ConnectionReadyListener connectionReadyListener = new ConnectionReadyListenerImpl( connectionContext, handshakeContext); // 给底层TcpCannelInitializer传递上来的connectionAdapter 设置connectionReadyListener connectionAdapter.setConnectionReadyListener(connectionReadyListener);
// 给底层TcpCannelInitializer传递上来的connectionAdapter 设置ofMessageListener(监听openflow消息处理) final OpenflowProtocolListener ofMessageListener = new OpenflowProtocolListenerInitialImpl(connectionContext, handshakeContext); connectionAdapter.setMessageListener(ofMessageListener);
// 提供onSwitchIdleEvent()方法, 当swich idle发送echo心跳消息 final SystemNotificationsListener systemListener = new SystemNotificationsListenerImpl( connectionContext, config.getEchoReplyTimeout().getValue(), executorService); // 给底层TcpCannelInitializer传递上来的connectionAdapter 设置systemListener connectionAdapter.setSystemListener(systemListener);
LOG.trace("connection ballet finished"); }
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849@OverridepublicvoidonSwitchConnected(finalConnectionAdapter connectionAdapter){LOG.trace("prepare connection context");// connectionAdapter是:ConnectionAdapterImpl: 包含基本的socket channel信息// 创建ConnectionContextfinalConnectionContext connectionContext=newConnectionContextImpl(connectionAdapter);connectionContext.setDeviceDisconnectedHandler(this.deviceDisconnectedHandler);// 创建HandshakeListenerImpl, 当handshake成功,会调用 deviceConnectedHandler.deviceConnected()// deviceConnectedHandlerContext是在OpenflowPluginProvider中设置的ContextChainHolderImplHandshakeListener handshakeListener=newHandshakeListenerImpl(connectionContext,deviceConnectedHandler);// 创建HandshakeManagerImpl: 提供shake()等方法实现握手finalHandshakeManager handshakeManager=createHandshakeManager(connectionAdapter,handshakeListener);LOG.trace("prepare handshake context");// manager传入contextHandshakeContext handshakeContext=newHandshakeContextImpl(executorService,handshakeManager);// 设置handshakeContext,用于当handshake失败时调用close()方法handshakeListener.setHandshakeContext(handshakeContext);connectionContext.setHandshakeContext(handshakeContext);// 至此connectionContext封装了 connectionAdapter, handshakeContextLOG.trace("prepare connection listeners");/*传入handshakeContext, 提供onConnectionReady()方法. onConnectionReady()效果:1.会修改 connectionContext的状态为HANDSHAKING2.创建 HandshakeStepWrapper(), 最终调用HandshakeManagerImpl.shake()*/finalConnectionReadyListener connectionReadyListener=newConnectionReadyListenerImpl(connectionContext,handshakeContext);// 给底层TcpCannelInitializer传递上来的connectionAdapter 设置connectionReadyListenerconnectionAdapter.setConnectionReadyListener(connectionReadyListener);// 给底层TcpCannelInitializer传递上来的connectionAdapter 设置ofMessageListener(监听openflow消息处理)finalOpenflowProtocolListener ofMessageListener=newOpenflowProtocolListenerInitialImpl(connectionContext,handshakeContext);connectionAdapter.setMessageListener(ofMessageListener);// 提供onSwitchIdleEvent()方法, 当swich idle发送echo心跳消息finalSystemNotificationsListener systemListener=newSystemNotificationsListenerImpl(connectionContext,config.getEchoReplyTimeout().getValue(),executorService);// 给底层TcpCannelInitializer传递上来的connectionAdapter 设置systemListenerconnectionAdapter.setSystemListener(systemListener);LOG.trace("connection ballet finished");}
3.给channel.pipeline设置ChannelHandler
会给channel的pipeline对象传入ChannelHandler对象,用于处理channel idle/inactive、处理openflow消息编码解码等。
pipeline是netty针对数据流处理的设计,具体参考《netty in action》
4.调用ConnectionAdapterImpl.fireConnectionReadyNotification()
方法发起handshake
在TcpChannelInitializer.initChannel
方法中,可以看到无论是否开启tls,最终都会调用ConnectionAdapterImpl.fireConnectionReadyNotification()
方法:
1.开tls
2.没开tls
而上面两个代码片段的connectionFacade变量正是ConnectionAdapterImpl
对象。其fireConnectionReadyNotification()
方法如下:
@Override public void fireConnectionReadyNotification() { versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name()); Preconditions.checkState(versionDetector != null);
new Thread(() -> connectionReadyListener.onConnectionReady()).start(); }
可以看到fireConnectionReadyNotification()
方法实际是调用connectionReadyListener.onConnectionReady()
,而connectionReadyListener
变量正是上面第二步中调用setConnectionReadyListener
传入的ConnectionReadyListenerImpl
对象。
即分配新的线程执行ConnectionReadyListenerImpl.onConnectionReady()
,而onConnectionReady()
方法会触发handshake,在下面开展。
总结,可以看到在Tcp channel初始化时(TcpChannelInitializer.initChannel
),会:
- 创建
ConnectionAdapterImpl
对象,封装传入的SocketChannel channel
对象; - 调用
ConnectionManagerImpl.onSwitchConnected
方法,给ConnectionAdapterImpl
对象setConnectionReadyListener
,setMessageListener
,setSystemListener
; - 给pipeline设置各种channelHandler
- 调用
ConnectionAdapterImpl.fireConnectionReadyNotification()
发起handshake;
2.ConnectionReady开始Handshake
在TcpChannelInitializer.initChannel
最后,调用ConnectionReadyListenerImpl.onConnectionReady()
如下:
@Override @SuppressWarnings("checkstyle:IllegalCatch") public void onConnectionReady() { if (LOG.isDebugEnabled()) { LOG.debug("device is connected and ready-to-use (pipeline prepared): {}", connectionContext.getConnectionAdapter().getRemoteAddress()); }
if (connectionContext.getConnectionState() == null) { synchronized (connectionContext) { if (connectionContext.getConnectionState() == null) { connectionContext.changeStateToHandshaking(); HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( null, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter()); // 调用线程运行 final Future<?> handshakeResult = handshakeContext.getHandshakePool().submit(handshakeStepWrapper);
try { // As we run not in netty thread, // need to remain in sync lock until initial handshake step processed. handshakeResult.get(); } catch (Exception e) { LOG.error("failed to process onConnectionReady event on device {}, reason {}", connectionContext.getConnectionAdapter().getRemoteAddress(), e); connectionContext.closeConnection(false); handshakeContext.close(); } } else { LOG.debug("already touched by hello message from device {} after second check", connectionContext.getConnectionAdapter().getRemoteAddress()); } } } else { LOG.debug("already touched by hello message from device {} after first check", connectionContext.getConnectionAdapter().getRemoteAddress()); } }
onConnectionReady()
方法主要逻辑:
connectionContext
状态设置为HANDSHAKING创建
HandshakeStepWrapper
对象,分配线程运行:实际上是运行HandshakeManagerImpl
对象的shake
方法(在ConnectionManagerImpl
中创建的)Java@Override public void run() { if (connectionAdapter.isAlive()) { handshakeManager.shake(helloMessage); } else { LOG.debug("connection is down - skipping handshake step"); } }12345678@Overridepublicvoidrun(){if(connectionAdapter.isAlive()){handshakeManager.shake(helloMessage);}else{LOG.debug("connection is down - skipping handshake step");}}
3.控制器主动发送Hello消息
HandshakeManagerImpl.shake
,注意此时调用shake方法时,传入的receivedHello
为null,所以会调用sendHelloMessage(highestVersion, getNextXid())
@Override @SuppressWarnings("checkstyle:IllegalCatch") public synchronized void shake(HelloMessage receivedHello) {
if (version != null) { // Some switches respond with a second HELLO acknowledging our HELLO // but we've already completed the handshake based on the negotiated // version and have registered this switch. LOG.debug("Hello recieved after handshake already settled ... ignoring."); return; }
LOG.trace("handshake STARTED"); setActiveXid(ACTIVE_XID);
try { if (receivedHello == null) { // first Hello sending sendHelloMessage(highestVersion, getNextXid()); lastProposedVersion = highestVersion; LOG.trace("ret - firstHello+wait"); return; }
// process the 2. and later hellos Short remoteVersion = receivedHello.getVersion(); List<Elements> elements = receivedHello.getElements(); setActiveXid(receivedHello.getXid()); List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements); LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(), remoteVersionBitmap);
if (useVersionBitmap && remoteVersionBitmap != null) { // versionBitmap on both sides -> ONE STEP DECISION handleVersionBitmapNegotiation(elements); } else { // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying handleStepByStepVersionNegotiation(remoteVersion); } } catch (Exception ex) { errorHandler.handleException(ex); LOG.trace("ret - shake fail - closing"); handshakeListener.onHandshakeFailure(); }
sendHelloMessage
方法如下,实际是调用ConnectionAdapterImpl
对象的hello
方法。最终控制器发送hello消息给switch,进行协商openflow版本。
这里就可以看出,控制器与底层switch通信靠
ConnectionAdapterImpl
对象封装Javaprivate ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
final SettableFuture<Void> resultFtr = SettableFuture.create();
LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));
// 发送hello消息 Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult); Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() { @Override public void onSuccess(@Nonnull RpcResult<Void> result) { if (result.isSuccessful()) { LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress()); resultFtr.set(null); } else { for (RpcError error : result.getErrors()) { LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(), error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress()); if (error.getCause() != null) { LOG.trace("DETAIL of sending hello failure", error.getCause()); } } resultFtr.cancel(false); handshakeListener.onHandshakeFailure(); } }
@Override public void onFailure(Throwable throwable) { LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid, connectionAdapter.getRemoteAddress(), throwable.getMessage()); LOG.trace("DETAIL of sending of hello failure:", throwable); resultFtr.cancel(false); handshakeListener.onHandshakeFailure(); } }, MoreExecutors.directExecutor()); LOG.trace("sending hello message [{}] - result hooked ..", helloXid); return resultFtr; }
12345678910111213141516171819202122232425262728293031323334353637383940414243444546privateListenableFuture<Void>sendHelloMessage(ShorthelloVersion,finalLonghelloXid)throwsException{HelloInput helloInput=MessageFactory.createHelloInput(helloVersion,helloXid,versionOrder);finalSettableFuture<Void>resultFtr=SettableFuture.create();LOG.debug("sending hello message: version{}, xid={}, version bitmap={}",helloVersion,helloXid,MessageFactory.digVersions(helloInput.getElements()));// 发送hello消息Future<RpcResult<Void>>helloResult=connectionAdapter.hello(helloInput);ListenableFuture<RpcResult<Void>>rpcResultListenableFuture=JdkFutureAdapters.listenInPoolThread(helloResult);Futures.addCallback(rpcResultListenableFuture,newFutureCallback<RpcResult<Void>>(){@OverridepublicvoidonSuccess(@NonnullRpcResult<Void>result){if(result.isSuccessful()){LOG.debug("hello successfully sent, xid={}, addr={}",helloXid,connectionAdapter.getRemoteAddress());resultFtr.set(null);}else{for(RpcError error:result.getErrors()){LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}",helloXid,error.getInfo(),error.getSeverity(),error.getMessage(),connectionAdapter.getRemoteAddress());if(error.getCause()!=null){LOG.trace("DETAIL of sending hello failure",error.getCause());}}resultFtr.cancel(false);handshakeListener.onHandshakeFailure();}}@OverridepublicvoidonFailure(Throwable throwable){LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}",helloXid,connectionAdapter.getRemoteAddress(),throwable.getMessage());LOG.trace("DETAIL of sending of hello failure:",throwable);resultFtr.cancel(false);handshakeListener.onHandshakeFailure();}},MoreExecutors.directExecutor());LOG.trace("sending hello message [{}] - result hooked ..",helloXid);returnresultFtr;}4.控制器处理Switch回复的Hello消息
在上述步骤,控制器主动会发送hello包到switch,然后switch也会回复数据包给控制器。下面展开探讨控制器是如何处理Switch回复。
首先回到
TcpChannelInitializer.initChannel
,给ConnectionAdapterImpl
对象设置了DelegatingInboundHandler
Java// Delegates translated POJOs into MessageConsumer. ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade));123// Delegates translated POJOs into MessageConsumer.ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),newDelegatingInboundHandler(connectionFacade));根据netty pipeline的数据流处理模型,当收到switch发送的消息,会调用
DelegatingInboundHandler
处理。会调用DelegatingInboundHandler.channelRead
方法。而
DelegatingInboundHandler
的channelRead
方法调用的是ConnectionAdapterImpl
对象的consume
方法。Java@Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { consumer.consume((DataObject) msg); }1234@OverridepublicvoidchannelRead(finalChannelHandlerContext ctx,finalObjectmsg){consumer.consume((DataObject)msg);}最终就会调用到
ConnectionAdapterImpl.consumeDeviceMessage
方法:Java@Override public void consumeDeviceMessage(final DataObject message) { LOG.debug("ConsumeIntern msg on {}", channel); if (disconnectOccured) { return; } if (message instanceof Notification) {
// System events if (message instanceof DisconnectEvent) { systemListener.onDisconnectEvent((DisconnectEvent) message); responseCache.invalidateAll(); disconnectOccured = true; } else if (message instanceof SwitchIdleEvent) { systemListener.onSwitchIdleEvent((SwitchIdleEvent) message); // OpenFlow messages } else if (message instanceof EchoRequestMessage) { if (outputManager != null) { outputManager.onEchoRequest((EchoRequestMessage) message); } else { messageListener.onEchoRequestMessage((EchoRequestMessage) message); } } else if (message instanceof ErrorMessage) { // Send only unmatched errors if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { messageListener.onErrorMessage((ErrorMessage) message); } } else if (message instanceof ExperimenterMessage) { if (outputManager != null) { outputManager.onMessage((OfHeader) message); } messageListener.onExperimenterMessage((ExperimenterMessage) message); } else if (message instanceof FlowRemovedMessage) { messageListener.onFlowRemovedMessage((FlowRemovedMessage) message); } else if (message instanceof HelloMessage) { LOG.info("Hello received"); messageListener.onHelloMessage((HelloMessage) message); } else if (message instanceof MultipartReplyMessage) { if (outputManager != null) { outputManager.onMessage((OfHeader) message); } messageListener.onMultipartReplyMessage((MultipartReplyMessage) message); } else if (message instanceof PacketInMessage) { messageListener.onPacketInMessage((PacketInMessage) message); } else if (message instanceof PortStatusMessage) { messageListener.onPortStatusMessage((PortStatusMessage) message); } else { LOG.warn("message listening not supported for type: {}", message.getClass()); } } else if (message instanceof OfHeader) { LOG.debug("OF header msg received");
if (alienMessageListener != null && alienMessageListener.onAlienMessage((OfHeader) message)) { LOG.debug("Alien message {} received", message.getImplementedInterface()); } else if (outputManager == null || !outputManager.onMessage((OfHeader) message) || message instanceof EchoOutput) { final RpcResponseKey key = createRpcResponseKey((OfHeader) message); final ResponseExpectedRpcListener<?> listener = findRpcResponse(key); if (listener != null) { LOG.debug("Corresponding rpcFuture found"); listener.completed((OfHeader) message); LOG.debug("After setting rpcFuture"); responseCache.invalidate(key); } } } else { LOG.warn("message listening not supported for type: {}", message.getClass()); } }
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869@OverridepublicvoidconsumeDeviceMessage(finalDataObject message){LOG.debug("ConsumeIntern msg on {}",channel);if(disconnectOccured){return;}if(message instanceofNotification){// System eventsif(message instanceofDisconnectEvent){systemListener.onDisconnectEvent((DisconnectEvent)message);responseCache.invalidateAll();disconnectOccured=true;}elseif(message instanceofSwitchIdleEvent){systemListener.onSwitchIdleEvent((SwitchIdleEvent)message);// OpenFlow messages}elseif(message instanceofEchoRequestMessage){if(outputManager!=null){outputManager.onEchoRequest((EchoRequestMessage)message);}else{messageListener.onEchoRequestMessage((EchoRequestMessage)message);}}elseif(message instanceofErrorMessage){// Send only unmatched errorsif(outputManager==null||!outputManager.onMessage((OfHeader)message)){messageListener.onErrorMessage((ErrorMessage)message);}}elseif(message instanceofExperimenterMessage){if(outputManager!=null){outputManager.onMessage((OfHeader)message);}messageListener.onExperimenterMessage((ExperimenterMessage)message);}elseif(message instanceofFlowRemovedMessage){messageListener.onFlowRemovedMessage((FlowRemovedMessage)message);}elseif(message instanceofHelloMessage){LOG.info("Hello received");messageListener.onHelloMessage((HelloMessage)message);}elseif(message instanceofMultipartReplyMessage){if(outputManager!=null){outputManager.onMessage((OfHeader)message);}messageListener.onMultipartReplyMessage((MultipartReplyMessage)message);}elseif(message instanceofPacketInMessage){messageListener.onPacketInMessage((PacketInMessage)message);}elseif(message instanceofPortStatusMessage){messageListener.onPortStatusMessage((PortStatusMessage)message);}else{LOG.warn("message listening not supported for type: {}",message.getClass());}}elseif(message instanceofOfHeader){LOG.debug("OF header msg received");if(alienMessageListener!=null&&alienMessageListener.onAlienMessage((OfHeader)message)){LOG.debug("Alien message {} received",message.getImplementedInterface());}elseif(outputManager==null||!outputManager.onMessage((OfHeader)message)||message instanceofEchoOutput){finalRpcResponseKey key=createRpcResponseKey((OfHeader)message);finalResponseExpectedRpcListener<?>listener=findRpcResponse(key);if(listener!=null){LOG.debug("Corresponding rpcFuture found");listener.completed((OfHeader)message);LOG.debug("After setting rpcFuture");responseCache.invalidate(key);}}}else{LOG.warn("message listening not supported for type: {}",message.getClass());}}以Handshake过程的Hello message为例,会调用
messageListener.onHelloMessage((HelloMessage) message);
,即调用OpenflowProtocolListenerInitialImpl.onHelloMessage
方法:-
回忆上述步骤:在
ConnectionManagerImpl.onSwitchConnected
方法中,会将OpenflowProtocolListenerInitialImpl
对象传入(setMessageListener
)Java@Override public void onHelloMessage(final HelloMessage hello) { LOG.debug("processing HELLO.xid: {} from device {}", hello.getXid(), connectionContext.getConnectionAdapter().getRemoteAddress()); final ConnectionContext.CONNECTION_STATE connectionState = connectionContext.getConnectionState(); if (connectionState == null || ConnectionContext.CONNECTION_STATE.HANDSHAKING.equals(connectionState)) { synchronized (connectionContext) { if (connectionContext.getConnectionState() == null) { // got here before connection ready notification connectionContext.changeStateToHandshaking(); }
if (checkState(ConnectionContext.CONNECTION_STATE.HANDSHAKING)) { final HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( hello, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter()); // use up netty thread handshakeStepWrapper.run(); } else { LOG.debug("already out of handshake phase but still received hello message from device {}", connectionContext.getConnectionAdapter().getRemoteAddress()); } } } else { //TODO: consider disconnecting of bad behaving device LOG.warn("Hello message received outside handshake phase:{} ", hello); LOG.debug("already touched by onConnectionReady event from device {} (or finished handshake)", connectionContext.getConnectionAdapter().getRemoteAddress()); } }
123456789101112131415161718192021222324252627282930@OverridepublicvoidonHelloMessage(finalHelloMessage hello){LOG.debug("processing HELLO.xid: {} from device {}",hello.getXid(),connectionContext.getConnectionAdapter().getRemoteAddress());finalConnectionContext.CONNECTION_STATE connectionState=connectionContext.getConnectionState();if(connectionState==null||ConnectionContext.CONNECTION_STATE.HANDSHAKING.equals(connectionState)){synchronized(connectionContext){if(connectionContext.getConnectionState()==null){// got here before connection ready notificationconnectionContext.changeStateToHandshaking();}if(checkState(ConnectionContext.CONNECTION_STATE.HANDSHAKING)){finalHandshakeStepWrapper handshakeStepWrapper=newHandshakeStepWrapper(hello,handshakeContext.getHandshakeManager(),connectionContext.getConnectionAdapter());// use up netty threadhandshakeStepWrapper.run();}else{LOG.debug("already out of handshake phase but still received hello message from device {}",connectionContext.getConnectionAdapter().getRemoteAddress());}}}else{//TODO: consider disconnecting of bad behaving deviceLOG.warn("Hello message received outside handshake phase:{} ",hello);LOG.debug("already touched by onConnectionReady event from device {} (or finished handshake)",connectionContext.getConnectionAdapter().getRemoteAddress());}}在
onHelloMessage
方法中,会查询connectionContext的状态为HANDSHAKING时,会再次分配线程运行HandshakeStepWrapper
,即再次调用HandshakeManagerImpl.shake
方法。
5.协商Openflow协议版本
在
HandshakeManagerImpl.shake
中,可以看到处理第二个或更后的hello包后续逻辑是根据switch的第一个hello返回是否带有openflow版本bit,而进行不同协商过程(handleVersionBitmapNegotiation
,handleStepByStepVersionNegotiation
)。而具体两种协商过程可以参考官方文档说明,在这里不展开。
两种协商过程,最终都会调用
HandshakeManagerImpl.postHandshake
方法。HandshakeManagerImpl.postHandshake
在控制器与switch通过协商确定openflow版本号后,会调用
postHandshake
方法。postHandshake
方法主要操作:-
调用
get-features
rpc,向switch请求获取features。这里也是通过调用ConnectionAdapterImpl对象(connectionAdapter.getFeatures
)-
features包括:datapathId,buffers,tables,auxiliaryId,capabilities,reserved,actions,phy-port等(参考
openflow-protocol.yang
)Javaprotected void postHandshake(final Short proposedVersion, final Long xid) { // set version version = proposedVersion;
LOG.debug("version set: {}", proposedVersion); // request features GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder(); featuresBuilder.setVersion(version).setXid(xid); LOG.debug("sending feature request for version={} and xid={}", version, xid); Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter.getFeatures(featuresBuilder.build());
Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture), new FutureCallback<RpcResult<GetFeaturesOutput>>() { @Override public void onSuccess(@Nonnull RpcResult<GetFeaturesOutput> rpcFeatures) { LOG.trace("features are back"); if (rpcFeatures.isSuccessful()) { GetFeaturesOutput featureOutput = rpcFeatures.getResult(); if (!deviceConnectionRateLimiter.tryAquire()) { LOG.debug("Permit not acquired for dpn {}, disconnecting the device.", featureOutput.getDatapathId()); connectionAdapter.disconnect(); return; }
LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId()); LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId()); LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}", version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId()); // 成功 handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion); } else { // handshake failed LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress()); for (RpcError rpcError : rpcFeatures.getErrors()) { LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid, rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(), rpcError.getCause()); } handshakeListener.onHandshakeFailure(); }
LOG.debug("postHandshake DONE"); }
@Override public void onFailure(Throwable throwable) { LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid, connectionAdapter.getRemoteAddress(), throwable.getMessage()); LOG.trace("DETAIL of sending of hello failure:", throwable); } }, MoreExecutors.directExecutor()); LOG.debug("future features [{}] hooked ..", xid); }
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556protectedvoidpostHandshake(finalShortproposedVersion,finalLongxid){// set versionversion=proposedVersion;LOG.debug("version set: {}",proposedVersion);// request featuresGetFeaturesInputBuilder featuresBuilder=newGetFeaturesInputBuilder();featuresBuilder.setVersion(version).setXid(xid);LOG.debug("sending feature request for version={} and xid={}",version,xid);Future<RpcResult<GetFeaturesOutput>>featuresFuture=connectionAdapter.getFeatures(featuresBuilder.build());Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),newFutureCallback<RpcResult<GetFeaturesOutput>>(){@OverridepublicvoidonSuccess(@NonnullRpcResult<GetFeaturesOutput>rpcFeatures){LOG.trace("features are back");if(rpcFeatures.isSuccessful()){GetFeaturesOutput featureOutput=rpcFeatures.getResult();if(!deviceConnectionRateLimiter.tryAquire()){LOG.debug("Permit not acquired for dpn {}, disconnecting the device.",featureOutput.getDatapathId());connectionAdapter.disconnect();return;}LOG.debug("obtained features: datapathId={}",featureOutput.getDatapathId());LOG.debug("obtained features: auxiliaryId={}",featureOutput.getAuxiliaryId());LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",version,featureOutput.getDatapathId(),featureOutput.getAuxiliaryId());// 成功handshakeListener.onHandshakeSuccessful(featureOutput,proposedVersion);}else{// handshake failedLOG.warn("issuing disconnect during handshake [{}]",connectionAdapter.getRemoteAddress());for(RpcError rpcError:rpcFeatures.getErrors()){LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}",xid,rpcError.getInfo(),rpcError.getMessage(),rpcError.getSeverity(),rpcError.getCause());}handshakeListener.onHandshakeFailure();}LOG.debug("postHandshake DONE");}@OverridepublicvoidonFailure(Throwable throwable){LOG.warn("getting feature failed seriously [{}, addr:{}]: {}",xid,connectionAdapter.getRemoteAddress(),throwable.getMessage());LOG.trace("DETAIL of sending of hello failure:",throwable);}},MoreExecutors.directExecutor());LOG.debug("future features [{}] hooked ..",xid);}
-
在
get-features
成功后,会调用handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
继续接下来的处理。HandshakeListenerImpl.onHandshakeSuccessful
onHandshakeSuccessful
方法逻辑:- 设置connectionContext状态为WORKING
- 设置connectionContext.featuresReply为上一步调用get-features的返回
- 设置connectionContext.nodeId为datapathId
- 调用
connectionContext.handshakeSuccessful()
,创建DeviceInfoImpl对象this.deviceInfo = new DeviceInfoImpl()
-
最后,向switch发送
barrier
消息。如果成功回调addBarrierCallback()
方法-
用于保证在switch之前的命令都已经被执行
Java@Override public void onHandshakeSuccessful(final GetFeaturesOutput featureOutput, final Short version) { if (LOG.isDebugEnabled()) { LOG.debug("handshake succeeded: {}", connectionContext.getConnectionAdapter().getRemoteAddress()); } this.handshakeContext.close(); connectionContext.changeStateToWorking(); // 设置connectionContext的features信息 connectionContext.setFeatures(featureOutput); // nodeId是datapathId connectionContext.setNodeId(InventoryDataServiceUtil.nodeIdFromDatapathId(featureOutput.getDatapathId())); // 效果:connectionContext中new DeviceInfoImpl connectionContext.handshakeSuccessful();
// fire barrier in order to sweep all handshake and posthandshake messages before continue final ListenableFuture<RpcResult<BarrierOutput>> barrier = fireBarrier(version, 0L); // barrier消息回调 Futures.addCallback(barrier, addBarrierCallback(), MoreExecutors.directExecutor()); }
12345678910111213141516171819@OverridepublicvoidonHandshakeSuccessful(finalGetFeaturesOutput featureOutput,finalShortversion){if(LOG.isDebugEnabled()){LOG.debug("handshake succeeded: {}",connectionContext.getConnectionAdapter().getRemoteAddress());}this.handshakeContext.close();connectionContext.changeStateToWorking();// 设置connectionContext的features信息connectionContext.setFeatures(featureOutput);// nodeId是datapathIdconnectionContext.setNodeId(InventoryDataServiceUtil.nodeIdFromDatapathId(featureOutput.getDatapathId()));// 效果:connectionContext中new DeviceInfoImplconnectionContext.handshakeSuccessful();// fire barrier in order to sweep all handshake and posthandshake messages before continuefinalListenableFuture<RpcResult<BarrierOutput>>barrier=fireBarrier(version,0L);// barrier消息回调Futures.addCallback(barrier,addBarrierCallback(),MoreExecutors.directExecutor());}
-
HandshakeListenerImpl.addBarrierCallback
addBarrierCallback()
方法,核心逻辑deviceConnectedHandler.deviceConnected(connectionContext);
,用于调用ContextChainHolderImpl.deviceConnected
方法-
deviceConnectedHandler
变量是在ConnectionManagerImpl.onSwitchConnected
方法,创建HandshakeListenerImpl
对象时传入,即ContextChainHolderImpl
。Javaprivate FutureCallback<RpcResult<BarrierOutput>> addBarrierCallback() { return new FutureCallback<RpcResult<BarrierOutput>>() { @Override @SuppressWarnings("checkstyle:IllegalCatch") public void onSuccess(@Nullable final RpcResult<BarrierOutput> result) { if (LOG.isDebugEnabled()) { LOG.debug("succeeded by getting sweep barrier after post-handshake for device {}", connectionContext.getDeviceInfo()); } try { // device连接成功: 调用ContextChainHolderImpl.deviceConnected() ConnectionStatus connectionStatusResult = deviceConnectedHandler.deviceConnected(connectionContext); if (connectionStatusResult != ConnectionStatus.MAY_CONTINUE) { connectionContext.closeConnection(false); } SessionStatistics.countEvent(connectionContext.getDeviceInfo().toString(), SessionStatistics.ConnectionStatus.CONNECTION_CREATED); } catch (final Exception e) { LOG.warn("initial processing failed for device {}", connectionContext.getDeviceInfo(), e); SessionStatistics.countEvent(connectionContext.getDeviceInfo().toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP); connectionContext.closeConnection(true); } }
@Override public void onFailure(final Throwable throwable) { LOG.warn("failed to get sweep barrier after post-handshake for device {}", connectionContext.getDeviceInfo(), throwable); connectionContext.closeConnection(false); } }; }
123456789101112131415161718192021222324252627282930313233privateFutureCallback<RpcResult<BarrierOutput>>addBarrierCallback(){returnnewFutureCallback<RpcResult<BarrierOutput>>(){@Override@SuppressWarnings("checkstyle:IllegalCatch")publicvoidonSuccess(@NullablefinalRpcResult<BarrierOutput>result){if(LOG.isDebugEnabled()){LOG.debug("succeeded by getting sweep barrier after post-handshake for device {}",connectionContext.getDeviceInfo());}try{// device连接成功: 调用ContextChainHolderImpl.deviceConnected()ConnectionStatus connectionStatusResult=deviceConnectedHandler.deviceConnected(connectionContext);if(connectionStatusResult!=ConnectionStatus.MAY_CONTINUE){connectionContext.closeConnection(false);}SessionStatistics.countEvent(connectionContext.getDeviceInfo().toString(),SessionStatistics.ConnectionStatus.CONNECTION_CREATED);}catch(finalExceptione){LOG.warn("initial processing failed for device {}",connectionContext.getDeviceInfo(),e);SessionStatistics.countEvent(connectionContext.getDeviceInfo().toString(),SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP);connectionContext.closeConnection(true);}}@OverridepublicvoidonFailure(finalThrowable throwable){LOG.warn("failed to get sweep barrier after post-handshake for device {}",connectionContext.getDeviceInfo(),throwable);connectionContext.closeConnection(false);}};}
ContextChainHolderImpl.deviceConnected
当调用到
ContextChainHolderImpl.deviceConnected
方法时,代表switch已经与控制器完成handshake。在此方法中,除了处理辅助连接,最核心的是为第一次连上控制器的switch创建ContextChainImpl对象!调用createContextChain(connectionContext)
方法,而后续的步骤已经不是handshake过程,是为switch创建各个context,并进行mastership选举等,本文不展开。Java@Override public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception { final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); final ContextChain contextChain = contextChainMap.get(deviceInfo); final FeaturesReply featuresReply = connectionContext.getFeatures(); final Short auxiliaryId = featuresReply != null ? featuresReply.getAuxiliaryId() : null;
// 辅助连接 if (auxiliaryId != null && auxiliaryId != 0) { if (contextChain == null) { LOG.warn("An auxiliary connection for device {}, but no primary connection. Refusing connection.", deviceInfo); return ConnectionStatus.REFUSING_AUXILIARY_CONNECTION; } else { if (contextChain.addAuxiliaryConnection(connectionContext)) { LOG.info("An auxiliary connection was added to device: {}", deviceInfo); return ConnectionStatus.MAY_CONTINUE; } else { LOG.warn("Not able to add auxiliary connection to the device {}", deviceInfo); return ConnectionStatus.REFUSING_AUXILIARY_CONNECTION; } } } else { LOG.info("Device {} connected.", deviceInfo); final boolean contextExists = contextChain != null; final boolean isClosing = contextExists && contextChain.isClosing();
if (!isClosing && connectingDevices.putIfAbsent(deviceInfo, connectionContext) != null) { LOG.warn("Device {} is already trying to connect, wait until succeeded or disconnected.", deviceInfo); return ConnectionStatus.ALREADY_CONNECTED; }
if (contextExists) { if (isClosing) { LOG.warn("Device {} is already in termination state, closing all incoming connections.", deviceInfo); return ConnectionStatus.CLOSING; }
LOG.warn("Device {} already connected. Closing previous connection", deviceInfo); destroyContextChain(deviceInfo); LOG.info("Old connection dropped, creating new context chain for device {}", deviceInfo); createContextChain(connectionContext); } else { // 设备第一次connect,需要创建contextChain LOG.info("No context chain found for device: {}, creating new.", deviceInfo); createContextChain(connectionContext); }
return ConnectionStatus.MAY_CONTINUE; }
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253@OverridepublicConnectionStatus deviceConnected(finalConnectionContext connectionContext)throwsException{finalDeviceInfo deviceInfo=connectionContext.getDeviceInfo();finalContextChain contextChain=contextChainMap.get(deviceInfo);finalFeaturesReply featuresReply=connectionContext.getFeatures();finalShortauxiliaryId=featuresReply!=null?featuresReply.getAuxiliaryId():null;// 辅助连接if(auxiliaryId!=null&&auxiliaryId!=0){if(contextChain==null){LOG.warn("An auxiliary connection for device {}, but no primary connection. Refusing connection.",deviceInfo);returnConnectionStatus.REFUSING_AUXILIARY_CONNECTION;}else{if(contextChain.addAuxiliaryConnection(connectionContext)){LOG.info("An auxiliary connection was added to device: {}",deviceInfo);returnConnectionStatus.MAY_CONTINUE;}else{LOG.warn("Not able to add auxiliary connection to the device {}",deviceInfo);returnConnectionStatus.REFUSING_AUXILIARY_CONNECTION;}}}else{LOG.info("Device {} connected.",deviceInfo);finalbooleancontextExists=contextChain!=null;finalbooleanisClosing=contextExists&&contextChain.isClosing();if(!isClosing&&connectingDevices.putIfAbsent(deviceInfo,connectionContext)!=null){LOG.warn("Device {} is already trying to connect, wait until succeeded or disconnected.",deviceInfo);returnConnectionStatus.ALREADY_CONNECTED;}if(contextExists){if(isClosing){LOG.warn("Device {} is already in termination state, closing all incoming connections.",deviceInfo);returnConnectionStatus.CLOSING;}LOG.warn("Device {} already connected. Closing previous connection",deviceInfo);destroyContextChain(deviceInfo);LOG.info("Old connection dropped, creating new context chain for device {}",deviceInfo);createContextChain(connectionContext);}else{// 设备第一次connect,需要创建contextChainLOG.info("No context chain found for device: {}, creating new.",deviceInfo);createContextChain(connectionContext);}returnConnectionStatus.MAY_CONTINUE;}}总结
至此,我们看到了switch连上控制器,从
TcpChannelInitializer
到ContextChainHolderImpl
,可以看到整个handshake过程的调用,更加认识到了ConnectionAdapterImpl
对象就是与底层switch通信的关键封装对象。关键类/对象
个人理解整理:
-
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK