
(7) ODL Openflowplugin: Source Code Analysis of a Switch Disconnecting from the Controller and Going Offline

 5 years ago
source link: https://www.sdnlab.com/22484.html

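About the author: Chen Zhuowen (陈卓文), a developer on the private cloud team of a game company in China, working mainly on SDN/NFV development.

Because of length, the broad topic "Switch lifecycle in Openflowplugin" is split into several articles: creation of the ContextChain (the Switch lifecycle object); Master election among controller nodes and ContextChain/Context service instantiation; MastershipChangeService and the ReconciliationFramework; a controller node becoming Slave; and the Switch offline process.
This article is the seventh in the Openflowplugin (0.6.2) source code analysis series. It analyzes how OFP reacts when the connection between a Switch and the controller is broken.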

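Earlier articles in this series:
Part 1: (1) ODL OpenflowPlugin startup flow source code analysis
Part 2: (2) ODL Openflowplugin source code analysis of the Handshake when a Switch connects to the controller
Part 3: (3) ODL Openflowplugin source code analysis of creating the Switch lifecycle object ContextChain
Part 4: (4) ODL Openflowplugin source code analysis of Master election and Context service instantiation
Part 5: (5) ODL Openflowplugin source code analysis of Mastership and the ReconciliationFramework
Part 6: (6) ODL Openflowplugin source code analysis of a controller becoming SLAVE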

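Intended readers: you have a basic grasp of the ideas behind OpenDaylight or some hands-on experience with it, and you want to understand the openflowplugin source code in depth or modify it.

Earlier notes covered the Switch coming online in detail: the Handshake, initialization, Master/Slave election, and the triggering of northbound applications. So what happens when the connection between a Switch and the controller is broken and the Switch goes offline? In a cluster, when a Switch loses its connection to the Master node, another node automatically takes over as Master, re-initializes, re-runs the Master/Slave election, and triggers northbound applications again. How is this implemented?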

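Switch Offline Process

1. ConnectionAdapterImpl handles messages/events

As mentioned in Part 2, a ConnectionAdapterImpl object is created for each Switch as soon as it connects to the controller, and during the Handshake a reference to a SystemNotificationsListenerImpl object is passed into that ConnectionAdapterImpl.

SystemNotificationsListenerImpl handles the events that concern whether the underlying Switch is still connected to the controller. In ConnectionAdapterImpl you can see that the low-level Switch events DisconnectEvent and SwitchIdleEvent invoke methods of SystemNotificationsListener. Of the two, DisconnectEvent always makes the controller reclaim the Switch's ContextChain and related objects, whereas SwitchIdleEvent only may do so (that is, the controller may actively drop its connection to the Switch).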

Java
@Override
public void consumeDeviceMessage(final DataObject message) {
    LOG.debug("ConsumeIntern msg on {}", channel);
    if (disconnectOccured) {
        return;
    }
    if (message instanceof Notification) {

        // System events
        if (message instanceof DisconnectEvent) {
            systemListener.onDisconnectEvent((DisconnectEvent) message);
            responseCache.invalidateAll();
            disconnectOccured = true;
        } else if (message instanceof SwitchIdleEvent) {
            systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
        }
        ...
    }
    ...
}
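To make the effect of the disconnectOccured flag concrete, here is a minimal, self-contained sketch of the same dispatch pattern. The message types and the MessageDispatcherSketch class are hypothetical stand-ins, not OFP classes; the only behaviour taken from the excerpt above is that a disconnect is handled once and every later message is dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the OFP notification types.
interface DeviceMessage {}
final class DisconnectMsg implements DeviceMessage {}
final class SwitchIdleMsg implements DeviceMessage {}
final class OtherMsg implements DeviceMessage {}

// Simplified model of the dispatch in consumeDeviceMessage.
final class MessageDispatcherSketch {
    private boolean disconnectOccurred = false;
    final List<String> handled = new ArrayList<>();

    void consume(DeviceMessage message) {
        if (disconnectOccurred) {
            return; // everything after a disconnect is silently dropped
        }
        if (message instanceof DisconnectMsg) {
            handled.add("disconnect");
            disconnectOccurred = true; // latch: no further processing
        } else if (message instanceof SwitchIdleMsg) {
            handled.add("idle");
        } else {
            handled.add("other");
        }
    }
}
```

In this model an idle event that arrives after the disconnect is ignored, which matches the early return at the top of the real method.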

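2. How DisconnectEvent and SwitchIdleEvent are triggered

First, let's look at how the DisconnectEvent and SwitchIdleEvent events are triggered.

2.1 The SwitchIdleEvent event

Go back to the TcpChannelInitializer.initChannel method (the earliest logic triggered when a Switch connects to the controller). It adds an IdleHandler to the channel; if no message is received from the switch within the idle timeout, IdleHandler.readTimedOut is called.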

Java
ch.pipeline().addLast(PipelineHandlers.IDLE_HANDLER.name(),
        new IdleHandler(getSwitchIdleTimeout(), TimeUnit.MILLISECONDS));

IdleHandler.readTimedOut builds a SwitchIdleEvent and calls ctx.fireChannelRead(builder.build()) to pass the event to the next handler in the Netty pipeline.

Java
@Override
protected void readTimedOut(final ChannelHandlerContext ctx) throws Exception {
    if (first) {
        LOG.debug("Switch idle");
        SwitchIdleEventBuilder builder = new SwitchIdleEventBuilder();
        builder.setInfo("Switch idle");
        ctx.fireChannelRead(builder.build());
        first = false;
    }
}
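The `first` flag makes the idle event one-shot: a silent switch produces a single SwitchIdleEvent rather than one per timeout. The sketch below models that behaviour without Netty, using an explicit clock so it is deterministic. IdleDetectorSketch and its methods are hypothetical, and the assumption that a received message re-arms the flag is mine; the excerpt above does not show where `first` is reset.

```java
// Hypothetical, Netty-free model of a one-shot idle detector.
final class IdleDetectorSketch {
    private final long idleTimeoutMillis;
    private long lastReadMillis;
    private boolean first = true; // mirrors the `first` flag in readTimedOut
    int idleEventsFired = 0;

    IdleDetectorSketch(long idleTimeoutMillis, long nowMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastReadMillis = nowMillis;
    }

    // Called whenever a message arrives from the switch.
    // Assumption: a read re-arms the detector.
    void onRead(long nowMillis) {
        lastReadMillis = nowMillis;
        first = true;
    }

    // Called periodically; fires at most one idle event per quiet period.
    void onTick(long nowMillis) {
        boolean timedOut = nowMillis - lastReadMillis >= idleTimeoutMillis;
        if (timedOut && first) {
            idleEventsFired++;
            first = false;
        }
    }
}
```

With a 100 ms timeout, ticking at 100 ms and again at 200 ms without any read fires only one event; a read followed by another quiet period fires a second one.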

ctx.fireChannelRead(builder.build()) eventually reaches ConnectionAdapterImpl.consumeDeviceMessage, which, as described above, calls the SystemNotificationsListenerImpl method onSwitchIdleEvent:

Java
} else if (message instanceof SwitchIdleEvent) {
    systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);

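2.2 The DisconnectEvent event

In the idle-event call path, the DelegatingInboundHandler object is what reads messages/data from the channel. DelegatingInboundHandler extends Netty's ChannelInboundHandlerAdapter class, and Netty calls channelInactive when the channel becomes inactive; the handler then emits a DisconnectEvent: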

Java
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
    LOG.debug("Channel inactive");
    if (!inactiveMessageSent) {
        DisconnectEventBuilder builder = new DisconnectEventBuilder();
        builder.setInfo("Channel inactive");
        consumer.consume(builder.build());
        inactiveMessageSent = true;
    }
}

As with the idle event, this ends up in ConnectionAdapterImpl.consumeDeviceMessage, which, as described above, calls the SystemNotificationsListenerImpl method onDisconnectEvent:

Java
if (message instanceof DisconnectEvent) {
    systemListener.onDisconnectEvent((DisconnectEvent) message);
    responseCache.invalidateAll();
    disconnectOccured = true;

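3. SystemNotificationsListener handles the events

3.1 Handling SwitchIdleEvent

A call to SystemNotificationsListenerImpl.onSwitchIdleEvent means that nothing has been received from the underlying switch within the idle timeout. The event is handled inside the SystemNotificationsListenerImpl object.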

Java
@Override
public void onSwitchIdleEvent(final SwitchIdleEvent notification) {
    executorService.execute(this::executeOnSwitchIdleEvent);
}

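The idle event is actually handled by executeOnSwitchIdleEvent, in which the controller tries to send an echo message:
(1) If the underlying switch replies, the controller considers the Switch still online and does nothing further.
(2) If the underlying switch does not reply, the controller considers the Switch disconnected and calls ConnectionContextImpl.closeConnection(true) to clean up the switch's connection and related objects in the controller: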

Java
private void executeOnSwitchIdleEvent() {
    boolean shouldBeDisconnected = true;

    final InetSocketAddress remoteAddress = connectionContext.getConnectionAdapter().getRemoteAddress();

    // Only probe when the connection state is WORKING
    if (ConnectionContext.CONNECTION_STATE.WORKING.equals(connectionContext.getConnectionState())) {
        FeaturesReply features = connectionContext.getFeatures();
        LOG.info("Switch Idle state occurred, node={}|auxId={}", remoteAddress, features.getAuxiliaryId());
        // Set the state to TIMEOUTING
        connectionContext.changeStateToTimeouting();
        EchoInputBuilder builder = new EchoInputBuilder();
        builder.setVersion(features.getVersion());
        builder.setXid(ECHO_XID.getValue());

        // Send the echo message
        Future<RpcResult<EchoOutput>> echoReplyFuture =
                connectionContext.getConnectionAdapter().echo(builder.build());

        try {
            RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(echoReplyTimeout, TimeUnit.MILLISECONDS);
            if (echoReplyValue.isSuccessful()
                    && Objects.equals(echoReplyValue.getResult().getXid(), ECHO_XID.getValue())) {
                // Set the state back to WORKING
                connectionContext.changeStateToWorking();
                shouldBeDisconnected = false;
            } else {
                logErrors(remoteAddress, echoReplyValue);
            }
        } catch (Exception e) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Exception while waiting for echoReply from [{}] in TIMEOUTING state: {}",
                        remoteAddress, e.getMessage());
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("Exception while waiting for echoReply from [{}] in TIMEOUTING state: {}",
                        remoteAddress, e);
            }

        }
    }
    // The switch did not answer the echo: treat it as offline and close the connection
    if (shouldBeDisconnected) {
        if (LOG.isInfoEnabled()) {
            LOG.info("ConnectionEvent:Closing connection as device is idle. Echo sent at {}. Device:{}, NodeId:{}",
                    new Date(System.currentTimeMillis() - echoReplyTimeout),
                    remoteAddress, connectionContext.getSafeNodeIdForLOG());
        }

        connectionContext.closeConnection(true);
    }
}
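The probe logic above can be reduced to a small state machine. The following sketch is an illustration only: EchoProbeSketch, its State enum and the probe method are hypothetical, and a plain Future<Long> carrying the reply xid stands in for the real Future<RpcResult<EchoOutput>>. For brevity it marks the connection RIP directly, whereas the real code delegates that to closeConnection(true).

```java
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the echo probe performed on an idle switch.
final class EchoProbeSketch {
    enum State { WORKING, TIMEOUTING, RIP }

    State state = State.WORKING;

    // Returns true when the connection should be closed.
    boolean probe(Future<Long> echoReply, long expectedXid, long timeoutMillis) {
        boolean shouldBeDisconnected = true;
        if (state == State.WORKING) {
            state = State.TIMEOUTING;
            try {
                // Wait a bounded time for the echo reply
                Long xid = echoReply.get(timeoutMillis, TimeUnit.MILLISECONDS);
                if (xid != null && xid == expectedXid) {
                    state = State.WORKING; // the switch answered in time
                    shouldBeDisconnected = false;
                }
            } catch (Exception e) {
                // Timeout or failed future: fall through and disconnect
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (shouldBeDisconnected) {
            state = State.RIP;
        }
        return shouldBeDisconnected;
    }
}
```

Note that a connection that is not in the WORKING state when the idle event arrives is never probed; shouldBeDisconnected stays true and the connection is closed, exactly as in the original method.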

ConnectionContextImpl.closeConnection(true) in turn calls disconnectDevice(true, true). (onDisconnectEvent also ends up in this method, as discussed below.)

Java
@Override
public void closeConnection(final boolean propagate) {
    disconnectDevice(propagate, true);
}

3.2 Handling DisconnectEvent

A call to SystemNotificationsListenerImpl.onDisconnectEvent means the channel is already closed, so it calls ConnectionContextImpl.onConnectionClosed() directly.

Java
@Override
public void onDisconnectEvent(final DisconnectEvent notification) {
    LOG.info("ConnectionEvent: Connection closed by device, Device:{}, NodeId:{}",
            connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getSafeNodeIdForLOG());
    connectionContext.onConnectionClosed();
}

ConnectionContextImpl.onConnectionClosed() also ends up in disconnectDevice, called as disconnectDevice(true, false); unlike the idle-event path, the second argument is false.

Java
@Override
public void onConnectionClosed() {
    disconnectDevice(true, false);
}

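4. The actual disconnection between Switch and controller

ConnectionContextImpl.disconnectDevice is reached from both the idle event and the DisconnectEvent. The two paths pass different arguments:

  • IdleEvent: disconnectDevice(true, true);
  • DisconnectEvent: disconnectDevice(true, false);

The second argument differs. It indicates whether the disconnect was initiated by the controller (OFP) or by the device. With an idle event the controller has stopped hearing the heartbeat and initiates the disconnect itself, so the second argument is true.

Let's look at the logic of ConnectionContextImpl.disconnectDevice: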

Java
private void disconnectDevice(final boolean propagate,
                              final boolean forced) {
    final String device =
            Objects.nonNull(nodeId) ? nodeId.getValue() : getConnectionAdapter().getRemoteAddress().toString();
    final short auxiliaryId = Optional
            .ofNullable(getFeatures())
            .flatMap(features -> Optional
                    .ofNullable(features.getAuxiliaryId()))
            .orElse((short) 0);

    if (connectionState == CONNECTION_STATE.RIP) {
        LOG.debug("Connection for device {} with auxiliary ID {} is already {}, so skipping closing.",
                device, auxiliaryId, getConnectionState());
        return;
    }

    connectionState = ConnectionContext.CONNECTION_STATE.RIP;

    // forced tells whether disconnectDevice was triggered by the controller or by the device
    SessionStatistics.countEvent(device, forced
            ? SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP
            : SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_DEVICE);

    LOG.debug("{}: device={} | auxiliaryId={} | connectionState={}",
            forced ? "Actively closing connection" : "Disconnecting",
            device,
            auxiliaryId,
            getConnectionState());

    portStatusMessages.clear();
    unregisterOutboundQueue();
    closeHandshakeContext();

    // Idle-event path: forced is true and the connection may still be active,
    // so the controller closes the channel itself
    if (forced && getConnectionAdapter().isAlive()) {
        getConnectionAdapter().disconnect();
    }

    if (propagate) {
        propagateDeviceDisconnectedEvent();
    }
}
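The interplay of the RIP guard with the propagate and forced flags is easier to see in isolation. The sketch below is a hypothetical model (DisconnectSketch and the action strings are invented for illustration) that records which steps run on each path; it assumes, as the article states, that the channel is already inactive when onConnectionClosed is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two entry points into disconnectDevice.
final class DisconnectSketch {
    enum State { WORKING, RIP }

    State state = State.WORKING;
    boolean channelAlive = true;
    final List<String> actions = new ArrayList<>();

    // Idle-event path: the controller initiates the close.
    void closeConnection(boolean propagate) {
        disconnectDevice(propagate, true);
    }

    // DisconnectEvent path: the channel is already inactive at this point.
    void onConnectionClosed() {
        channelAlive = false;
        disconnectDevice(true, false);
    }

    private void disconnectDevice(boolean propagate, boolean forced) {
        if (state == State.RIP) {
            return; // already closed: the method is idempotent
        }
        state = State.RIP;
        actions.add(forced ? "by-controller" : "by-device");
        actions.add("release-resources");
        if (forced && channelAlive) {
            channelAlive = false;
            actions.add("close-channel"); // only the controller-initiated path closes the channel
        }
        if (propagate) {
            actions.add("propagate");
        }
    }
}
```

Calling closeConnection(true) twice records the steps only once, and the device-initiated path never records "close-channel".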

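ConnectionContextImpl.disconnectDevice performs the following logic:
(1) It unregisters the outbound queue, closes the handshakeContext, updates the connection state, and so on.
(2) If it was triggered by an idle event and the connection is still active, it actively disconnects, closing the TCP connection (channel).
(3) Whether it was triggered by an idle event or by a DisconnectEvent, it finally calls propagateDeviceDisconnectedEvent():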

Java
private void propagateDeviceDisconnectedEvent() {
    if (Objects.nonNull(deviceDisconnectedHandler)) {
        final BigInteger datapathId = featuresReply != null ? featuresReply.getDatapathId() : BigInteger.ZERO;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Propagating connection closed event: {}, datapathId:{}.",
                    connectionAdapter.getRemoteAddress(), datapathId);
        }
        // ContextChainHolderImpl.onDeviceDisconnected
        deviceDisconnectedHandler.onDeviceDisconnected(this);
    }
}

propagateDeviceDisconnectedEvent simply calls deviceDisconnectedHandler.onDeviceDisconnected(this), which is ContextChainHolderImpl.onDeviceDisconnected.

Java
@Override
public void onDeviceDisconnected(final ConnectionContext connectionContext) {
    final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();

    Optional.ofNullable(connectionContext.getDeviceInfo()).map(contextChainMap::get).ifPresent(contextChain -> {
        if (contextChain.auxiliaryConnectionDropped(connectionContext)) {
            LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo);
        } else {
            LOG.info("Device {} disconnected.", deviceInfo);
            destroyContextChain(deviceInfo);
        }
    });
}

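At this point OFP starts reclaiming the objects and resources tied to the Switch's lifecycle. ContextChainHolderImpl looks up the ContextChainImpl object of the switch (device) being disconnected. If the connection is an auxiliary connection, only that auxiliary connection's context needs to be removed; if it is the primary connection to the switch, destroyContextChain(deviceInfo) is called.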
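The auxiliary-versus-primary decision described above can be sketched with plain collections. ChainHolderSketch, its nested Chain class and the string identifiers are hypothetical; the real code keys the map by DeviceInfo and works with ConnectionContext objects.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the lookup done when a connection is lost.
final class ChainHolderSketch {
    // One primary connection id plus any number of auxiliary ids per switch.
    static final class Chain {
        final String primary;
        final Set<String> auxiliaries = new HashSet<>();

        Chain(String primary) {
            this.primary = primary;
        }

        // Returns true only when the id belonged to an auxiliary connection.
        boolean auxiliaryConnectionDropped(String connectionId) {
            return auxiliaries.remove(connectionId);
        }
    }

    final Map<String, Chain> chains = new HashMap<>();

    String onDeviceDisconnected(String deviceId, String connectionId) {
        Chain chain = chains.get(deviceId);
        if (chain == null) {
            return "unknown-device"; // nothing to clean up
        }
        if (chain.auxiliaryConnectionDropped(connectionId)) {
            return "auxiliary-dropped"; // the chain itself stays alive
        }
        chains.remove(deviceId); // primary connection lost: destroy the chain
        return "chain-destroyed";
    }
}
```

Losing an auxiliary connection leaves the entry in the map; only the loss of the primary connection removes it.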

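5. Destroying the ContextChain

The logic of destroyContextChain(deviceInfo):
(1) It notifies the upper-layer applications registered with the MastershipService (native applications and the ReconciliationFramework), described in detail in earlier articles.
(2) It sends the notification that the device is removed from the inventory. This does not actually delete the node from the YANG tree; the method is deprecated upstream and kept only for backward compatibility. The node is actually deleted in a later step.
(3) It calls contextChain.close(), which reclaims/removes the whole series of objects belonging to the ContextChain, including the Switch's individual Contexts (Device/Rpc/Role/Statistics). This is the most important step.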

Java
private void destroyContextChain(final DeviceInfo deviceInfo) {
    // Notify the applications registered with mastershipService (native / reconciliationFramework)
    ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo);
    Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
        // Send the notification that the device is removed from the inventory
        deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
        /*
            Call ContextChainImpl.close(): it reclaims/closes every object related to the switch
            at the OFP level, including the contextChain singleton service and each context
         */
        contextChain.close();
    });
}

Calling ContextChain.close to clean up objects/resources

ContextChain.close is the entry point for reclaiming all objects/resources that a Switch holds on a controller node.

Java
public void close() {
    if (ContextChainState.CLOSED.equals(contextChainState.get())) {
        LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
        return;
    }

    // Set the state to CLOSED
    contextChainState.set(ContextChainState.CLOSED);
    /*
        Reset the master flag of every stage to false:
            registryFilling.set(false);
            initialSubmitting.set(false);
            initialGathering.set(false);
            masterStateOnDevice.set(false);
            rpcRegistration.set(false);
     */
    unMasterMe();

    // Close the auxiliary connections
    // Close all connections to devices
    auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
    auxiliaryConnections.clear();

    // If we are still registered and we are not already closing, then close the registration
    if (Objects.nonNull(registration)) {
        try {
            /*
                This registration is the value returned when registerServices() registered
                the chain as a singleton service:
                        registration = Objects.requireNonNull(clusterSingletonServiceProvider
                                            .registerClusterSingletonService(this));

                According to the mdsal source, the object is an AbstractClusterSingletonServiceRegistration

                Closing it eventually calls this.closeServiceInstance()
             */
            registration.close();
            registration = null;
            LOG.info("Closed clustering services registration for node {}", deviceInfo);
        } catch (final Exception e) {
            LOG.warn("Failed to close clustering services registration for node {} with exception: ",
                    deviceInfo, e);
        }
    }

    /*
        Call close() on every context: device/statistics/rpc/role
        This closes/reclaims their resources
     */
    // Close all contexts (device, statistics, rpc)
    contexts.forEach(OFPContext::close);
    contexts.clear();

    /*
        Call onDeviceRemoved on every manager; each removes its context index entry for this device:
        DeviceManagerImpl
        RpcManagerImpl
        StatisticsManagerImpl
        RoleManagerImpl
        ContextChainHolderImpl
     */
    // We are closing, so cleanup all managers now
    deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
    deviceRemovedHandlers.clear();

    // Make sure the ConnectionContextImpl is closed: this ends up in
    // ConnectionContextImpl.disconnectDevice(false, true), which releases
    // handshakeContext and other connection resources
    primaryConnection.closeConnection(false);

}

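This is the main entry point for closing the switch's ContextChainImpl. In detail, it:

  • 1. Sets the state to CLOSED and resets the related flags (set during initialization) to false
  • 2. Closes all auxiliary connections
  • 3. Calls registration.close(), which closes the ContextChainImpl singleton service. This triggers ContextChainImpl.closeServiceInstance, which in turn calls closeServiceInstance on each context
    • registration is the value returned by registerClusterSingletonService(this); calling .close() on it shuts down the singleton service
  • 4. Calls close on each context
  • 5. Removes the switch's context index entries from each context manager
  • 6. Calls primaryConnection.closeConnection(false), which ends up in ConnectionContextImpl.disconnectDevice(false, true), so that closing the ConnectionContextImpl also releases handshakeContext and other connection resources
    • ConnectionContextImpl.disconnectDevice has already been called earlier in the IdleEvent/DisconnectEvent path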
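Two properties of this sequence are worth isolating: close() is guarded so it runs once, and a failure while closing the singleton registration does not abort the remaining cleanup. The sketch below is a hypothetical model (ChainCloseSketch and the step names are invented). As a small variation it uses AtomicReference.getAndSet to make the guard atomic, whereas the original checks the state and then sets it in two separate calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the ordering and guarding in ContextChain.close().
final class ChainCloseSketch {
    enum ChainState { WORKING, CLOSED }

    private final AtomicReference<ChainState> state = new AtomicReference<>(ChainState.WORKING);
    final List<String> steps = new ArrayList<>();

    void close(Runnable closeRegistration) {
        if (state.getAndSet(ChainState.CLOSED) == ChainState.CLOSED) {
            return; // already closed: do nothing
        }
        steps.add("clear-master-flags");
        steps.add("close-auxiliary-connections");
        try {
            closeRegistration.run();
            steps.add("registration-closed");
        } catch (RuntimeException e) {
            // Log-and-continue: the rest of the cleanup must still run
            steps.add("registration-close-failed");
        }
        steps.add("close-contexts");
        steps.add("notify-managers");
        steps.add("close-primary-connection");
    }
}
```

Even when the registration close throws, the primary connection is still closed as the last step.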

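Here we pay extra attention to one call: registration.close(). It shuts down the service that the ContextChain runs as a singleton (recall from earlier articles that the ContextChain runs as a singleton service on the Switch's Master node and instantiates services there). This call closes that service instance.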

Java
@Override
public ListenableFuture<Void> closeServiceInstance() {

    // Calls ContextChainHolderImpl.onSlaveRoleAcquired, which notifies the
    // upper-layer applications registered with mastershipService
    contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);

    /*
        Call closeServiceInstance on every context:
            device: closes the transactionChainManager
            rpc: unregisters the RPCs
            statistics: stops statistics gathering
            role: changeLastRoleFuture
     */
    final ListenableFuture<List<Void>> servicesToBeClosed = Futures
            .allAsList(Lists.reverse(contexts)
                    .stream()
                    .map(OFPContext::closeServiceInstance)
                    .collect(Collectors.toList()));

    return Futures.transform(servicesToBeClosed, (input) -> {
        LOG.info("Closed clustering services for node {}", deviceInfo);
        return null;
    }, executorService);
}
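The pattern in closeServiceInstance is "close every context in reverse registration order and complete when all of them are done". The original uses Guava's Futures.allAsList and Lists.reverse; the sketch below swaps in the JDK's CompletableFuture.allOf so it needs no extra dependency. ServiceCloseSketch and the context names passed to it are hypothetical, and each context's close is modelled as an already completed future.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only model of closing service instances in reverse order.
final class ServiceCloseSketch {
    final List<String> closed = Collections.synchronizedList(new ArrayList<>());

    CompletableFuture<Void> closeServiceInstance(List<String> contexts) {
        // Copy before reversing so the caller's list is left untouched
        List<String> reversed = new ArrayList<>(contexts);
        Collections.reverse(reversed);

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String name : reversed) {
            closed.add(name); // the close call itself is issued in reverse order
            futures.add(CompletableFuture.completedFuture(null));
        }
        // Completes once every per-context future has completed
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }
}
```

Reversing the order is a common teardown convention: whatever was started last is stopped first, so later contexts never outlive the ones they were built on.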

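6. Change of the Master controller node (singleton migration)

Note that when a Switch is connected to several controllers and loses its connection to its Master node, the process above is triggered. But the Switch is still connected to the other two controllers, so what happens then?

The answer: a new Master is elected on one of the other controller nodes, which then goes through the process described in Part 4 again (ODL Openflowplugin Master election and Context service instantiation).

As we saw in the previous step, registration.close() is executed. When this runs on the Master node, the singleton service is closed there, which triggers a new Master election among the other controller nodes, followed by the Context service instantiation process.

This is precisely how OFP is designed for high availability: in a controller cluster, each Switch is mapped to one singleton service on its Master node.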
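As a toy illustration of the migration idea (not of the real EntityOwnershipService election algorithm, which this article does not show), the sketch below keeps an ordered list of candidate nodes and treats the first one as owner. SingletonMigrationSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: the first registered candidate owns the singleton service.
final class SingletonMigrationSketch {
    private final List<String> candidates = new ArrayList<>();

    // A controller node registers the switch's singleton service.
    void register(String node) {
        if (!candidates.contains(node)) {
            candidates.add(node);
        }
    }

    // A controller node closes its registration (registration.close()).
    void close(String node) {
        candidates.remove(node);
    }

    // Current Master, or null when no node holds a registration any more.
    String owner() {
        return candidates.isEmpty() ? null : candidates.get(0);
    }
}
```

When the current owner closes its registration, ownership moves to another candidate; once every node has closed, there is no owner at all, which is the condition section 7 relies on.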

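7. Deleting the Node from the YANG (Inventory) tree

Once a Switch has gone offline from every controller node, what else does the controller do besides reclaiming the resources/objects described above? Put differently: nowhere in that reclamation did we see the Switch (Node) being removed from the Inventory YANG tree, so how is the YANG tree cleaned up?

The Switch (Node) is removed from the Inventory YANG tree in the ownershipChanged method of ContextChainHolderImpl.

What follows involves another key clustering service in ODL: the EntityOwnershipService (EOS), part of ODL's clustering implementation. The singleton service is built on top of the EntityOwnershipService. Readers unfamiliar with it may want to revisit this section after my later notes on the singleton service.

Looking at ContextChainHolderImpl, we find that it implements the EntityOwnershipListener interface. Its constructor registers a listener with the EOS: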

Java
private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";

this.eosListenerRegistration = Objects.requireNonNull(
        entityOwnershipService.registerListener(ASYNC_SERVICE_ENTITY_TYPE, this));

and implements the following method:

Java
@Override
@SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
    if (entityOwnershipChange.getState().hasOwner()) {
        return;
    }

    /*
        contextChainImpl is a singleton service.
        The entityName obtained here is the device id
     */
    final String entityName = entityOwnershipChange
            .getEntity()
            .getIdentifier()
            .firstKeyOf(Entity.class)
            .getName();

    if (Objects.nonNull(entityName)) {
        LOG.debug("Entity {} has no owner", entityName);
        try {
            final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier =
                    DeviceStateUtil.createNodeInstanceIdentifier(new NodeId(entityName));
            // Send the notification that the node is removed from the inventory tree. As
            // opendaylight-inventory.yang shows, this does not delete the node and is deprecated
            deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);

            LOG.info("Try to remove device {} from operational DS", entityName);
            // Delete the node from the inventory YANG tree
            deviceManager.removeDeviceFromOperationalDS(nodeInstanceIdentifier)
                    .get(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
            LOG.info("Removing device from operational DS {} was successful", entityName);
        } catch (TimeoutException | ExecutionException | NullPointerException | InterruptedException e) {
            LOG.warn("Not able to remove device {} from operational DS. ", entityName, e);
        }
    }
}

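ContextChainHolderImpl listens for ownership changes of Entities of type org.opendaylight.mdsal.AsyncServiceCloseEntityType and acts on them.

Some context may help here. Every Switch has a ContextChain on the controller, which runs as a singleton service, and the singleton service is implemented on top of the EntityOwnershipService. The singleton service registers each Switch as an Entity with the underlying EOS, and the EOS elects the Entity's owner among the controller cluster nodes; what we ultimately observe is the Master node of the singleton service.

What we listen for here is the Entity of type org.opendaylight.mdsal.AsyncServiceCloseEntityType. The singleton service creates an Entity of this type for each Switch and registers it with the EOS. The ownership of this Entity changes when the singleton service is closed. When an Entity of this type has no owner, the singleton service has been closed on every node (registration.close()).

Therefore, when a change of this Entity type is observed and the new state has no owner, the Switch has been disconnected from every controller node, so the node must be deleted from the Inventory YANG tree. That is how the YANG cleanup is completed.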
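The listener logic boils down to "ignore the change while some node still owns the entity; remove the node only when nobody does". A minimal sketch, with a plain map standing in for the operational datastore (InventoryCleanupSketch and its members are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the no-owner check in ownershipChanged.
final class InventoryCleanupSketch {
    // Stand-in for the operational datastore: node id -> node data.
    final Map<String, String> operationalNodes = new HashMap<>();

    void ownershipChanged(String entityName, boolean hasOwner) {
        if (hasOwner) {
            return; // some controller node still serves this switch
        }
        if (entityName != null) {
            operationalNodes.remove(entityName); // no owner anywhere: delete the node
        }
    }
}
```

A Master change alone (the entity still has an owner) leaves the node in place; only the final no-owner change removes it.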

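Summary

Looking at the whole disconnect process, there are two ways a Switch gets disconnected from the controller: the Switch side closes the connection (DisconnectEvent), or the controller closes it (SwitchIdleEvent). The cause may be a real network connectivity problem or a crashed Switch, so that the controller receives no heartbeat reply from the Switch and actively closes the connection. It may also be that the network is fine but there are too many Switches for the controller's capacity, so the controller cannot answer heartbeats in time and the underlying Switch closes the connection itself.

The key to the Switch offline process is understanding DisconnectEvent and SwitchIdleEvent; the rest is mostly about reclaiming the resources that were in use. Along the way we also met the singleton service and the EOS. These two are the core of ODL's clustering support, and besides OFP many other ODL southbound plugins use them to build clustered applications. They will be covered in a later article.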

