Implementation of the Application-Layer Keepalive Mechanism in ODL Netconf

 5 years ago
source link: https://www.sdnlab.com/23017.html

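About the author: 陈卓文 (Chen Zhuowen) is a developer on the private cloud team of a game company in China, working mainly on SDN/NFV development.

The analysis in this article is based on ODL Netconf version 1.4.2.
Assumed reader background: familiarity with the netconf protocol and with the basics of how ODL Netconf sends RPC requests to the underlying device.

Background

When a device is added to the netconf-topology YANG model, Netconf creates a series of objects under the hood and actively connects to the device. As part of this process it creates a NetconfDeviceSalFacade object, which encapsulates the operations on the underlying device.

The Netconf protocol itself does not define a keepalive mechanism for the connection; that is left to the underlying SSH/TLS transport. ODL Netconf nevertheless implements its own keepalive at the application layer. Each node defined in netconf-topology has a keepaliveDelay attribute. If it is greater than 0, a KeepaliveSalFacade object is created for the device, and it wraps the NetconfDeviceSalFacade object mentioned above. KeepaliveSalFacade implements the keepalive from ODL, acting as the client, to the underlying device, acting as the server. The details follow below.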
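The wrapping described above is a plain decorator. The following is a minimal sketch of that idea, not the ODL code: DeviceHandler, PlainFacade, KeepaliveFacade and create are hypothetical, simplified stand-ins for RemoteDeviceHandler, NetconfDeviceSalFacade, KeepaliveSalFacade and the topology code that chooses between them.

```java
import java.util.ArrayList;
import java.util.List;

final class FacadeFactorySketch {

    // Hypothetical, simplified stand-in for ODL's RemoteDeviceHandler.
    interface DeviceHandler {
        void onDeviceConnected();
    }

    // Plays the role of NetconfDeviceSalFacade: the plain handler.
    static final class PlainFacade implements DeviceHandler {
        final List<String> events = new ArrayList<>();

        @Override
        public void onDeviceConnected() {
            events.add("connected");
        }
    }

    // Plays the role of KeepaliveSalFacade: wraps another handler and adds keepalive.
    static final class KeepaliveFacade implements DeviceHandler {
        private final DeviceHandler delegate;
        boolean keepaliveStarted;

        KeepaliveFacade(DeviceHandler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onDeviceConnected() {
            delegate.onDeviceConnected(); // forward to the wrapped facade first
            keepaliveStarted = true;      // a real implementation would schedule the probe here
        }
    }

    // Wrap the facade only when keepalive is enabled (delay greater than 0).
    static DeviceHandler create(DeviceHandler facade, long keepaliveDelaySeconds) {
        return keepaliveDelaySeconds > 0 ? new KeepaliveFacade(facade) : facade;
    }

    public static void main(String[] args) {
        System.out.println(create(new PlainFacade(), 120).getClass().getSimpleName()); // KeepaliveFacade
        System.out.println(create(new PlainFacade(), 0).getClass().getSimpleName());   // PlainFacade
    }
}
```

Because the wrapper implements the same interface as the object it wraps, the rest of the connection code does not need to know whether keepalive is enabled.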

KeepaliveSalFacade implementation

KeepaliveSalFacade implements the RemoteDeviceHandler interface, which has several key methods:

Java
    void onDeviceConnected(SchemaContext remoteSchemaContext,
                           PREF netconfSessionPreferences, DOMRpcService deviceRpc);
    void onDeviceDisconnected();
    void onDeviceFailed(Throwable throwable);
    void onNotification(DOMNotification domNotification);
    void close();

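When the device connects to the controller, the onDeviceConnected method is called. This method:

  • creates a KeepaliveDOMRpcService object that wraps NetconfDeviceRpc;
  • delegates to the onDeviceConnected method of the wrapped NetconfDeviceSalFacade object;
  • calls scheduleKeepalive to start the application-layer keepalive mechanism.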

Java
    @Override
    public void onDeviceConnected(final SchemaContext remoteSchemaContext,
                          final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
        this.currentDeviceRpc = deviceRpc;
        final DOMRpcService deviceRpc1 =
                new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor);
        salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1);

        LOG.debug("{}: Netconf session initiated, starting keepalives", id);
        scheduleKeepalive();
    }

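As onDeviceConnected shows, ODL Netconf has two keepalive pieces: KeepaliveDOMRpcService and scheduleKeepalive. We look at each in turn.

Proxying NetconfDeviceRpc

The KeepaliveDOMRpcService object created in onDeviceConnected is in fact a proxy for the NetconfDeviceRpc object that adds keepalive behavior. KeepaliveDOMRpcService also implements the DOMRpcService interface, which defines invokeRpc, the method applications use to call the underlying device.

The invokeRpc method implemented in KeepaliveDOMRpcService:

  • delegates to the invokeRpc method of the wrapped NetconfDeviceRpc object and registers ResetKeepalive (resetKeepaliveTask) as the callback, which is covered below;
  • schedules a RequestTimeoutTask to run once the request timeout (60 s by default) has elapsed;
    • RequestTimeoutTask checks whether domRpcResultDOMRpcExceptionCheckedFuture has completed and, if it has not, calls .cancel on it so that the request fails.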

Java
    @Override
    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type,
                                                                  final NormalizedNode<?, ?> input) {
        final CheckedFuture<DOMRpcResult, DOMRpcException> domRpcResultDOMRpcExceptionCheckedFuture =
                deviceRpc.invokeRpc(type, input);
        Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask,
                            MoreExecutors.directExecutor());

        final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(domRpcResultDOMRpcExceptionCheckedFuture);
        executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);

        return domRpcResultDOMRpcExceptionCheckedFuture;
    }
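The request timeout can be illustrated in isolation. This is a minimal sketch using only the JDK, assuming CompletableFuture in place of Guava's CheckedFuture; withTimeout and unansweredRequestIsCancelled are hypothetical names, and the lambda stands in for RequestTimeoutTask.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class RequestTimeoutSketch {

    // Arms a one-shot task that cancels the request future if it is still pending
    // after timeoutMillis; a future that already completed is left untouched.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> rpcFuture,
                                                ScheduledExecutorService executor,
                                                long timeoutMillis) {
        executor.schedule(() -> {
            if (!rpcFuture.isDone()) {
                rpcFuture.cancel(true);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return rpcFuture;
    }

    // Demo helper: true if a request that never gets a reply ends up cancelled.
    static boolean unansweredRequestIsCancelled(long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> pending =
                withTimeout(new CompletableFuture<String>(), executor, timeoutMillis);
        try {
            pending.get(timeoutMillis + 5_000, TimeUnit.MILLISECONDS);
            return false; // unexpected: nobody completes this future normally
        } catch (CancellationException e) {
            return true;  // the timeout task fired and cancelled the request
        } catch (Exception e) {
            return false; // interrupted, or the wait itself timed out
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(unansweredRequestIsCancelled(100));
    }
}
```

Cancelling the future is what turns a silent device into a visible failure: any callback registered on the future then sees the request as failed instead of waiting forever.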

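This method delegates to the invokeRpc method of the NetconfDeviceRpc object and registers the ResetKeepalive callback. ResetKeepalive handles the two possible outcomes of an invokeRpc call to the underlying device:

  • If the call succeeds (a reply is received), it calls resetKeepalive to re-arm the keepalive, which in turn calls scheduleKeepalive.
    • This is the same scheduleKeepalive method that onDeviceConnected calls at the end; it is covered below.
  • If the call fails (the device is unreachable), it calls reconnect, which stops the keepalive mechanism and reconnects to the underlying device.
    • In practice this closes the underlying netty channel; when SSH is used, that also tears down the underlying SSH channel.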

Java
    /**
     * Reset keepalive after each RPC response received.
     */
    private class ResetKeepalive implements FutureCallback<DOMRpcResult> {
        @Override
        public void onSuccess(@Nullable final DOMRpcResult result) {
            // No matter what response we got,
            // rpc-reply or rpc-error, we got it from device so the netconf session is OK.
            resetKeepalive();
        }

        @Override
        public void onFailure(@Nonnull final Throwable throwable) {
            // User/Application RPC failed (The RPC did not reach the remote device or ..
            // TODO what other reasons could cause this ?)
            // There is no point in keeping this session. Reconnect.
            LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
            reconnect();
        }
    }
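The same reset-or-reconnect decision can be sketched with the JDK alone. This is an illustration under assumptions, not the ODL code: CompletableFuture.whenComplete replaces Guava's FutureCallback, and the two counters are hypothetical stand-ins for the resetKeepalive and reconnect calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class ResetOnReplySketch {
    final AtomicInteger resets = new AtomicInteger();     // stands in for resetKeepalive()
    final AtomicInteger reconnects = new AtomicInteger(); // stands in for reconnect()

    // Attach the callback to an RPC future, in the spirit of ResetKeepalive.
    <T> CompletableFuture<T> track(CompletableFuture<T> rpcFuture) {
        rpcFuture.whenComplete((result, failure) -> {
            if (failure == null) {
                resets.incrementAndGet();     // any reply proves the session is alive
            } else {
                reconnects.incrementAndGet(); // the request never got an answer
            }
        });
        return rpcFuture;
    }

    static String demo() {
        ResetOnReplySketch sketch = new ResetOnReplySketch();
        sketch.track(CompletableFuture.completedFuture("<rpc-reply/>"));
        CompletableFuture<String> failed = new CompletableFuture<>();
        sketch.track(failed);
        failed.completeExceptionally(new IllegalStateException("channel closed"));
        return "resets=" + sketch.resets.get() + " reconnects=" + sketch.reconnects.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "resets=1 reconnects=1"
    }
}
```

A future cancelled by a timeout task also completes exceptionally, so in this sketch a timed-out request lands in the reconnect branch as well.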

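Application-layer keepalive heartbeat interval

ODL Netconf implements the application-layer keepalive mechanism in Keepalive, an inner class of KeepaliveSalFacade. Before going into it, recall from above that scheduleKeepalive is called in two situations:

  • when the device establishes a connection with the controller, at the end of onDeviceConnected;
  • after an invokeRpc call to the underlying device succeeds, when resetKeepalive re-arms the keepalive.

The scheduleKeepalive method in KeepaliveSalFacade wraps the invocation of the Keepalive class. Its core logic is to use a ScheduledExecutorService to run a Keepalive object after a delay of keepaliveDelaySeconds.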

Java
    private void scheduleKeepalive() {
        Preconditions.checkState(currentDeviceRpc != null);
        LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
        currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS);
    }

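Whichever path leads to scheduleKeepalive, the connection to the device is known to be healthy at that moment (it has just been established, or a call has just succeeded), so there is no need to run Keepalive immediately; it is delayed instead. The Keepalive class described later also calls scheduleKeepalive, so every heartbeat probe is spaced by keepaliveDelaySeconds. In other words, keepaliveDelaySeconds is the heartbeat interval.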
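The one-shot nature of schedule is what makes the re-arming necessary. A minimal sketch of a heartbeat that re-arms itself, assuming a simulated probe that always succeeds; RescheduleSketch, probe and runProbes are hypothetical names, and milliseconds are used instead of seconds only to keep the demo fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class RescheduleSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long delayMillis;
    private final CountDownLatch remainingProbes;

    RescheduleSketch(long delayMillis, int probes) {
        this.delayMillis = delayMillis;
        this.remainingProbes = new CountDownLatch(probes);
    }

    // One-shot delay: nothing repeats unless a probe re-arms the timer.
    private void scheduleKeepalive() {
        executor.schedule(this::probe, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Simulated probe that always "succeeds" and therefore schedules the next one.
    private void probe() {
        remainingProbes.countDown();
        if (remainingProbes.getCount() > 0) {
            scheduleKeepalive();
        }
    }

    // Returns true if the requested number of probes ran before the deadline.
    static boolean runProbes(long delayMillis, int probes) {
        RescheduleSketch sketch = new RescheduleSketch(delayMillis, probes);
        sketch.scheduleKeepalive();
        try {
            return sketch.remainingProbes.await(delayMillis * probes + 5_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runProbes(50, 3));
    }
}
```

If the simulated probe ever stopped calling scheduleKeepalive, no further probe would run, which is the property the rest of the article builds on.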

The heartbeat interval (keepaliveDelaySeconds) is defined by the keepalive-delay attribute of node in the netconf-node-topology YANG model:

YANG
    // Keepalive configuration
    leaf keepalive-delay {
        config true;
        type uint32;
        default 120;
        description "Netconf connector sends keepalive RPCs while the session is idle, this delay specifies the delay between keepalive RPC in seconds
                     If a value <1 is provided, no keepalives will be sent";
    }

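ODL application-layer keepalive implementation

The application-layer keepalive of ODL Netconf is implemented in the Keepalive class. Keepalive is a Runnable whose run() method issues the request asynchronously and registers the Keepalive object itself as the callback: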

Java
    private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
        ...
    }

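The core of the Keepalive class is to send a get-config RPC to the underlying device and to use the success or failure of that application-layer request to decide whether the connection is still alive. In detail:

  • It first checks whether the previous keepalive has finished (previousKeepalive.isDone()):
    • if it has not, it calls onFailure, which reconnects to the underlying device;
    • if it has, it sends another request to the device through invokeRpc and registers itself as the asynchronous callback;
  • the heartbeat itself is driven from the onSuccess callback, described below.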

Java
    // Keepalive RPC static resources
    private static final SchemaPath PATH = toPath(NETCONF_GET_CONFIG_QNAME);
    private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME,
            getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);

    ...
        @Override
        public void run() {
            LOG.trace("{}: Invoking keepalive RPC", id);

            try {
                if (previousKeepalive != null && !previousKeepalive.isDone()) {
                    onFailure(new IllegalStateException("Previous keepalive timed out"));
                } else {
                    // send get-config to the underlying device
                    Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this,
                                        MoreExecutors.directExecutor());
                }
            } catch (NullPointerException e) {
                LOG.debug("{}: Skipping keepalive while reconnecting", id);
                // Empty catch block intentional
                // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and
                // attempted to send keepalive while we were reconnecting. Next keepalive will be scheduled
                // after reconnect so no action necessary here.
            }
        }

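From the above, ODL Netconf uses get-config, a standard netconf RPC, to probe the connection at the application layer. When the RPC call to the device gets a reply, the onSuccess callback is invoked. It works as follows:

  • As long as the failure was not caused by a broken connection, the connection is considered healthy whether the device returned an rpc success or an rpc error. onSuccess then calls scheduleKeepalive, which, as described above, runs Keepalive again after a delay to produce the heartbeat. If the result carries neither a payload nor errors, it calls reconnect instead.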

Java
    @Override
    public void onSuccess(final DOMRpcResult result) {
        // No matter what response we got, rpc-reply or rpc-error,
        // we got it from device so the netconf session is OK
        if (result != null && result.getResult() != null) {
            LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
            scheduleKeepalive();
        } else if (result != null && result.getErrors() != null) {
            LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors());
            scheduleKeepalive();
        } else {
            LOG.warn("{} Keepalive RPC returned null with response: {}. Reconnecting netconf session", id, result);
            reconnect();
        }
    }
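The three branches of onSuccess amount to a small decision function. The sketch below restates it without any ODL types: Reply is a hypothetical, simplified stand-in for DOMRpcResult, and Action is a hypothetical enum naming the two possible reactions.

```java
final class KeepaliveReplySketch {

    enum Action { SCHEDULE_NEXT, RECONNECT }

    // Hypothetical, simplified stand-in for DOMRpcResult.
    static final class Reply {
        final String result; // payload of an rpc-reply, may be null
        final String error;  // rpc-error text, may be null

        Reply(String result, String error) {
            this.result = result;
            this.error = error;
        }
    }

    // Any answer from the device, data or rpc-error, proves the session is alive.
    static Action classify(Reply reply) {
        if (reply != null && reply.result != null) {
            return Action.SCHEDULE_NEXT;
        }
        if (reply != null && reply.error != null) {
            return Action.SCHEDULE_NEXT;
        }
        return Action.RECONNECT; // nothing usable came back
    }

    public static void main(String[] args) {
        System.out.println(classify(new Reply("<data/>", null)));          // SCHEDULE_NEXT
        System.out.println(classify(new Reply(null, "operation-failed"))); // SCHEDULE_NEXT
        System.out.println(classify(null));                                // RECONNECT
    }
}
```

The point of the middle branch is that an rpc-error is still a reply: the probe tests the session, not the correctness of the get-config request.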

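Improvements in the Fluorine release

The keepalive implementation is substantially improved in the Fluorine release:

  • The heartbeat is driven by a periodic scheduleWithFixedDelay call. In version 1.4.2 above, the heartbeat relied on the Keepalive class rescheduling itself recursively, because schedule only provides a one-shot delayed call.
  • The old implementation has an obvious weakness: onSuccess is invoked asynchronously only when the device replies to the probe, and only then is the next probe scheduled. If the underlying channel is in an abnormal state, or some unknown fault blocks the callback, the heartbeat stalls and no longer does its job. In the new version, periodic scheduling combined with a flag detects such failures more proactively.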

Java
    private void scheduleKeepalives() {
        lastKeepAliveSucceeded.set(true);
        Preconditions.checkState(currentDeviceRpc != null);
        LOG.trace("{}: Scheduling keepalives every {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
        // periodic invocation
        currentKeepalive = executor.scheduleWithFixedDelay(new Keepalive(),
                keepaliveDelaySeconds, keepaliveDelaySeconds, TimeUnit.SECONDS);
    }

    private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {

        @Override
        public void run() {
            LOG.trace("{}: Invoking keepalive RPC", id);

            try {
                // check the flag
                boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
                if (!lastJobSucceeded) {
                    onFailure(new IllegalStateException("Previous keepalive timed out"));
                } else {
                    Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this,
                                        MoreExecutors.directExecutor());
                }
            } catch (NullPointerException e) {
                LOG.debug("{}: Skipping keepalive while reconnecting", id);
                // Empty catch block intentional
                // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and
                // attempted to send keepalive while we were reconnecting. Next keepalive will be scheduled
                // after reconnect so no action necessary here.
            }
        }

        @Override
        public void onSuccess(final DOMRpcResult result) {
            // No matter what response we got, rpc-reply or rpc-error,
            // we got it from device so the netconf session is OK
            if (result != null && result.getResult() != null) {
                LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
                // set the flag
                lastKeepAliveSucceeded.set(true);
            } else if (result != null && result.getErrors() != null) {
                LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors());
                // set the flag
                lastKeepAliveSucceeded.set(true);
            } else {
                LOG.warn("{} Keepalive RPC returned null with response: {}. Reconnecting netconf session", id, result);
                reconnect();
            }
        }

        @Override
        public void onFailure(@Nonnull final Throwable throwable) {
            LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
            reconnect();
        }
    }
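The flag-based detection can be reproduced with the JDK alone. This is a minimal sketch under assumptions: the device is simulated by a boolean, the reply callback is collapsed into the tick itself, and FixedDelayKeepaliveSketch, tick and detectsFailure are hypothetical names rather than ODL code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class FixedDelayKeepaliveSketch {
    private final AtomicBoolean lastProbeAnswered = new AtomicBoolean(true);
    private final CountDownLatch failureDetected = new CountDownLatch(1);
    private final boolean deviceAnswers;

    FixedDelayKeepaliveSketch(boolean deviceAnswers) {
        this.deviceAnswers = deviceAnswers;
    }

    // Runs on every period, whether or not the previous probe was answered.
    private void tick() {
        if (!lastProbeAnswered.getAndSet(false)) {
            failureDetected.countDown();  // the previous probe never came back
        } else if (deviceAnswers) {
            lastProbeAnswered.set(true);  // simulated reply callback sets the flag again
        }
    }

    // Returns true if a missed reply is detected within waitMillis.
    static boolean detectsFailure(boolean deviceAnswers, long periodMillis, long waitMillis) {
        FixedDelayKeepaliveSketch sketch = new FixedDelayKeepaliveSketch(deviceAnswers);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(sketch::tick, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return sketch.failureDetected.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(detectsFailure(false, 50, 5_000)); // silent device
        System.out.println(detectsFailure(true, 50, 500));    // healthy device
    }
}
```

With a silent device the second tick finds the flag still false and reports the failure, so detection takes about two periods and does not depend on any callback ever firing.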

Related commit: NECONF-524 : Setting the netconf keepalive logic to be more proactive.

TL;DR

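ODL Netconf implements a Netconf keepalive at the application layer.

  • KeepaliveSalFacade proxies NetconfDeviceSalFacade to provide the application-layer keepalive; all of the keepalive logic lives in KeepaliveSalFacade.
  • Before Fluorine, the heartbeat interval is implemented by a delayed call to a Runnable through ScheduledExecutorService.schedule; from Fluorine on, the heartbeat probe is driven periodically by scheduleWithFixedDelay.
  • The connection is probed at the application layer by sending the standard get-config RPC to the underlying device.
  • The next application-layer keepalive probe is scheduled in three situations: 1) the device connects to the controller, 2) an RPC call to the underlying device succeeds, 3) the get-config heartbeat probe completes without a failure.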

Reference

https://tools.ietf.org/html/draft-ietf-netconf-server-model-05#section-5

