
ODL Openflowplugin: Source Code Analysis of the Handshake When a Switch Connects to the Controller

 5 years ago
source link: https://www.sdnlab.com/21257.html

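About the author: Chen Zhuowen (陈卓文) is a developer on the private cloud team of a game company in China, working mainly on SDN/NFV development.

This is the second article in the Openflowplugin source code analysis series. It follows the first: (1) ODL OpenflowPlugin Startup Flow Source Code Analysis.
Note: the Openflowplugin version analyzed here is 0.6.2.

Intended readers: those who have a basic grasp of the ideas behind OpenDaylight or some hands-on experience, and who want to understand the openflowplugin source code in depth or modify it.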


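The Handshake Process

Recall from the first article that during Openflowplugin startup, SwitchConnectionProviderImpl.startup starts a TCP server that listens on a port. The TCP server is built on netty: TcpHandler.java creates the ServerBootstrap, the EventLoopGroups and so on, and also sets the channelInitializer.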

Java

try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(socketChannelClass)
            .handler(new LoggingHandler(LogLevel.DEBUG))
            // This should end up calling TcpChannelInitializer.initChannel
            .childHandler(channelInitializer)
            .option(ChannelOption.SO_BACKLOG, 128)
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                    new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
            .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);

    if (startupAddress != null) {
        f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
    } else {
        f = bootstrap.bind(port).sync();
    }
    // ... (the excerpt ends here; the rest of the method is not shown)

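When a switch connects at the transport level to the port the controller's TCP server is listening on (6633/6653), netty accepts the channel and then calls the initChannel method of the channelInitializer, that is, TcpChannelInitializer.initChannel.

For background on netty, see "Netty in Action", which is recommended reading.

1. Initializing the Channel

When a switch connects to the controller over TCP, the TcpChannelInitializer.initChannel method is triggered to initialize the channel.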

Java

@Override
@SuppressWarnings("checkstyle:IllegalCatch")
protected void initChannel(final SocketChannel ch) {
    if (ch.remoteAddress() != null) {
        final InetAddress switchAddress = ch.remoteAddress().getAddress();
        final int port = ch.localAddress().getPort();
        final int remotePort = ch.remoteAddress().getPort();
        LOG.debug("Incoming connection from (remote address): {}:{} --> :{}",
            switchAddress.toString(), remotePort, port);

        if (!getSwitchConnectionHandler().accept(switchAddress)) {
            ch.disconnect();
            LOG.debug("Incoming connection rejected");
            return;
        }
    }
    LOG.debug("Incoming connection accepted - building pipeline");
    allChannels.add(ch);
    ConnectionFacade connectionFacade = null;
    connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, useBarrier(),
            getChannelOutboundQueueSize());
    try {
        LOG.debug("Calling OF plugin: {}", getSwitchConnectionHandler());
        // Once the channel is established, call ConnectionManagerImpl.onSwitchConnected
        getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
        // Check that the 3 listeners set in the previous step are present
        connectionFacade.checkListeners();
        ch.pipeline().addLast(PipelineHandlers.IDLE_HANDLER.name(),
                new IdleHandler(getSwitchIdleTimeout(), TimeUnit.MILLISECONDS));
        boolean tlsPresent = false;

        // If this channel is configured to support SSL it will only support SSL
        if (getTlsConfiguration() != null) {
            tlsPresent = true;
            final SslContextFactory sslFactory = new SslContextFactory(getTlsConfiguration());
            final SSLEngine engine = sslFactory.getServerContext().createSSLEngine();
            engine.setNeedClientAuth(true);
            engine.setUseClientMode(false);
            List<String> suitesList = getTlsConfiguration().getCipherSuites();
            if (suitesList != null && !suitesList.isEmpty()) {
                LOG.debug("Requested Cipher Suites are: {}", suitesList);
                String[] suites = suitesList.toArray(new String[suitesList.size()]);
                engine.setEnabledCipherSuites(suites);
                LOG.debug("Cipher suites enabled in SSLEngine are: {}",
                        Arrays.toString(engine.getEnabledCipherSuites()));
            }
            final SslHandler ssl = new SslHandler(engine);
            final Future<Channel> handshakeFuture = ssl.handshakeFuture();
            final ConnectionFacade finalConnectionFacade = connectionFacade;
            // Calls ConnectionReadyListenerImpl.onConnectionReady() to start the handshake
            handshakeFuture.addListener(future -> finalConnectionFacade.fireConnectionReadyNotification());
            ch.pipeline().addLast(PipelineHandlers.SSL_HANDLER.name(), ssl);
        }
        // Decodes incoming messages into message frames.
        ch.pipeline().addLast(PipelineHandlers.OF_FRAME_DECODER.name(),
                new OFFrameDecoder(connectionFacade, tlsPresent));
        ch.pipeline().addLast(PipelineHandlers.OF_VERSION_DETECTOR.name(), new OFVersionDetector());
        final OFDecoder ofDecoder = new OFDecoder();
        ofDecoder.setDeserializationFactory(getDeserializationFactory());
        ch.pipeline().addLast(PipelineHandlers.OF_DECODER.name(), ofDecoder);
        final OFEncoder ofEncoder = new OFEncoder();
        ofEncoder.setSerializationFactory(getSerializationFactory());
        ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofEncoder);
        // Delegates translated POJOs into MessageConsumer.
        ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
                new DelegatingInboundHandler(connectionFacade));
        if (!tlsPresent) {
            // TLS is not configured:
            // calls ConnectionReadyListenerImpl.onConnectionReady() to start the handshake
            connectionFacade.fireConnectionReadyNotification();
        }
    } catch (RuntimeException e) {
        LOG.warn("Failed to initialize channel", e);
        ch.close();
    }
}

The main logic of the initChannel method is as follows:
1. Create a ConnectionAdapterImpl object that wraps the SocketChannel channel object.

Java
connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, useBarrier(),
        getChannelOutboundQueueSize());

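One ConnectionAdapterImpl object is created per connection (switch). It is the key object that wraps the underlying switch, and the upper layers communicate with the switch through it. The variable name Facade also hints at this role.

2. Call ConnectionManagerImpl.onSwitchConnected, passing in the ConnectionAdapterImpl object.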

Java
getSwitchConnectionHandler().onSwitchConnected(connectionFacade);

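ConnectionManagerImpl.onSwitchConnected sets 3 listeners on the ConnectionAdapterImpl object; they handle the various low-level events.

  • 1. Create a ConnectionReadyListenerImpl object and pass a reference to it into the ConnectionAdapterImpl object (setConnectionReadyListener);
    • The ConnectionReadyListenerImpl object wraps ConnectionContextImpl and HandshakeContextImpl.
    • The ConnectionReadyListenerImpl object provides the onConnectionReady() method, which ends up calling HandshakeManagerImpl.shake().
  • 2. Create an OpenflowProtocolListenerInitialImpl object and pass a reference to it into the ConnectionAdapterImpl object (setMessageListener);
    • The OpenflowProtocolListenerInitialImpl object handles messages sent from the switch to the controller; for example, it provides the onHelloMessage method.
    • Note: this object only handles the basic messages involved in the handshake. After the handshake it is replaced by another object, OpenflowProtocolListenerFullImpl.
  • 3. Create a SystemNotificationsListenerImpl object and pass a reference to it into the ConnectionAdapterImpl object (setSystemListener);
    • The SystemNotificationsListenerImpl object handles the SwitchIdleEvent and DisconnectEvent events. It provides onSwitchIdleEvent(), which sends an echo heartbeat message when the switch is idle, and onDisconnectEvent(), which handles disconnects.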

Java

@Override
public void onSwitchConnected(final ConnectionAdapter connectionAdapter) {
    LOG.trace("prepare connection context");
    // connectionAdapter is a ConnectionAdapterImpl: it holds the basic socket channel information

    // Create the ConnectionContext
    final ConnectionContext connectionContext = new ConnectionContextImpl(connectionAdapter);
    connectionContext.setDeviceDisconnectedHandler(this.deviceDisconnectedHandler);

    // Create the HandshakeListenerImpl; when the handshake succeeds it calls
    // deviceConnectedHandler.deviceConnected().
    // deviceConnectedHandler is the ContextChainHolderImpl set in OpenflowPluginProvider.
    HandshakeListener handshakeListener = new HandshakeListenerImpl(connectionContext, deviceConnectedHandler);

    // Create the HandshakeManagerImpl: it provides shake() and related methods that implement the handshake
    final HandshakeManager handshakeManager = createHandshakeManager(connectionAdapter, handshakeListener);

    LOG.trace("prepare handshake context");
    // Pass the manager into the context
    HandshakeContext handshakeContext = new HandshakeContextImpl(executorService, handshakeManager);
    // Set the handshakeContext so that close() can be called when the handshake fails
    handshakeListener.setHandshakeContext(handshakeContext);
    connectionContext.setHandshakeContext(handshakeContext);

    // At this point connectionContext wraps connectionAdapter and handshakeContext

    LOG.trace("prepare connection listeners");
    /*
        Takes the handshakeContext and provides onConnectionReady(). Effect of onConnectionReady():
            1. Changes the connectionContext state to HANDSHAKING
            2. Creates a HandshakeStepWrapper, which ends up calling HandshakeManagerImpl.shake()
     */
    final ConnectionReadyListener connectionReadyListener = new ConnectionReadyListenerImpl(
            connectionContext, handshakeContext);
    // Set connectionReadyListener on the connectionAdapter passed up from TcpChannelInitializer
    connectionAdapter.setConnectionReadyListener(connectionReadyListener);

    // Set ofMessageListener (handles incoming OpenFlow messages) on the connectionAdapter
    // passed up from TcpChannelInitializer
    final OpenflowProtocolListener ofMessageListener =
            new OpenflowProtocolListenerInitialImpl(connectionContext, handshakeContext);
    connectionAdapter.setMessageListener(ofMessageListener);

    // Provides onSwitchIdleEvent(), which sends an echo heartbeat message when the switch is idle
    final SystemNotificationsListener systemListener = new SystemNotificationsListenerImpl(
            connectionContext, config.getEchoReplyTimeout().getValue(), executorService);
    // Set systemListener on the connectionAdapter passed up from TcpChannelInitializer
    connectionAdapter.setSystemListener(systemListener);

    LOG.trace("connection ballet finished");
}
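The wiring above can be modeled with a small JDK-only sketch. MiniConnectionAdapter and MiniConnectionManager are hypothetical stand-ins, not Openflowplugin classes, and throwing IllegalStateException for a missing listener is an assumption of the sketch. It only illustrates why initChannel calls checkListeners() right after onSwitchConnected: the adapter should hold all three listeners before the pipeline starts delivering events.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the connection adapter; not an Openflowplugin class. */
class MiniConnectionAdapter {
    private Runnable connectionReadyListener;  // models ConnectionReadyListener
    private Consumer<String> messageListener;  // models OpenflowProtocolListener
    private Consumer<String> systemListener;   // models SystemNotificationsListener

    void setConnectionReadyListener(Runnable listener) { this.connectionReadyListener = listener; }
    void setMessageListener(Consumer<String> listener) { this.messageListener = listener; }
    void setSystemListener(Consumer<String> listener) { this.systemListener = listener; }

    /** Fails fast if the upper layer did not register all three listeners (assumed behavior). */
    void checkListeners() {
        StringBuilder missing = new StringBuilder();
        if (connectionReadyListener == null) { missing.append("connectionReadyListener "); }
        if (messageListener == null) { missing.append("messageListener "); }
        if (systemListener == null) { missing.append("systemListener "); }
        if (missing.length() > 0) {
            throw new IllegalStateException("Missing listeners: " + missing.toString().trim());
        }
    }

    void fireConnectionReady() { connectionReadyListener.run(); }
}

/** Hypothetical stand-in for the upper layer that registers the listeners. */
class MiniConnectionManager {
    /** Registers the three listeners; each one just records what it saw in the log. */
    static void onSwitchConnected(MiniConnectionAdapter adapter, List<String> log) {
        adapter.setConnectionReadyListener(() -> log.add("ready -> start handshake"));
        adapter.setMessageListener(msg -> log.add("message: " + msg));
        adapter.setSystemListener(event -> log.add("system event: " + event));
    }
}
```

Calling checkListeners() before MiniConnectionManager.onSwitchConnected fails, while calling it afterwards passes, which mirrors the order used in initChannel.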

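3. Install ChannelHandlers on channel.pipeline
ChannelHandler objects are added to the channel's pipeline to handle channel idle/inactive events, OpenFlow message encoding and decoding, and so on.

The pipeline is netty's design for processing data streams; see "Netty in Action" for details.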
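To make the first decoding stage concrete, here is a minimal JDK-only sketch of what a frame decoder has to do. It is not the code of OFFrameDecoder; MiniFrameDecoder and its methods are hypothetical. It relies only on the OpenFlow header layout: version (1 byte), type (1 byte), length (2 bytes, covering the whole message) and xid (4 bytes).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical frame splitter that illustrates the role of a frame decoder. */
class MiniFrameDecoder {
    static final int HEADER_LENGTH = 8;  // version(1) + type(1) + length(2) + xid(4)
    static final int LENGTH_OFFSET = 2;  // offset of the 16-bit length field

    /** Extracts every complete frame from buf (in read mode); partial data stays unread. */
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= HEADER_LENGTH) {
            // Peek at the length field without moving the read position.
            int length = buf.getShort(buf.position() + LENGTH_OFFSET) & 0xFFFF;
            if (length < HEADER_LENGTH) {
                throw new IllegalArgumentException("Invalid OpenFlow length: " + length);
            }
            if (buf.remaining() < length) {
                break; // the rest of this message has not arrived yet
            }
            byte[] frame = new byte[length];
            buf.get(frame);
            frames.add(frame);
        }
        return frames;
    }

    /** Builds a header-only hello message (type 0) for the given wire version and xid. */
    static byte[] hello(int version, int xid) {
        return ByteBuffer.allocate(HEADER_LENGTH)
                .put((byte) version)
                .put((byte) 0)
                .putShort((short) HEADER_LENGTH)
                .putInt(xid)
                .array();
    }
}
```

Feeding the decoder two hello messages followed by three stray bytes yields two frames and leaves the three bytes in the buffer until more data arrives. Later handlers in the pipeline then only ever see whole messages.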

4. Call ConnectionAdapterImpl.fireConnectionReadyNotification() to start the handshake
In TcpChannelInitializer.initChannel, ConnectionAdapterImpl.fireConnectionReadyNotification() is eventually called whether or not TLS is enabled:
1. With TLS enabled

Java
final ConnectionFacade finalConnectionFacade = connectionFacade;
handshakeFuture.addListener(future -> finalConnectionFacade.fireConnectionReadyNotification());

2. Without TLS

Java
if (!tlsPresent) {
    connectionFacade.fireConnectionReadyNotification();
}

The connectionFacade variable in both snippets above is the ConnectionAdapterImpl object. Its fireConnectionReadyNotification() method is as follows:

Java

@Override
public void fireConnectionReadyNotification() {
    versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
    Preconditions.checkState(versionDetector != null);

    new Thread(() -> connectionReadyListener.onConnectionReady()).start();
}

As shown, fireConnectionReadyNotification() actually calls connectionReadyListener.onConnectionReady(), and the connectionReadyListener variable is the ConnectionReadyListenerImpl object passed in through setConnectionReadyListener in step 2 above.

In other words, a new thread is started to run ConnectionReadyListenerImpl.onConnectionReady(), and onConnectionReady() triggers the handshake, as described below.
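The two trigger paths can be condensed into one JDK-only sketch. ReadyTrigger is a hypothetical class, a CompletableFuture stands in for netty's TLS handshake future, and an Executor stands in for the thread that fireConnectionReadyNotification() starts. The point is only the timing: with TLS the notification waits for the TLS handshake to finish, without TLS it fires as soon as the pipeline is built.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical model of when the "connection ready" notification fires. */
class ReadyTrigger {
    /**
     * @param tlsHandshake      completed when the TLS handshake ends, or null when TLS is off
     * @param executor          where the callback runs (the original starts a new Thread)
     * @param onConnectionReady callback that starts the OpenFlow handshake
     */
    static void arm(CompletableFuture<Void> tlsHandshake, Executor executor, Runnable onConnectionReady) {
        Runnable fire = () -> executor.execute(onConnectionReady);
        if (tlsHandshake != null) {
            // Like a future listener, this runs on completion, successful or not.
            tlsHandshake.whenComplete((ignored, error) -> fire.run());
        } else {
            fire.run();
        }
    }
}
```

Passing `r -> new Thread(r).start()` as the executor reproduces the thread-per-notification behavior described above, while `Runnable::run` keeps everything on the calling thread, which is convenient in tests.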

To summarize, when the TCP channel is initialized (TcpChannelInitializer.initChannel), the following happens:

  • A ConnectionAdapterImpl object is created to wrap the incoming SocketChannel channel object;
  • ConnectionManagerImpl.onSwitchConnected is called, which invokes setConnectionReadyListener, setMessageListener and setSystemListener on the ConnectionAdapterImpl object;
  • The various ChannelHandlers are installed on the pipeline;
  • ConnectionAdapterImpl.fireConnectionReadyNotification() is called to start the handshake.

2. ConnectionReady Starts the Handshake

At the end of TcpChannelInitializer.initChannel, ConnectionReadyListenerImpl.onConnectionReady() is called. It is shown below:

Java

@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void onConnectionReady() {
    if (LOG.isDebugEnabled()) {
        LOG.debug("device is connected and ready-to-use (pipeline prepared): {}",
                connectionContext.getConnectionAdapter().getRemoteAddress());
    }

    if (connectionContext.getConnectionState() == null) {
        synchronized (connectionContext) {
            if (connectionContext.getConnectionState() == null) {
                connectionContext.changeStateToHandshaking();
                HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
                        null, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter());
                // Submit the step to the handshake thread pool
                final Future<?> handshakeResult = handshakeContext.getHandshakePool().submit(handshakeStepWrapper);

                try {
                    // As we run not in netty thread,
                    // need to remain in sync lock until initial handshake step processed.
                    handshakeResult.get();
                } catch (Exception e) {
                    LOG.error("failed to process onConnectionReady event on device {}, reason {}",
                            connectionContext.getConnectionAdapter().getRemoteAddress(),
                            e);
                    connectionContext.closeConnection(false);
                    handshakeContext.close();
                }
            } else {
                LOG.debug("already touched by hello message from device {} after second check",
                        connectionContext.getConnectionAdapter().getRemoteAddress());
            }
        }
    } else {
        LOG.debug("already touched by hello message from device {} after first check",
                connectionContext.getConnectionAdapter().getRemoteAddress());
    }
}

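The main logic of onConnectionReady():

  1. Set the connectionContext state to HANDSHAKING.
  2. Create a HandshakeStepWrapper object and submit it to a thread for execution. What actually runs is the shake method of the HandshakeManagerImpl object (the one created in ConnectionManagerImpl).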

Java
@Override
public void run() {
    if (connectionAdapter.isAlive()) {
        handshakeManager.shake(helloMessage);
    } else {
        LOG.debug("connection is down - skipping handshake step");
    }
}
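The guard in onConnectionReady() is a double-checked state test: one check without the lock, a second one inside it, and the first handshake step is awaited while the lock is still held. A minimal JDK-only sketch of that pattern follows. MiniConnectionReadyListener, its State enum and its boolean return value are hypothetical, and the sketch locks on a private object where the original synchronizes on connectionContext.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical model of the "start the handshake at most once" guard. */
class MiniConnectionReadyListener {
    enum State { HANDSHAKING }

    private final Object lock = new Object();
    private final ExecutorService handshakePool;
    private final Runnable handshakeStep;
    private volatile State state; // null until someone starts the handshake

    MiniConnectionReadyListener(ExecutorService handshakePool, Runnable handshakeStep) {
        this.handshakePool = handshakePool;
        this.handshakeStep = handshakeStep;
    }

    /** Returns true if this call started the handshake, false if it had already been started. */
    boolean onConnectionReady() {
        if (state != null) {
            return false; // first check, without the lock
        }
        synchronized (lock) {
            if (state != null) {
                return false; // second check, another thread won the race
            }
            state = State.HANDSHAKING;
            Future<?> result = handshakePool.submit(handshakeStep);
            try {
                result.get(); // keep the lock until the first handshake step has run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted during the first handshake step", e);
            } catch (ExecutionException e) {
                // The original closes the connection here; the sketch just reports the failure.
                throw new IllegalStateException("first handshake step failed", e);
            }
            return true;
        }
    }
}
```

A second call to onConnectionReady() returns false without submitting another step, which corresponds to the "already touched by hello message" branches in the original method.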

3. The Controller Proactively Sends a Hello Message

In HandshakeManagerImpl.shake, note that at this point shake is called with receivedHello set to null, so sendHelloMessage(highestVersion, getNextXid()) is called.

Java

@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public synchronized void shake(HelloMessage receivedHello) {

    if (version != null) {
        // Some switches respond with a second HELLO acknowledging our HELLO
        // but we've already completed the handshake based on the negotiated
        // version and have registered this switch.
        LOG.debug("Hello recieved after handshake already settled ... ignoring.");
        return;
    }

    LOG.trace("handshake STARTED");
    setActiveXid(ACTIVE_XID);

    try {
        if (receivedHello == null) {
            // first Hello sending
            sendHelloMessage(highestVersion, getNextXid());
            lastProposedVersion = highestVersion;
            LOG.trace("ret - firstHello+wait");
            return;
        }

        // process the 2. and later hellos
        Short remoteVersion = receivedHello.getVersion();
        List<Elements> elements = receivedHello.getElements();
        setActiveXid(receivedHello.getXid());
        List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
        LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
                  remoteVersionBitmap);

        if (useVersionBitmap && remoteVersionBitmap != null) {
            // versionBitmap on both sides -> ONE STEP DECISION
            handleVersionBitmapNegotiation(elements);
        } else {
            // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
            handleStepByStepVersionNegotiation(remoteVersion);
        }
    } catch (Exception ex) {
        errorHandler.handleException(ex);
        LOG.trace("ret - shake fail - closing");
        handshakeListener.onHandshakeFailure();
    }
}
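The two negotiation branches in shake() correspond to the two rules in the OpenFlow switch specification. The sketch below states those rules in plain Java. It is not the body of handleVersionBitmapNegotiation or handleStepByStepVersionNegotiation, which this article does not show, and the VersionNegotiation class is hypothetical. Versions are wire versions, for example 1 for OpenFlow 1.0 and 4 for OpenFlow 1.3.

```java
import java.util.OptionalInt;
import java.util.Set;

/** Hypothetical helper that states the OpenFlow version negotiation rules. */
class VersionNegotiation {
    /** Both sides sent a version bitmap: pick the highest version present in both. */
    static OptionalInt fromBitmaps(Set<Integer> local, Set<Integer> remote) {
        return local.stream()
                .filter(remote::contains)
                .mapToInt(Integer::intValue)
                .max();
    }

    /**
     * No usable bitmap: the candidate is the smaller of the version we proposed and the
     * version the peer sent. It is only acceptable if we support it ourselves.
     */
    static OptionalInt stepByStep(Set<Integer> localSupported, int proposed, int remoteVersion) {
        int candidate = Math.min(proposed, remoteVersion);
        return localSupported.contains(candidate) ? OptionalInt.of(candidate) : OptionalInt.empty();
    }
}
```

An empty result means the two sides share no version, in which case the handshake cannot complete and the connection should be torn down.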

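The sendHelloMessage method is shown below; it actually calls the hello method of the ConnectionAdapterImpl object. This is how the controller sends a hello message to the switch to negotiate the OpenFlow version.

  • This shows that communication between the controller and the underlying switch is encapsulated by the ConnectionAdapterImpl object.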

    Java

    private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {

    HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);

    final SettableFuture<Void> resultFtr = SettableFuture.create();

    LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));

    // 发送hello消息 Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);

    ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult); Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() { @Override public void onSuccess(@Nonnull RpcResult<Void> result) { if (result.isSuccessful()) { LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress()); resultFtr.set(null); } else { for (RpcError error : result.getErrors()) { LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(), error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress()); if (error.getCause() != null) { LOG.trace("DETAIL of sending hello failure", error.getCause()); } } resultFtr.cancel(false); handshakeListener.onHandshakeFailure(); } }

    @Override public void onFailure(Throwable throwable) { LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid, connectionAdapter.getRemoteAddress(), throwable.getMessage()); LOG.trace("DETAIL of sending of hello failure:", throwable); resultFtr.cancel(false); handshakeListener.onHandshakeFailure(); } }, MoreExecutors.directExecutor()); LOG.trace("sending hello message [{}] - result hooked ..", helloXid); return resultFtr; }

    private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {

        HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);

        final SettableFuture<Void> resultFtr = SettableFuture.create();

        LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
                  MessageFactory.digVersions(helloInput.getElements()));

        // send the hello message
        Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);

        ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
        Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
            @Override
            public void onSuccess(@Nonnull RpcResult<Void> result) {
                if (result.isSuccessful()) {
                    LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
                              connectionAdapter.getRemoteAddress());
                    resultFtr.set(null);
                } else {
                    for (RpcError error : result.getErrors()) {
                        LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
                                  error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
                        if (error.getCause() != null) {
                            LOG.trace("DETAIL of sending hello failure", error.getCause());
                        }
                    }
                    resultFtr.cancel(false);
                    handshakeListener.onHandshakeFailure();
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
                         connectionAdapter.getRemoteAddress(), throwable.getMessage());
                LOG.trace("DETAIL of sending of hello failure:", throwable);
                resultFtr.cancel(false);
                handshakeListener.onHandshakeFailure();
            }
        }, MoreExecutors.directExecutor());
        LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
        return resultFtr;
    }
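The success/failure wiring in sendHelloMessage can be sketched with JDK classes only. This is a minimal illustration, not the openflowplugin code: `SendHelloSketch`, the `Boolean` RPC result and the `failures` counter (standing in for `handshakeListener.onHandshakeFailure()`) are assumptions, and `CompletableFuture` replaces Guava's `SettableFuture`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: turn an RPC result into a handshake-level future. */
final class SendHelloSketch {

    /** Stands in for handshakeListener.onHandshakeFailure(); counts notifications. */
    int failures;

    /**
     * rpcResult completes with true when the hello was written successfully,
     * with false (or exceptionally) when sending failed.
     */
    CompletableFuture<Void> sendHello(CompletableFuture<Boolean> rpcResult) {
        CompletableFuture<Void> resultFtr = new CompletableFuture<>();
        rpcResult.whenComplete((successful, throwable) -> {
            if (throwable == null && Boolean.TRUE.equals(successful)) {
                resultFtr.complete(null);      // hello sent
            } else {
                resultFtr.cancel(false);       // mirror resultFtr.cancel(false)
                failures++;                    // mirror onHandshakeFailure()
            }
        });
        return resultFtr;
    }

    public static void main(String[] args) {
        SendHelloSketch sketch = new SendHelloSketch();
        System.out.println(sketch.sendHello(CompletableFuture.completedFuture(true)).isCancelled());
        System.out.println(sketch.sendHello(CompletableFuture.completedFuture(false)).isCancelled());
    }
}
```

In both failure paths the returned future is cancelled and the listener is notified, so a caller waiting on the future and the handshake listener see the same outcome.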

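    4. The controller handles the Hello message returned by the switch

    In the previous step the controller proactively sent a hello message to the switch, and the switch replies with a message of its own. This section looks at how the controller handles the switch's reply.

    First, go back to TcpChannelInitializer.initChannel, which registers a DelegatingInboundHandler for the ConnectionAdapterImpl object (connectionFacade):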

    Java
    // Delegates translated POJOs into MessageConsumer.
    ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
            new DelegatingInboundHandler(connectionFacade));

    Following Netty's pipeline data-flow model, a message received from the switch is handed to DelegatingInboundHandler, that is, its channelRead method is invoked.

    DelegatingInboundHandler.channelRead calls the consume method of the ConnectionAdapterImpl object.

    Java
    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        consumer.consume((DataObject) msg);
    }
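The delegation pattern can be illustrated without Netty. The sketch below is a hypothetical stand-in (`InboundPipelineSketch` and its methods are not Netty or openflowplugin APIs): earlier stages translate the raw input, and the last element only hands the translated object to a consumer, which is the role DelegatingInboundHandler plays for ConnectionAdapterImpl.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for an inbound pipeline whose tail delegates to a consumer. */
final class InboundPipelineSketch {

    /** Inbound stages, applied in insertion order. */
    private final List<Function<Object, Object>> stages = new ArrayList<>();
    /** Receives the fully translated message (the "delegating" tail). */
    private final Consumer<Object> tail;

    InboundPipelineSketch(Consumer<Object> tail) {
        this.tail = tail;
    }

    InboundPipelineSketch addLast(Function<Object, Object> stage) {
        stages.add(stage);
        return this;
    }

    /** Passes the message through every stage, then delegates the result to the tail. */
    void fireChannelRead(Object msg) {
        Object current = msg;
        for (Function<Object, Object> stage : stages) {
            current = stage.apply(current);
        }
        tail.accept(current);
    }

    /** Demo: bytes are decoded into a String "POJO" and collected by the tail consumer. */
    static List<Object> demo() {
        List<Object> consumed = new ArrayList<>();
        InboundPipelineSketch pipeline = new InboundPipelineSketch(consumed::add)
                .addLast(raw -> new String((byte[]) raw, StandardCharsets.US_ASCII));
        pipeline.fireChannelRead("HELLO".getBytes(StandardCharsets.US_ASCII));
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```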

    This eventually reaches ConnectionAdapterImpl.consumeDeviceMessage:

    Java

    @Override
    public void consumeDeviceMessage(final DataObject message) {
        LOG.debug("ConsumeIntern msg on {}", channel);
        if (disconnectOccured) {
            return;
        }
        if (message instanceof Notification) {

            // System events
            if (message instanceof DisconnectEvent) {
                systemListener.onDisconnectEvent((DisconnectEvent) message);
                responseCache.invalidateAll();
                disconnectOccured = true;
            } else if (message instanceof SwitchIdleEvent) {
                systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
            // OpenFlow messages
            } else if (message instanceof EchoRequestMessage) {
                if (outputManager != null) {
                    outputManager.onEchoRequest((EchoRequestMessage) message);
                } else {
                    messageListener.onEchoRequestMessage((EchoRequestMessage) message);
                }
            } else if (message instanceof ErrorMessage) {
                // Send only unmatched errors
                if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
                    messageListener.onErrorMessage((ErrorMessage) message);
                }
            } else if (message instanceof ExperimenterMessage) {
                if (outputManager != null) {
                    outputManager.onMessage((OfHeader) message);
                }
                messageListener.onExperimenterMessage((ExperimenterMessage) message);
            } else if (message instanceof FlowRemovedMessage) {
                messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
            } else if (message instanceof HelloMessage) {
                LOG.info("Hello received");
                messageListener.onHelloMessage((HelloMessage) message);
            } else if (message instanceof MultipartReplyMessage) {
                if (outputManager != null) {
                    outputManager.onMessage((OfHeader) message);
                }
                messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
            } else if (message instanceof PacketInMessage) {
                messageListener.onPacketInMessage((PacketInMessage) message);
            } else if (message instanceof PortStatusMessage) {
                messageListener.onPortStatusMessage((PortStatusMessage) message);
            } else {
                LOG.warn("message listening not supported for type: {}", message.getClass());
            }
        } else if (message instanceof OfHeader) {
            LOG.debug("OF header msg received");

            if (alienMessageListener != null && alienMessageListener.onAlienMessage((OfHeader) message)) {
                LOG.debug("Alien message {} received", message.getImplementedInterface());
            } else if (outputManager == null || !outputManager.onMessage((OfHeader) message)
                    || message instanceof EchoOutput) {
                final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
                final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
                if (listener != null) {
                    LOG.debug("Corresponding rpcFuture found");
                    listener.completed((OfHeader) message);
                    LOG.debug("After setting rpcFuture");
                    responseCache.invalidate(key);
                }
            }
        } else {
            LOG.warn("message listening not supported for type: {}", message.getClass());
        }
    }
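The two branches of consumeDeviceMessage (notifications forwarded to a listener, RPC replies matched against a cache of pending requests) can be sketched as follows. Everything here is hypothetical: the message classes, the `listenerLog` list and the use of the bare xid as cache key are simplifying assumptions; the real code builds its key with createRpcResponseKey.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of type-based dispatch plus RPC reply correlation. */
final class MessageDispatchSketch {

    static class Message {
        final long xid;
        Message(long xid) { this.xid = xid; }
    }

    /** Unsolicited message, forwarded to a listener. */
    static final class HelloNotification extends Message {
        HelloNotification(long xid) { super(xid); }
    }

    /** Reply to an earlier request, matched by xid. */
    static final class RpcReply extends Message {
        RpcReply(long xid) { super(xid); }
    }

    /** Pending requests waiting for a reply (assumption: keyed by xid only). */
    private final Map<Long, CompletableFuture<Message>> pendingRpcs = new ConcurrentHashMap<>();
    /** Stands in for messageListener; records what it was told. */
    final List<String> listenerLog = new ArrayList<>();

    CompletableFuture<Message> expectReply(long xid) {
        CompletableFuture<Message> future = new CompletableFuture<>();
        pendingRpcs.put(xid, future);
        return future;
    }

    void consume(Message message) {
        if (message instanceof HelloNotification) {
            listenerLog.add("hello:" + message.xid);
        } else if (message instanceof RpcReply) {
            // Complete the waiting future and drop the cache entry in one step.
            CompletableFuture<Message> waiting = pendingRpcs.remove(message.xid);
            if (waiting != null) {
                waiting.complete(message);
            }
        } else {
            listenerLog.add("unsupported:" + message.getClass().getSimpleName());
        }
    }

    public static void main(String[] args) {
        MessageDispatchSketch dispatcher = new MessageDispatchSketch();
        CompletableFuture<Message> reply = dispatcher.expectReply(7L);
        dispatcher.consume(new HelloNotification(1L));
        dispatcher.consume(new RpcReply(7L));
        System.out.println(dispatcher.listenerLog + " replyDone=" + reply.isDone());
    }
}
```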

    Taking the Hello message of the handshake as an example, messageListener.onHelloMessage((HelloMessage) message) is called, which is OpenflowProtocolListenerInitialImpl.onHelloMessage:

    • Recall from the earlier step: ConnectionManagerImpl.onSwitchConnected passes in the OpenflowProtocolListenerInitialImpl object (setMessageListener).

      Java

      @Override
      public void onHelloMessage(final HelloMessage hello) {
          LOG.debug("processing HELLO.xid: {} from device {}", hello.getXid(),
                  connectionContext.getConnectionAdapter().getRemoteAddress());
          final ConnectionContext.CONNECTION_STATE connectionState = connectionContext.getConnectionState();
          if (connectionState == null
                  || ConnectionContext.CONNECTION_STATE.HANDSHAKING.equals(connectionState)) {
              synchronized (connectionContext) {
                  if (connectionContext.getConnectionState() == null) {
                      // got here before connection ready notification
                      connectionContext.changeStateToHandshaking();
                  }

                  if (checkState(ConnectionContext.CONNECTION_STATE.HANDSHAKING)) {
                      final HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
                              hello, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter());
                      // use up netty thread
                      handshakeStepWrapper.run();
                  } else {
                      LOG.debug("already out of handshake phase but still received hello message from device {}",
                              connectionContext.getConnectionAdapter().getRemoteAddress());
                  }
              }
          } else {
              //TODO: consider disconnecting of bad behaving device
              LOG.warn("Hello message received outside handshake phase:{} ", hello);
              LOG.debug("already touched by onConnectionReady event from device {} (or finished handshake)",
                      connectionContext.getConnectionAdapter().getRemoteAddress());
          }
      }

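      onHelloMessage checks the state of connectionContext. If the state is not yet set, it is first switched to HANDSHAKING. While the state is HANDSHAKING, the method builds a HandshakeStepWrapper and runs it directly on the current Netty thread (handshakeStepWrapper.run(), see the "use up netty thread" comment) rather than on a new thread, which calls HandshakeManagerImpl.shake again.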
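The state guard in onHelloMessage can be reduced to a few lines. This is a sketch under assumptions: `HelloStateGuardSketch`, its two-value `State` enum and the `handshakeSteps` counter (standing in for running HandshakeStepWrapper) are hypothetical names, not the plugin's types.

```java
/** Hypothetical sketch: a hello is only processed while the connection is handshaking. */
final class HelloStateGuardSketch {

    enum State { HANDSHAKING, WORKING }

    /** null until the first event for this connection arrives. */
    private State state;
    /** Counts how often a handshake step was run (stands in for HandshakeStepWrapper.run()). */
    int handshakeSteps;

    /** Returns true if the hello triggered a handshake step, false if it was ignored. */
    synchronized boolean onHello() {
        if (state == null) {
            // Hello arrived before the connection-ready notification.
            state = State.HANDSHAKING;
        }
        if (state == State.HANDSHAKING) {
            handshakeSteps++; // runs on the caller's thread, no hand-off
            return true;
        }
        return false; // hello outside the handshake phase is only logged in the real code
    }

    synchronized void handshakeFinished() {
        state = State.WORKING;
    }

    public static void main(String[] args) {
        HelloStateGuardSketch guard = new HelloStateGuardSketch();
        System.out.println(guard.onHello());
        guard.handshakeFinished();
        System.out.println(guard.onHello());
    }
}
```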

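    5. Negotiating the OpenFlow protocol version

    In HandshakeManagerImpl.shake, the handling of the second and any later hello message depends on whether the switch's first hello carried an OpenFlow version bitmap. Accordingly, one of two negotiation procedures is used: handleVersionBitmapNegotiation or handleStepByStepVersionNegotiation.

    For the details of the two negotiation procedures, see the official documentation; they are not covered here.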

    odl-ofp-handshake.png

    Both negotiation procedures end by calling HandshakeManagerImpl.postHandshake.
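As a rough illustration of what the two procedures decide, the sketch below follows the rule stated in the OpenFlow switch specification: with version bitmaps on both sides, pick the highest version set in both; without them, fall back to the smaller of the two advertised versions. It does not reproduce the control flow of handleVersionBitmapNegotiation or handleStepByStepVersionNegotiation, and `VersionNegotiationSketch` and its methods are hypothetical names.

```java
/** Hypothetical sketch of OpenFlow version selection (wire versions, e.g. 1 = OF1.0, 4 = OF1.3). */
final class VersionNegotiationSketch {

    /** Builds a bitmap in which bit N is set when wire version N is supported. */
    static int bitmapOf(int... versions) {
        int bitmap = 0;
        for (int version : versions) {
            bitmap |= 1 << version;
        }
        return bitmap;
    }

    /** Returns the highest version present in both bitmaps, or -1 when they share none. */
    static int highestCommonVersion(int localBitmap, int remoteBitmap) {
        int common = localBitmap & remoteBitmap;
        return common == 0 ? -1 : 31 - Integer.numberOfLeadingZeros(common);
    }

    /** Without bitmaps: the smaller of the two advertised versions. */
    static int fallbackVersion(int localVersion, int remoteVersion) {
        return Math.min(localVersion, remoteVersion);
    }

    public static void main(String[] args) {
        int controller = bitmapOf(1, 4);
        int device = bitmapOf(1, 3, 4);
        System.out.println(highestCommonVersion(controller, device));
        System.out.println(fallbackVersion(4, 1));
    }
}
```

When the bitmaps share no version, or the fallback version is not supported by one side, the handshake cannot succeed and the connection is expected to be closed.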

    HandshakeManagerImpl.postHandshake

    Once the controller and the switch have agreed on an OpenFlow version, postHandshake is called. Its main operations:

    • Call the get-features RPC to request the switch's features. This also goes through the ConnectionAdapterImpl object (connectionAdapter.getFeatures).

      • The features include datapathId, buffers, tables, auxiliaryId, capabilities, reserved, actions, phy-port and so on (see openflow-protocol.yang).

        Java

        protected void postHandshake(final Short proposedVersion, final Long xid) {
            // set version
            version = proposedVersion;

            LOG.debug("version set: {}", proposedVersion);
            // request features
            GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
            featuresBuilder.setVersion(version).setXid(xid);
            LOG.debug("sending feature request for version={} and xid={}", version, xid);
            Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter.getFeatures(featuresBuilder.build());

            Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
                    new FutureCallback<RpcResult<GetFeaturesOutput>>() {
                        @Override
                        public void onSuccess(@Nonnull RpcResult<GetFeaturesOutput> rpcFeatures) {
                            LOG.trace("features are back");
                            if (rpcFeatures.isSuccessful()) {
                                GetFeaturesOutput featureOutput = rpcFeatures.getResult();
                                if (!deviceConnectionRateLimiter.tryAquire()) {
                                    LOG.debug("Permit not acquired for dpn {}, disconnecting the device.",
                                              featureOutput.getDatapathId());
                                    connectionAdapter.disconnect();
                                    return;
                                }

                                LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
                                LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
                                LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
                                          version, featureOutput.getDatapathId(),
                                          featureOutput.getAuxiliaryId());
                                // success
                                handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
                            } else {
                                // handshake failed
                                LOG.warn("issuing disconnect during handshake [{}]",
                                         connectionAdapter.getRemoteAddress());
                                for (RpcError rpcError : rpcFeatures.getErrors()) {
                                    LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
                                              rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
                                              rpcError.getCause());
                                }
                                handshakeListener.onHandshakeFailure();
                            }

                            LOG.debug("postHandshake DONE");
                        }

                        @Override
                        public void onFailure(Throwable throwable) {
                            LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
                                     connectionAdapter.getRemoteAddress(), throwable.getMessage());
                            LOG.trace("DETAIL of sending of hello failure:", throwable);
                        }
                    }, MoreExecutors.directExecutor());
            LOG.debug("future features [{}] hooked ..", xid);
        }

    After get-features succeeds, handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion) is called to continue processing.
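The outcomes of postHandshake can be summarized in a small sketch. The names are hypothetical, the features reply is reduced to a datapath id, and a `Semaphore` stands in for deviceConnectionRateLimiter purely for illustration (the real limiter's policy is not shown in this article).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch of the post-handshake outcomes once the features reply arrives. */
final class PostHandshakeSketch {

    /** Records what would be signalled to the listener or adapter. */
    final List<String> events = new ArrayList<>();
    /** Assumption: a fixed number of permits replaces the real rate limiter. */
    private final Semaphore permits;

    PostHandshakeSketch(int permitCount) {
        this.permits = new Semaphore(permitCount);
    }

    void postHandshake(CompletableFuture<Long> featuresReply) {
        featuresReply.whenComplete((datapathId, failure) -> {
            if (failure != null) {
                events.add("features-error-logged");      // onFailure only logs
            } else if (!permits.tryAcquire()) {
                events.add("disconnect:" + datapathId);    // no permit: drop the device
            } else {
                events.add("handshake-successful:" + datapathId);
            }
        });
    }

    public static void main(String[] args) {
        PostHandshakeSketch sketch = new PostHandshakeSketch(1);
        sketch.postHandshake(CompletableFuture.completedFuture(1L));
        sketch.postHandshake(CompletableFuture.completedFuture(2L));
        System.out.println(sketch.events);
    }
}
```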

    HandshakeListenerImpl.onHandshakeSuccessful

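    What onHandshakeSuccessful does:

    • Closes handshakeContext and sets the connectionContext state to WORKING
    • Sets connectionContext.featuresReply to the result of the get-features call from the previous step
    • Sets connectionContext.nodeId from the datapathId
    • Calls connectionContext.handshakeSuccessful(), which creates the DeviceInfoImpl object
      • this.deviceInfo = new DeviceInfoImpl()
    • Finally, sends a barrier message to the switch; the result is handled by the callback returned from addBarrierCallback()

      • The barrier ensures that every command sent to the switch before it has been processed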

        Java

        @Override
        public void onHandshakeSuccessful(final GetFeaturesOutput featureOutput, final Short version) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("handshake succeeded: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
            }
            this.handshakeContext.close();
            connectionContext.changeStateToWorking();
            // store the features on connectionContext
            connectionContext.setFeatures(featureOutput);
            // the nodeId is derived from the datapathId
            connectionContext.setNodeId(InventoryDataServiceUtil.nodeIdFromDatapathId(featureOutput.getDatapathId()));
            // effect: connectionContext creates a new DeviceInfoImpl
            connectionContext.handshakeSuccessful();

            // fire barrier in order to sweep all handshake and posthandshake messages before continue
            final ListenableFuture<RpcResult<BarrierOutput>> barrier = fireBarrier(version, 0L);
            // callback for the barrier reply
            Futures.addCallback(barrier, addBarrierCallback(), MoreExecutors.directExecutor());
        }
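The ordering guarantee a barrier provides can be pictured with a single-threaded queue: once the barrier's reply is available, everything queued before it has been processed. The sketch is an analogy only; `BarrierSketch` and the single-thread executor standing in for the switch are assumptions, not how the switch or the plugin is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical analogy: a barrier reply implies all earlier messages were processed. */
final class BarrierSketch {

    static List<String> demo() {
        List<String> processed = new CopyOnWriteArrayList<>();
        // The single worker thread plays the switch: it handles messages strictly in order.
        ExecutorService switchQueue = Executors.newSingleThreadExecutor();
        try {
            switchQueue.execute(() -> processed.add("hello"));
            switchQueue.execute(() -> processed.add("features-request"));
            Future<?> barrierReply = switchQueue.submit(() -> {
                processed.add("barrier");
            });
            // Waiting for the barrier reply also waits for everything queued before it.
            barrierReply.get();
            return new ArrayList<>(processed);
        } catch (Exception e) {
            throw new IllegalStateException("barrier wait failed", e);
        } finally {
            switchQueue.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```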

    HandshakeListenerImpl.addBarrierCallback

    The core of addBarrierCallback() is deviceConnectedHandler.deviceConnected(connectionContext), which calls ContextChainHolderImpl.deviceConnected.

    • The deviceConnectedHandler field is passed in when ConnectionManagerImpl.onSwitchConnected creates the HandshakeListenerImpl object; it is the ContextChainHolderImpl.

      Java

      private FutureCallback<RpcResult<BarrierOutput>> addBarrierCallback() {
          return new FutureCallback<RpcResult<BarrierOutput>>() {
              @Override
              @SuppressWarnings("checkstyle:IllegalCatch")
              public void onSuccess(@Nullable final RpcResult<BarrierOutput> result) {
                  if (LOG.isDebugEnabled()) {
                      LOG.debug("succeeded by getting sweep barrier after post-handshake for device {}",
                              connectionContext.getDeviceInfo());
                  }
                  try {
                      // device connected: call ContextChainHolderImpl.deviceConnected()
                      ConnectionStatus connectionStatusResult = deviceConnectedHandler.deviceConnected(connectionContext);
                      if (connectionStatusResult != ConnectionStatus.MAY_CONTINUE) {
                          connectionContext.closeConnection(false);
                      }
                      SessionStatistics.countEvent(connectionContext.getDeviceInfo().toString(),
                              SessionStatistics.ConnectionStatus.CONNECTION_CREATED);
                  } catch (final Exception e) {
                      LOG.warn("initial processing failed for device {}", connectionContext.getDeviceInfo(), e);
                      SessionStatistics.countEvent(connectionContext.getDeviceInfo().toString(),
                              SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP);
                      connectionContext.closeConnection(true);
                  }
              }

              @Override
              public void onFailure(final Throwable throwable) {
                  LOG.warn("failed to get sweep barrier after post-handshake for device {}",
                          connectionContext.getDeviceInfo(), throwable);
                  connectionContext.closeConnection(false);
              }
          };
      }

    ContextChainHolderImpl.deviceConnected

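    When ContextChainHolderImpl.deviceConnected is reached, the switch has completed the handshake with the controller. Besides handling auxiliary connections, the key job of this method is to create a ContextChainImpl object for a switch that connects to the controller for the first time, by calling createContextChain(connectionContext). The steps that follow are no longer part of the handshake: they create the individual contexts for the switch, run the mastership election and so on, and are not covered in this article.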

    Java

    @Override
    public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
        final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
        final ContextChain contextChain = contextChainMap.get(deviceInfo);
        final FeaturesReply featuresReply = connectionContext.getFeatures();
        final Short auxiliaryId = featuresReply != null ? featuresReply.getAuxiliaryId() : null;

        // auxiliary connection
        if (auxiliaryId != null && auxiliaryId != 0) {
            if (contextChain == null) {
                LOG.warn("An auxiliary connection for device {}, but no primary connection. Refusing connection.",
                         deviceInfo);
                return ConnectionStatus.REFUSING_AUXILIARY_CONNECTION;
            } else {
                if (contextChain.addAuxiliaryConnection(connectionContext)) {
                    LOG.info("An auxiliary connection was added to device: {}", deviceInfo);
                    return ConnectionStatus.MAY_CONTINUE;
                } else {
                    LOG.warn("Not able to add auxiliary connection to the device {}", deviceInfo);
                    return ConnectionStatus.REFUSING_AUXILIARY_CONNECTION;
                }
            }
        } else {
            LOG.info("Device {} connected.", deviceInfo);
            final boolean contextExists = contextChain != null;
            final boolean isClosing = contextExists && contextChain.isClosing();

            if (!isClosing && connectingDevices.putIfAbsent(deviceInfo, connectionContext) != null) {
                LOG.warn("Device {} is already trying to connect, wait until succeeded or disconnected.", deviceInfo);
                return ConnectionStatus.ALREADY_CONNECTED;
            }

            if (contextExists) {
                if (isClosing) {
                    LOG.warn("Device {} is already in termination state, closing all incoming connections.",
                             deviceInfo);
                    return ConnectionStatus.CLOSING;
                }

                LOG.warn("Device {} already connected. Closing previous connection", deviceInfo);
                destroyContextChain(deviceInfo);
                LOG.info("Old connection dropped, creating new context chain for device {}", deviceInfo);
                createContextChain(connectionContext);
            } else {
                // first connection from this device: create its contextChain
                LOG.info("No context chain found for device: {}, creating new.", deviceInfo);
                createContextChain(connectionContext);
            }

            return ConnectionStatus.MAY_CONTINUE;
        }

    }
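The decision logic of deviceConnected can be condensed into a small sketch. All names are hypothetical: devices are identified by a plain string, the context chain is reduced to a `Chain` holder, adding an auxiliary connection always succeeds, and `connectingFinished` is an assumed hook for clearing the "connecting" marker, which this article does not show.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the connection decision made after the handshake. */
final class DeviceConnectedSketch {

    enum Status { MAY_CONTINUE, ALREADY_CONNECTED, CLOSING, REFUSING_AUXILIARY_CONNECTION }

    /** Minimal stand-in for a context chain. */
    static final class Chain {
        boolean closing;
        int auxiliaryConnections;
    }

    private final Map<String, Chain> chains = new ConcurrentHashMap<>();
    private final Map<String, Boolean> connecting = new ConcurrentHashMap<>();

    Status deviceConnected(String deviceId, int auxiliaryId) {
        Chain chain = chains.get(deviceId);
        if (auxiliaryId != 0) {
            // Auxiliary connections are only accepted next to an existing primary one.
            if (chain == null) {
                return Status.REFUSING_AUXILIARY_CONNECTION;
            }
            chain.auxiliaryConnections++;
            return Status.MAY_CONTINUE;
        }
        boolean closing = chain != null && chain.closing;
        if (!closing && connecting.putIfAbsent(deviceId, Boolean.TRUE) != null) {
            return Status.ALREADY_CONNECTED; // another primary connection is still being set up
        }
        if (closing) {
            return Status.CLOSING;
        }
        // Either the first connection or a reconnect: (re)create the chain.
        chains.put(deviceId, new Chain());
        return Status.MAY_CONTINUE;
    }

    /** Assumed hook: clears the "connecting" marker once setup has finished or failed. */
    void connectingFinished(String deviceId) {
        connecting.remove(deviceId);
    }

    public static void main(String[] args) {
        DeviceConnectedSketch holder = new DeviceConnectedSketch();
        System.out.println(holder.deviceConnected("openflow:1", 1));
        System.out.println(holder.deviceConnected("openflow:1", 0));
        System.out.println(holder.deviceConnected("openflow:1", 0));
    }
}
```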

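    Summary

    At this point we have followed a switch connecting to the controller, from TcpChannelInitializer to ContextChainHolderImpl, through the call chain of the whole handshake. It also makes clear that the ConnectionAdapterImpl object is the key wrapper for communicating with the underlying switch.

    Key classes/objects

    My own understanding, summarized: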

    table--odl-1.PNG

