10

DDS与FastRTPS

 3 years ago
source link: https://paul.pub/dds-and-fastrtps/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

DDS是一套通信协议和API标准,它提供了以数据为中心的连接服务。Fast-RTPS是DDS的开源实现,借助它可以方便的开发出高效,可靠的分布式系统。本文是对DDS以及Fast RTPS的介绍文章。

bg.jpg

从《变形金刚》电影说起

这里要提到的是2011年的真人版电影,变形金刚第三部《Transformers: Dark of the Moon》。

这是一篇技术文章,为什么要扯到《变形金刚》电影呢?这是因为这部电影的主要内容与本文所提到的技术有一定的相关性。

在这部电影中,御天敌背叛了擎天柱,与霸天虎合作。在地球的各地布置了许多的能量柱,他试图借助这些能量柱将赛博坦星球传送到地球上,以此来重建自己的家园。

这些能量柱必须组合起来才能完成传输工作,并且在这其中有一个红色的能量柱比较特殊,因为它负责控制其他的传送柱。

t1.png

由此可见,这是一个大型的分布式系统。在这个系统中,这个红色的能量柱被称之为“中心节点”,中心节点正如其名称那样,它是整个系统的中心。对于带有中心节点的分布式系统来说,一旦中心节点被摧毁,整个系统都将无法工作。

因此电影的后来,自然是擎天柱摧毁了这个中心节点,使得御天敌的传送计划彻底失败。

t2.png

从设计上来说,对于一个如此大型的系统,却存在一个非常薄弱和重要的中心节点,这并不是一个好的方案。

而本文介绍的DDS就是一个去中心化的分布式技术。因此在这类系统中,不存在负责总控制的中心节点,所有节点都完全对等。任何一个节点的异常都不会影响整个系统的运行。

DDS介绍

DDS全称是Data Distribution Service,这是一套通信协议和API标准,它提供了以数据为中心的连接服务,基于发布者-订阅者模型。这是一套中间件,它提供介于操作系统和应用程序之间的功能,使得组件之间可以互相通信。并且提供了低延迟,高可靠的通信以及可扩展的架构。

或许,你已经知道很多种网络通信协议,对于发布-订阅这些概念也很熟悉。那DDS到底有什么特别之处呢?

下图展示了4个时代的数据通信方式:

gen.png
  • (第一代)点对点的CS(Client-Server)结构,这是大家最为熟悉的:一个服务器角色被许多的客户端使用,每次通信时,通信双方必须建立一条连接。当通信节点增多时,通信的连接数也会增多。并且,每个客户端都必须知道服务器的具体地址和所提供的服务。一旦服务器地址发生变化,所有客户端都会受到影响。
  • (第二代)Broker模型:存在一个中间人,它负责初步处理大家的请求,并进一步找到真正能响应服务的角色,这就好像存在一个经纪人。这为客户端提供了一层抽象,使得服务器的具体地址变得不重要了。服务端地址如果发生变化,只需要告诉Broker就可以了。但这个模型的问题在于,Broker变成了模型的中心,它的处理速度会影响所有人的效率,这就好像城市中心的路口,当系统规则增长到一定程度,Broker终究会成为瓶颈。更糟糕的是,如果Broker瘫痪了,可能整个系统都将无法运转。
  • (第三代)广播模型:所有人都可以在通道上广播消息,并且所有人都可以收到消息。这个模型解决了服务器地址的问题,且通信双方不用单独建立连接,但它存在的问题是:广播通道上的消息太多,太嘈杂,所有人都必须关心每条消息是否与自己有关。这就好像全公司一千号人坐在同一个房间里面办公一样。
  • (第四代)DDS模型:这种模型与广播模型有些类似,所有人都可以在DataBus上发布和读取消息。但它更进一步的是,通信中包含了很多并行的通路,每个人可以只关心自己感兴趣的消息,自动忽略自己不需要的消息。

下图展示了DDS在网络栈中的位置,它位于传输层的上面,并且以TCP,UDP为基础。

stack.png

这个图之所以是沙漏形状是因为:两头的技术变化都发展很快,但是中间的却鲜有变化。

对比大家常见的Socker API,DDS有如下特点:

特性 Socket API DDS 架构 TCP:点对点
UDP:点对点,广播,多播 Publish-subscribe模型 平台独立 需要为不同硬件,操作系统和编程语言编写不同的代码 所有硬件,操作系统和编程语言使用相同的API 发现 需要硬编码IP地址和端口号 动态发现,无需关注端点所在位置 类型安全 没有类型安全,应用需要将字节流转换成正确类型 强类型安全,write()read()针对特定数据类型 通信行为定制 需要通过自定义的代码来实现 通过QoS策略来完成 互操作性 不支持 具有公认的互操作性的开放标准

关于DDS的更多特性,可以点击这个链接:《What is DDS?》

DDS可以降低系统复杂度

对于分布式系统来说,有很多复杂的逻辑需要处理,例如:如何发现其他节点,如何为每个节点分配地址,如何配置消息的可靠性等。这使得应用程序变得臃肿。

而如果说通信的中间件能够完全处理好这些逻辑,则应用程序将可以集中处理自己的业务,变得更加敏捷。

下图是两种情况的对比:

complex.png

如果考虑系统的演化,问题会更突出。

由于分布式系统中包含了许多的角色需要互相通信,随着角色数量的不断增长,其通信的通道数量会以爆炸式增长。

traditioan.png

而如果有统一的DataBus,则即便新增了通信角色其通信模型也不会变得更加复杂。

dds_arch.png

DDS应用范围

尽管可能你原先没有听过DDS这个术语,但其实它的应用非常广泛,广泛到它涉及到了我们每天都要依赖的许多重要行业,例如:航空,国防,交通,医疗,能源等等。

下图是一些示例:

application.png

DDS 提供商

DDS本身是一套标准。由Object Management Group(简称OMG)维护。

OMG是一个开放性的非营利技术标准联盟,由许多大型IT公司组成:包括IBM,Apple Computer,Sun Microsystems等。

但OMG仅仅负责制定标准,而标准的实现则由其他服务提供商完成。

目前DDS的提供商包括下面这些:

DDS与RTPS

在DDS规范中,有两个描述标准的基本文档:

  • DDS Specification:描述了以数据为中心的发布-订阅模型。该规范定义了API和通信语义(行为和服务质量),使消息从消息生产者有效地传递到匹配的消费者。DDS规范的目的可以概括为:“能够在正确的时间将正确的信息高效,可靠地传递到正确的位置”。
  • DDSI-RTPS:描述了RTPS(Real Time Publish Subscribe Protocol)协议。该协议通过UDP等不可靠的传输,实现最大努力(Best-Effort)和可靠的发布-订阅通信。RTPS是DDS实现的标准协议,它的目的和范围是确保基于不同DDS供应商的应用程序可以实现互操作。
spec.png

DDS与汽车行业

对于汽车行业来说,汽车开放系统架构(AUTomotive Open System ARchitecture)已经在AUTOSAR Adaptive Platform 18.03中包含了DDS协议

ROS2架构也以DDS为基础

另外,DDS的实时特性可能特别适合自动驾驶系统。在这类系统中,通常会存在感知,预测,决策和定位模块,这些需要非常高速和频繁的交换数据。借助DDS,可以很好的满足它们的通信需求。

autonomous.png

Fast-RTPS 介绍

Fast-RTPS是eprosima对于RTPS的C++实现,这是一个免费开源软件,遵循Apache License 2.0。

eProsima Fast RTPS在性能,功能和对最新版本RTPS标准(RTPS 2.2)的遵守方面均处于领先地位。关于Fast RTPS的性能可以查看这个链接:eProsima Fast RTPS Performance

它最为被大家知道的可能是因为被ROS2设定为默认的消息中间件。

Fast-RTPS支持平台包括:Windows, Linux, Mac OS, QNX, VxWorks, iOS, Android, Raspbian。

Fast-RTPS具有以下优点:

  • 对于实时应用程序来说,可以在Best-Effort和可靠通信两种策略上进行配置。
  • 即插即用的连接性,使得网络的所有成员自动发现其他新的成员。
  • 模块化和可扩展性允许网络中设备不断增长。
  • 可配置的网络行为和可互换的传输层:为每个部署选择最佳协议和系统输入/输出通道组合。
  • 两个API层:一个简单易用的发布者-订阅者层和一个提供对RTPS协议内部更好控制的Writer-Reader层。

源码与编译

Fast-RTPS的源码位于Github上:eProsima/Fast-RTPS

可以通过下面这条命令获取其源码:

git clone https://github.com/eProsima/Fast-RTPS.git

关于如何编译Fast-RTPS可以参见这个链接:Fast RTPS Installation from Sources

Fast-RTPS概述

Fast-RTPS提供了两个层次的API:

  • Publisher-Subscriber层:RTPS上的简化抽象。
  • Writer-Reader层:对于RTPS端点的直接控制。

相较而言,后者更底层。两个层次的核心角色如下图所示:

architecture.png

Publisher-Subscriber层为大多数开发者提供了一个方便的抽象。它允许定义与Topic关联的发布者和订阅者,以及传输Topic数据的简单方法。

Writer-Reader层更接近于RTPS标准中定义的概念,并且可以进行更精细的控制,但是要求开发者直接与每个端点的历史记录缓存进行交互。

Fast RTPS是并发且基于事件的。每个参与者都会生成一组线程来处理后台任务,例如日志记录,消息接收和异步通信。

事件系统使得Fast RTPS能够响应某些条件并安排定期活动。用户中几乎不用感知它们,因为这些事件大多数仅仅与RTPS元数据有关。

对象与数据结构

下面是Fast-RTPS实现中的核心结构。

Publish-Subscriber模块

RTPS标准的高层类型。

  • Domain:用来创建,管理和销毁Participants。
  • Participant:包括Publisher和Subscriber,并管理它们的配置。
    • ParticipantAttributes:创建Participant的配置参数。
    • ParticipantListener:可以让开发者实现Participant的回调函数。
  • Publisher:在Topic上发布数据的对象。
    • PublisherAttributes:创建Publisher的配置参数。
    • PublisherListener:可以让开发者实现Publisher的回调函数。
  • Subscriber:在Topic上接受数据的对象。
    • SubscriberAttributes:创建Subscriber的配置参数。
    • SubscriberListener:可以让开发者实现Subscriber的回调函数。

RTPS模块

RTPS的底层模型。包含下面几个子模块:

  • RTPS Common
    • CacheChange_t:描述Topic上的变更,存储在历史Cache中。
    • Data:Cache变化的负载。
    • Message:RTPS消息。
    • Header:RTPS协议的头信息。
    • Sub-Message Header:标识RTPS的订阅消息。
    • MessageReceiver:反序列化和处理接受到的RTPS消息。
    • RTPSMessageCreator:构建RTPS消息。
  • RTPS Domain
    • RTPSDomain:用来创建,管理和销毁底层的RTPSParticipants。
    • RTPSParticipant:包括Writer和Reader。
  • RTPS Reader
    • RTPSReader:读者的基类。
    • ReaderAttributes:包含RTPS读者的配置参数。
    • ReaderHistory:存储Topic变化的历史数据。
    • ReaderListener:读者的回调类型。
  • RTPS Writer
    • RTPSWriter:写者的基类。
    • WriterAttributes:包含RTPS写者的配置参数。
    • WriterHistory:存储写者的历史数据。

配置Attributes

上面的数据结构中看到了许多Attributes后缀的类名。这些类包含了对协议或者对象的配置参数,很多特性都需要设置这些属性来完成。

这些类的定义基本都位于下面三个文件夹中:

Fast RTPS支持非常多的配置参数,并且参数的结构常常是嵌套的。

通过代码去配置这些参数会产生很多啰嗦的代码,而且最大的问题在于:每次更改配置参数都需要重新编译。这个问题并非Fast RTPS才有,只要包含大量配置参数的软件都会这样的问题。通常的解决方法就是:提供文本格式的配置文件的方式来配置参数。因此对于Fast-RTPS来说,除了支持通过代码配置参数,它也支持通过XML文件的方式来进行配置。

有了配置文件之后,在代码中直接读取就好了,例如:

Participant *participant = Domain::createParticipant("participant_xml_profile");

在这之后,如果需要调整配置,只需要修改配置文件,不用在改动代码,自然也不用重新编译。这对于项目部署是很重要的。

Fast-RTPS支持的配置项,以及这些配置项说明和默认值都可以到这个链接中查看:XML profiles

Domain

RTPS中的通信参数者之间,通过Domain进行隔离。

同一时刻可能有多个Domain同时存在,一个Domain中可以包含任意数目的消息发送者和接受者。

其结构如下图所示:

RTPS-structure.png

开发者可以通过domainId来指定参与者所属Domain。

如果没有指定,默认的domainId = 80

作为DDS的实现,Fast-RTPS提供了Publisher和Subscriber自动发现和匹配的功能。在实现上,这分为两个步骤来完成:

  • Participant Discovery Phase (PDP):在这个阶段,参与者互相通知彼此的存在。为了达到这个目的,每个参与者需要定时发送公告消息。公告消息通过周知的多播地址和端口发送(根据domain计算得到)。
  • Endpoint Discovery Phase (EDP):在这个阶段,Publisher和Subscriber互相确认。为此,参与者使用在PDP期间建立的通信通道,彼此共享有关其发布者和订阅者的信息。 该信息包含了Topic和数据类型。为了使两个端点匹配,它们的Topic和数据类型必须一致。 一旦发布者和订阅者匹配,他们就发送/接收数据了。

这两个阶段对应了两个独立的协议:

  • Simple Participant Discovery Protocol:指定参与者如何在网络中发现彼此。
  • Simple Endpoint Discovery Protocol:定义了已经互相发现的参与者交换信息的协议。

Fast-RTPS提供了四种发现机制:

  • Simple:这是默认机制。它在PDP和EDP阶段均使用RTPS标准,因此可与任何其他DDS和RTPS实现兼容。
  • Static:此机制在PDP阶段使用Simple Participant Discovery Protocol。如果所有发布者和订阅者的地址以及端口和主题信息是事先知道的,则允许跳过EDP阶段。
  • Server-Client:这种发现机制使用集中式发现结构,由服务器充当发现机制的Hub。
  • Manual:此机制仅与RTPSDomain层兼容。它禁用了PDP阶段,使用户可以使用其选择的任何外部元信息通道手动匹配和取消匹配RTPS参与者,读者和写者。

不同的发现机制具有一些共同的配置:

名称 描述 默认值 Ignore Participant flags 在必要的时候,可以选择忽略一些参与者。
例如:另一台主机上的,另一个进程的或者同一个进程的。 NO_FILTER Lease Duration 指定远程参与者在多少时间内认为本地参与者还活着。 20 s Announcement Period 指定参与者的PDP公告消息的周期。 3s

关于发现机制的更多信息可以浏览这个链接:Discovery

Fast-RTPS实现了可插拔的传输架构,这意味着每一个参与者可以随时加入和退出。

在传输上,Fast-RTPS支持以下五种传输方式:

  • UDPv4
  • UDPv6
  • TCPv4
  • TCPv6
  • SHM(Shared Memory)

默认的,当Participant创建时,会自动的配置两个传输通道:

  • SHM:用来与同一个机器上的参与者通信。
  • UDPv4:同来与跨机器的参与者通信。

当然,开发者可以改变这个默认行为,通过C++接口或者XML配置文件都可以。

SHM要求所有参与者位于同一个系统上,它是借助了操作系统提供的共享内存机制实现。共享内存的好处是:支持大数据传输,减少了数据拷贝,并且也减少系统负载。因此通常情况下,使用SHM会获得更好的性能。使用SHM时,可以配置共享内存的大小。

网络通信包含了非常多的参数需要配置,例如:Buffer大小,端口号,超时时间等等。框架本身为参数设置了默认值,大部分情况下开发者不用调整它们。但是知道这些默认值是什么,在一些情况下可能会对分析问题有所帮助。关于这些配置的默认值,以及如果配置可以查看这个链接:Transport descriptors

与UDP不同,TCP传输是面向连接的,因此,Fast-RTPS必须在发送RTPS消息之前建立TCP连接。TCP传输可以具有两种行为:充当TCP服务器或充当TCP客户端。服务器打开一个TCP端口以侦听传入的连接,然后客户端尝试连接到服务器。服务器和客户端的概念独立于RTPS概念,例如:Publisher,Subscriber,Reader或Writer。它们中的任何一个都可以用作TCP服务器或TCP客户端,因为这些实体仅用于建立TCP连接,而RTPS协议可以在该TCP连接上工作。

如果要使用TCP传输,开发者需要做更多的配置,关于这部分内容可以继续阅读官方文档,这里不再赘述。

FastRTPS代码示例

FastRTPS不仅有框架文档API Reference,还有丰富的代码示例。

对于开发者来说,浏览这些代码可能是上手最快捷的方法。

你可以在这里浏览这些示例:Fast-RTPS/examples/C++/

FASTRTPSGEN

FASTRTPSGEN是一个Java程序。用来为在Topic上传输的数据类型生成源码。

开发者通过接口描述语言(Interface Definition Language)定义需要传输的数据类型。然后通过FASTRTPSGEN生成C++编译需要的源文件。

可以通过下面的方法获取和编译FASTRTPSGEN。

git clone --recursive https://github.com/eProsima/Fast-RTPS-Gen.git
cd Fast-RTPS-Gen
gradle assemble

编译完成之后可执行文件位于./scripts/目录。如果需要,可以将该路径添加到$PATH变量中。

关于如何通过IDL定义数据类型请参见这里:Defining a data type via IDL

以下面这个示例文件为例:

struct TestData 
{
char char_type;
octet octet_type;
long long_type;
string string_type;

float float_array[4];

sequence<double> double_list;
};

我们将其保存到文件名为data_type.idl的文件中。然后通过下面这条命令生成C++文件:

~/Fast-RTPS-Gen/scripts/fastrtpsgen data_type.idl 

最后会得到下面四个文件:

data_type.cxx
data_type.h
data_typePubSubTypes.cxx
data_typePubSubTypes.h

前两个文件定义的是实际存储数据的结构,后两个文件定义的类是eprosima::fastrtps::TopicDataType的子类。用来在参与者上注册类型:

/**
 * Register a type in a participant.
 * @param part Pointer to the Participant.
 * @param type Pointer to the Type.
 * @return True if correctly registered.
 */
RTPS_DllAPI static bool registerType(
        Participant* part,
        fastdds::dds::TopicDataType * type);

每一套通信系统中通常都会包含一个或多个自定义的数据类型。

发布者-订阅者层

可以通过 HelloWorldExample 来熟悉发布者-订阅者层接口。

该目录下文件列表如下:

-rw-r--r--  1 paul  staff   1.8K  3 16 13:36 CMakeLists.txt
-rw-r--r--  1 paul  staff   2.8K  3 16 13:36 HelloWorld.cxx
-rw-r--r--  1 paul  staff   6.1K  3 16 13:36 HelloWorld.h
-rw-r--r--  1 paul  staff    62B  3 16 13:36 HelloWorld.idl
-rw-r--r--  1 paul  staff   4.4K  3 16 13:36 HelloWorldPubSubTypes.cxx
-rw-r--r--  1 paul  staff   1.7K  3 16 13:36 HelloWorldPubSubTypes.h
-rw-r--r--  1 paul  staff   4.6K  3 16 13:36 HelloWorldPublisher.cpp
-rw-r--r--  1 paul  staff   1.7K  3 16 13:36 HelloWorldPublisher.h
-rw-r--r--  1 paul  staff   3.8K  3 16 13:36 HelloWorldSubscriber.cpp
-rw-r--r--  1 paul  staff   1.8K  3 16 13:36 HelloWorldSubscriber.h
-rw-r--r--  1 paul  staff   2.0K  3 16 13:36 HelloWorld_main.cpp
-rw-r--r--  1 paul  staff   3.1K  3 16 13:36 Makefile
-rw-r--r--  1 paul  staff   203B  3 16 13:36 README.txt
  • README.txt是工程说明
  • CMakeLists.txt与Makefile是编译文件
  • HelloWorld_main.cpp包含了生成可执行文件的main函数
  • HelloWorld.idl是待传输的数据结构定义
  • HelloWorld.h,HelloWorld.cxx,HelloWorldPubSubTypes.h和HelloWorldPubSubTypes.cxx是由HelloWorld.idl文件生成
  • HelloWorldPublisher.h和HelloWorldPublisher.cpp是发布者的实现
  • HelloWorldSubscriber.h和HelloWorldSubscriber.cpp是订阅者的实现

熟悉一个C++工程可以先从main入手,HelloWorld_main.cpp中的主要逻辑就是根据用户输入的参数是"publisher"还是"subscriber"来确定启动哪个模块。

switch(type)
{
    case 1:
        {
            HelloWorldPublisher mypub;
            if(mypub.init())
            {
                mypub.run(count, sleep);
            }
            break;
        }
    case 2:
        {
            HelloWorldSubscriber mysub;
            if(mysub.init())
            {
                mysub.run();
            }
            break;
        }
}

接下来我们直接看HelloWorldPublisher和HelloWorldSubscriber就好。

HelloWorldPublisher::init中主要是为Publisher的对象设置参数:

bool HelloWorldPublisher::init()
{
    m_Hello.index(0);
    m_Hello.message("HelloWorld");
    ParticipantAttributes PParam;
    PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
    PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true;
    PParam.rtps.builtin.domainId = 0;
    PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
    PParam.rtps.setName("Participant_pub");
    mp_participant = Domain::createParticipant(PParam);

    if(mp_participant==nullptr)
        return false;
    //REGISTER THE TYPE

    Domain::registerType(mp_participant,&m_type);

    //CREATE THE PUBLISHER
    PublisherAttributes Wparam;
    Wparam.topic.topicKind = NO_KEY;
    Wparam.topic.topicDataType = "HelloWorld";
    Wparam.topic.topicName = "HelloWorldTopic";
    Wparam.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
    Wparam.topic.historyQos.depth = 30;
    Wparam.topic.resourceLimitsQos.max_samples = 50;
    Wparam.topic.resourceLimitsQos.allocated_samples = 20;
    Wparam.times.heartbeatPeriod.seconds = 2;
    Wparam.times.heartbeatPeriod.nanosec = 200*1000*1000;
    Wparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
    mp_publisher = Domain::createPublisher(mp_participant,Wparam,(PublisherListener*)&m_listener);
    if(mp_publisher == nullptr)
        return false;

    return true;

}

这里的参数配置请参阅API说明:

Publisher发送消息的逻辑很简单:

bool HelloWorldPublisher::publish(bool waitForListener)
{
    if(m_listener.firstConnected || !waitForListener || m_listener.n_matched>0)
    {
        m_Hello.index(m_Hello.index()+1);
        mp_publisher->write((void*)&m_Hello);
        return true;
    }
    return false;
}

注意,这里write的对象是通过idl文件生成的类型。

Subscriber的初始化和Publisher是类似的:

bool HelloWorldSubscriber::init()
{
    ParticipantAttributes PParam;
    PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
    PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true;
    PParam.rtps.builtin.domainId = 0;
    PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
    PParam.rtps.setName("Participant_sub");
    mp_participant = Domain::createParticipant(PParam);
    if(mp_participant==nullptr)
        return false;

    //REGISTER THE TYPE

    Domain::registerType(mp_participant,&m_type);
    //CREATE THE SUBSCRIBER
    SubscriberAttributes Rparam;
    Rparam.topic.topicKind = NO_KEY;
    Rparam.topic.topicDataType = "HelloWorld";
    Rparam.topic.topicName = "HelloWorldTopic";
    Rparam.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
    Rparam.topic.historyQos.depth = 30;
    Rparam.topic.resourceLimitsQos.max_samples = 50;
    Rparam.topic.resourceLimitsQos.allocated_samples = 20;
    Rparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
    Rparam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
    mp_subscriber = Domain::createSubscriber(mp_participant,Rparam,(SubscriberListener*)&m_listener);

    if(mp_subscriber == nullptr)
        return false;


    return true;
}

当然,Subscriber有自己的配置参数类型:

需要注意的是,Subscriber与Publisher的通信是建立在Topic上的,因此对于Topic标识的配置要保持一致:

Wparam.topic.topicDataType = "HelloWorld";
Wparam.topic.topicName = "HelloWorldTopic";

有了Topic的这个抽象概念,使得Subscriber与Publisher不用物理地址上有任何关联,也屏蔽了硬件和操作系统的差异:同样的代码,其编译产物可以一个跑在x86的Mac系统上,一个跑在ARM架构的Android设备上。

Subscriber通过void HelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber* sub)方法来处理接受到的数据。在示例的实现中,就是将消息体打印出来:

void HelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber* sub)
{
    if(sub->takeNextData((void*)&m_Hello, &m_info))
    {
        if(m_info.sampleKind == ALIVE)
        {
            this->n_samples++;
            // Print your structure data here.
            std::cout << "Message "<<m_Hello.message()<< " "<< m_Hello.index()<< " RECEIVED"<<std::endl;
        }
    }

}

Publisher与Subscriber各自有一些回调,开发者可以利用它们来进行需要的处理:

回调 Publisher Subscriber onNewDataMessage - √ onSubscriptionMatched - √ on_requested_deadline_missed - √ on_liveliness_changed - √ onPublicationMatched √ - on_offered_deadline_missed √ - on_liveliness_lost √ -

读者-写者层

读者-写者层是相对于发布者-订阅者层更底层的API。

它提供了更多的控制,但也意味着使用起来会稍微麻烦一些。

两个层次在几个核心概念上存在一一对应的关系,如下表所示:

Publisher-Subscriber Layer Writer-Reader Layer Domain RTPSDomain Participant RTPSParticipant Publisher RTPSWriter Subscriber RTPSReader

如果你浏览Fast-RTPS的源码,你会发现其实发布者-订阅者层的实现就是依赖读者-写者层的。

想要很快的熟悉读者-写者层的使用可以浏览下面三个代码示例:

RTPSParticipant,RTPSWriter和RTPSReader都通过RTPSDomain创建。

相对于发布者-订阅层不一样的是,这一层不支持通过XML的形式配置参数。开发者必须通过代码的形式配置所有的参数,例如:

//CREATE PARTICIPANT
RTPSParticipantAttributes PParam;
PParam.builtin.discovery_config.discoveryProtocol = eprosima::fastrtps::rtps::DiscoveryProtocol::SIMPLE;
PParam.builtin.use_WriterLivelinessProtocol = true;
mp_participant = RTPSDomain::createParticipant(PParam);
if(mp_participant==nullptr)
	return false;

//CREATE WRITERHISTORY
HistoryAttributes hatt;
hatt.payloadMaxSize = 255;
hatt.maximumReservedCaches = 50;
mp_history = new WriterHistory(hatt);

//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
mp_writer = RTPSDomain::createRTPSWriter(mp_participant,watt,mp_history,&m_listener);

这里的逻辑主要就是设置参数和创建RTPSParticipant,RTPSWriter对象。并且,RTPSParticipant将被用来注册RTPSWriter:

TopicAttributes Tatt;
Tatt.topicKind = NO_KEY;
Tatt.topicDataType = "string";
Tatt.topicName = "exampleTopic";
ReaderQos Rqos;
return mp_participant->registerReader(mp_reader, Tatt, Rqos);

在RTPS协议中,Reader和Writer将有关Topic的数据保存在其关联的历史记录中。每个数据段都由一个变更表示,对应的实现是CacheChange_t

更改通过历史记录管理。读者和写者的历史是两种类型:

  • eprosima::fastrtps::rtps::WriterHistory;
  • eprosima::fastrtps::rtps::ReaderHistory;

对于Writer来说,发送消息是往历史中添加变更:

//Request a change from the history
CacheChange_t* change = writer->new_change([]() -> uint32_t { return 255;}, ALIVE);
//Write serialized data into the change
change->serializedPayload.length = sprintf((char*) change->serializedPayload.data, "My example string %d", 2)+1;
//Insert change back into the history. The Writer takes care of the rest.
history->add_change(change);

而对于Reader来说,新消息会被放入到历史中,读取完了可以将其删除:

void TestReaderRegistered::MyListener::onNewCacheChangeAdded(
        RTPSReader* reader,
        const CacheChange_t* const change)
{
    printf("Received: %s\n", change->serializedPayload.data);
    reader->getHistory()->remove_change((CacheChange_t*)change);
    n_received++;
}

框架会根据消息触发Reader的回调。

默认情况下,Writer的历史在其生命周期以内可以被Reader访问。这意味着,一旦Writer退出,则其历史就没有了。但如果需要,你可以配置持久化,这使得即便Writer重启了,仍然可以维护早先的历史。

使用持久化功能可以保护端点的状态免受意外故障的影响,因为端点在重新启动后仍会继续通信,就像它们刚从网络断开连接一样。

你可以通过RTPSTest_persistent这个示例来了解如何使用这个功能。

要使用持久化功能,Writer和Reader需要进行以下设置:

  • durabilityKind设置为TRANSIENT
  • persistence_guid不能是全0
  • 为Writer,Reader或者RTPSParticipant设置持久化插件。目前内置的插件是SQLITE3。

下面是一段代码示例:

PropertyPolicy property_policy;
property_policy.properties().emplace_back("dds.persistence.plugin", "builtin.SQLITE3");
property_policy.properties().emplace_back("dds.persistence.sqlite3.filename", "test.db");

//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
watt.endpoint.durabilityKind = TRANSIENT;
watt.endpoint.persistence_guid.guidPrefix.value[11] = 1;
watt.endpoint.persistence_guid.entityId.value[3] = 1;
watt.endpoint.properties = property_policy;

mp_writer = RTPSDomain::createRTPSWriter(mp_participant, watt, mp_history, &m_listener);

durabilityKind参数定义了Writer与新Reader匹配时对于已发送的数据的行为,该参数有三个选项:

  • VOLATILE(默认值):丢掉所有已经发送的数据。
  • TRANSIENT_LOCAL:保存最近发送的k条数据。
  • TRANSIENT:与TRANSIENT_LOCAL类似,但是还将消息保存到持久化存储中。这就使得即便它的进程异常退出了,其数据不会丢失。

对于读者来说,其配置方法是类似的:

PropertyPolicy property_policy;
property_policy.properties().emplace_back("dds.persistence.plugin", "builtin.SQLITE3");
property_policy.properties().emplace_back("dds.persistence.sqlite3.filename", "test.db");

//CREATE READER
ReaderAttributes ratt;
Locator_t loc(22222);
ratt.endpoint.unicastLocatorList.push_back(loc);
ratt.endpoint.durabilityKind = TRANSIENT;
ratt.endpoint.persistence_guid.guidPrefix.value[11] = 2;
ratt.endpoint.persistence_guid.entityId.value[3] = 1;
ratt.endpoint.properties = property_policy;
mp_reader = RTPSDomain::createRTPSReader(mp_participant, ratt, mp_history, &m_listener);

使用Fast-RTPS,你有非常多的QoS策略可以配置。

QoS.png

它们主要可以分为下面几类:

  • durability:定义了Writer与新Reader匹配时对于已发送的数据的行为,“持久化”一节已经提到过。
  • liveliness:定义Publisher的活跃程度。例如:多长时间发布一次公告消息。
  • reliability:定义消息的可靠性。它有两个选项:1、BEST_EFFORT,发送消息时,接收者(订阅者)没有到达确认。速度快,但是消息可能会丢失。2、RELIABLE,发送方(发布者)期望接收方(订阅者)进行到达确认。速度较慢,但可以防止数据丢失。
  • partition:可以在domain的物理分区上建立逻辑分区。
  • deadline:指定消息的更新频率,当新消息的频率降至某个阈值以下时,会发出警报。这对于需要定期更新数据的场景很有用。
  • lifespan:指定Publisher发布数据的最大有效期限。当使用寿命到期时,数据将从历史记录中删除。
  • disablePositiveAcks:指定是否需要取消确认消息。在不需要严格可靠的通信且带宽受限时,这么做可以减少网络流量。

在实现中,QoS包含了一系列的类,它们继承自QoSPolicy父类:

classeprosima_1_1fastrtps_1_1_qos_policy__inherit__graph.png

之所以提供如此多的选择,是因为不同的系统对于消息的质量有不同的要求。

在实际系统中,并非每个端点都需要本地存储所有数据。DDS在发送信息方面很聪明,如果消息不一定总是到达预期的目的地,则中间件将保证需要的可靠性。当系统发生更改时,Fast-RTPS会动态地找出将哪些数据发送到何处,并智能地将更改通知参与者。如果总数据量巨大,则DDS会智能过滤并仅发送每个端点真正需要的数据。当需要快速更新时,DDS发送多播消息以一次更新许多远程应用程序。当数据格式变更时,DDS会跟踪系统各个部分使用的版本并自动进行转换。对于安全性至关重要的应用程序,DDS控制访问,强制执行数据流路径并实时加密数据。

当系统要满足:以极高的速度,在动态,苛刻且不可预测的环境工作时,DDS的真正威力就会显现。

文章的最后,我们通过实际运行程序来进行一些实验。虽然Fast-RTPS支持非常多的操作系统,但在Ubuntu系统上验证可能是最方便的。

Fast-RTPS是面向分布式系统的,这意味着在一个系统上验证它的功能意义不大。但另一方面我们大部分人并没有同时拥有多个设备。

在这种情况下,我们可以借助docker,它可以在同一个系统上运行多个独立的虚拟系统。然后我们就可以在这些独立的系统上进行测试了,这样就模拟了分布式的环境。

Fast-RTPS提供了包含依赖环境的Docker容器,我们只要下载和运行这些容器,就可以拥有多个独立的系统了。

不过,在这运行下面这些示例之前,你需要配置好docker环境。关于docker的基本使用已经超过了本文的范畴,你可以浏览这个链接:Install Docker Engine on Ubuntu

Fast-RTPS需要的文件可以到官网下载:https://eprosima.com/index.php/downloads-all

点击上面这个链接,然后输入个人信息就可以进入下载页面了。你可以选择最新版本的Docker和Fast-RTPS包进行下载:

download.png

考虑到国内的网络状况,下载的速度可能非常慢。我下载需要的文件耗费的好几个小时,为了节省你的时间,我已经将下载好的文件放在了这里:

在Ubuntu系统中,先将eProsima_FastRTPS-1.9.3-Linux.tgz解压缩,为了编译它,还需要安装一些依赖,相关命令如下:

sudo apt install cmake g++
sudo apt install libasio-dev libtinyxml2-dev
mkdir fast-rtps
tar -xvf eProsima_FastRTPS-1.9.3-Linux.tgz -C fast-rtps/
cd fast-rtps/
chmod a+x install.sh
sudo ./install.sh

在这之后你就可以转到/fast-rtps/src/fastrtps/examples/目录下编译示例了。不过这个目录下的CMakeList.txt似乎存在问题,我在这个文件的开头增加了下面一行才完成编译:

SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")

编译完成之后,我们并非是在Ubuntu系统上运行程序,而是将这些可执行文件放到docker容器中,以分布式的环境来运行它们。

所以需要启动docker容器:

$ docker load -i ubuntu-fast-rtps.tar
$ docker run -it ubuntu-fast-rtps:v1.9.3

你可以通过docker run -it ...同时启动多个docker容器以进行测试(每个容器对应一个通信的参与者。当然,你需要同时打开多个shell窗口)。

例如我启动了两个docker容器:

CONTAINER ID        IMAGE                     COMMAND             CREATED             STATUS              PORTS               NAMES
2125504ee62f        ubuntu-fast-rtps:v1.9.3   "/bin/bash"         5 minutes ago       Up 5 minutes                            mystifying_jennings
b17517fefecd        ubuntu-fast-rtps:v1.9.3   "/bin/bash"         23 minutes ago      Up 23 minutes                           stoic_leavitt

运行docker run -it ...之后会直接进入docker的shell中,你可以在根目录创建fastrtps目录用来存放测试程序。

然后在Ubuntu系统上将编译出的示例程序拷贝到docker中:

sudo docker cp ./ b17517fefecd:/fastrtps
sudo docker cp ./ 2125504ee62f:/fastrtps

在这之后就可以转到docker容器的shell中运行测试程序了。

例如,在两个docker上运行HelloWorld的示例:

  • 下面是Publisher程序:
root@b17517fefecd:/fastrtps/HelloWorldExample# ./HelloWorldExample publisher
Publisher running 10 samples.
Publisher matched
Message: HelloWorld with index: 1 SENT
Message: HelloWorld with index: 2 SENT
Message: HelloWorld with index: 3 SENT
Message: HelloWorld with index: 4 SENT
Message: HelloWorld with index: 5 SENT
Message: HelloWorld with index: 6 SENT
Message: HelloWorld with index: 7 SENT
Message: HelloWorld with index: 8 SENT
Message: HelloWorld with index: 9 SENT
Message: HelloWorld with index: 10 SENT
  • 下面是Subscriber程序:
root@2125504ee62f:/fastrtps/HelloWorldExample# ./HelloWorldExample subscriber
Starting 
Subscriber running. Please press enter to stop the Subscriber
Subscriber matched
Message HelloWorld 1 RECEIVED
Message HelloWorld 2 RECEIVED
Message HelloWorld 3 RECEIVED
Message HelloWorld 4 RECEIVED
Message HelloWorld 5 RECEIVED
Message HelloWorld 6 RECEIVED
Message HelloWorld 7 RECEIVED
Message HelloWorld 8 RECEIVED
Message HelloWorld 9 RECEIVED
Message HelloWorld 10 RECEIVED
Subscriber unmatched

接下来是Benchmark程序:

  • Benchmark subscriber端:
root@b17517fefecd:/fastrtps/Benchmark# ./Benchmark subscriber
Subscriber running...
Subscriber matched
Publisher matched
Subscriber unmatched
Publisher unmatched
  • Benchmark publisher端:
root@2125504ee62f:/fastrtps/Benchmark# ./Benchmark publisher
Publisher running...
Subscriber matched
Publisher matched. Test starts...
RESULTS after 10000 milliseconds:
COUNT: 53951
SAMPLES: 0,771,668,548,582,716,700,706,408,440,592,636,738,698,648,574,706,776,690,584,638,556,750,740,640,584,572,542,526,560,552,528,608,504,630,478,598,708,620,528,660,718,578,646,702,528,652,528,450,508,566,544,516,616,652,584,532,434,542,678,752,696,412,544,654,766,736,612,496,470,662,580,566,634,674,568,532,546,528,552,552,528,490,508,598,620,672,506,468,654,

在运行的过程中,你可以感受到借助Fast-RTPS,不同系统上的参与者是多么快速的发现了对方并完成了通信的。 当然,你可以运行更多的用例,或者修改代码进行你想要的测试。

参考资料与推荐读物


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK