2

Apollo Cyber RT 通信(下)

 2 years ago
source link: https://dingfen.github.io/apollo/2020/11/07/CyberCommu2.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Apollo Cyber RT 通信(下)Permalink

前言Permalink

欢迎回来!今天我继续和大家聊 Cyber RT 的通信,上文我探讨了 Cyber RT 的两种通信方式和三种通信模型,并从通信架构的角度,一层层地给大家详细介绍了通信时代码的具体工作情况。由于通信内容过多,全放在一篇博客理论太长,于是我将这些内容简单地一分为二,事实上它们应当是有机的整体。

上文末尾,我介绍了 Blocker 类的功能,今天这篇博客,我们继续向深处进军↖(^ω^)↗。

CyberLayer.png

Receiver & TransmitterPermalink

我们在上文已经认识了 ReaderWriter ,现在继续往下走。如果你仔细看代码的话,在 ReaderWriter 初始化时,都会分别构建好底层的 ReceiverTransmitter 对象。为了描述简便,我之前有意地忽略了,现在把它拿出来。Reader 部分的比较复杂,还使用了 ReceiverManager 进行管理;Writer 就比较直接了,在 Init() 函数中可以直接看到。

bool Reader<MessageT>::Init() {
	receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);
}

bool Writer<MessageT>::Init() {
   /*  ....   */
    transmitter_ =
        transport::Transport::Instance()->CreateTransmitter<MessageT>(role_attr_);
}

我们不妨把这里作为突破口,打开新世界的大门。

ReceiverManagerPermalink

之前提到过,Reader 在初始化时,需要用 ReceiverManager::GetReceiver() 获得 Receiver 对象。它的内部分封装了一个 unordered_map 表,将信道名字和与之对应的 Receiver 对象保存在表中。再看看下面的代码,可得出一个结论,如果同一个进程内,不同的 Reader 对象订阅同一个信道,事实上使用的是同一个 Receiver 2。再看看那个很长的 CreateReceiver() 函数调用,除了传递一个配置信息参数外,还有一个很长的回调函数。回调函数会做:

  • 加入一个 Transport 事件,类型为 Dispatch
  • 调用数据分发器 DataDispatcher::Dispatch() 函数
  • 加入一个 Transport 事件,类型为 Notify

这些步骤具体做了什么?我们目前还不知道,求知的欲望驱使着我们继续往下探索。

template <typename MessageT>
auto ReceiverManager<MessageT>::GetReceiver(const proto::RoleAttributes& role_attr) 
    -> typename std::shared_ptr<transport::Receiver<MessageT>> {
  // because multi reader for one channel will write datacache multi times,
  // so reader for datacache we use map to keep one instance for per channel
  const std::string& channel_name = role_attr.channel_name();
    // 如果信道名字对应的 Receiver 还没有创建 那就创建
  if (receiver_map_.count(channel_name) == 0) {
      // 一个巨长的 CreateReceiver() 函数调用
    receiver_map_[channel_name] = transport::Transport::Instance()->CreateReceiver<MessageT>(
            		role_attr, [](const std::shared_ptr<MessageT>& msg,
                          const transport::MessageInfo& msg_info,
                          const proto::RoleAttributes& reader_attr) {
              (void)msg_info;
              (void)reader_attr;
              PerfEventCache::Instance()->AddTransportEvent(
                  TransPerf::DISPATCH, reader_attr.channel_id(),
                  msg_info.seq_num());
              data::DataDispatcher<MessageT>::Instance()->Dispatch(
                  reader_attr.channel_id(), msg);
              PerfEventCache::Instance()->AddTransportEvent(
                  TransPerf::NOTIFY, reader_attr.channel_id(),
                  msg_info.seq_num());
            });
  }
    // 如果已经有了 直接返回
  return receiver_map_[channel_name];
}

TransportPermalink

Transport 类,单例模式,有如下指针成员,emm……在还没完全弄懂底层代码的情况下,也很难告诉你们这些类的具体作用:

  • Participant 类 FastRtps 相关
  • Notifier 类 与 Shm 相关
  • IntraDispatcher 类 Intra 的分发器
  • ShmDispatcher 类 Shm 的分发器
  • RtpsDispatcher 类 Rtps 的分发器
class Transport {
 private:
  ParticipantPtr participant_ = nullptr;
  NotifierPtr notifier_ = nullptr;
  IntraDispatcherPtr intra_dispatcher_ = nullptr;
  ShmDispatcherPtr shm_dispatcher_ = nullptr;
  RtpsDispatcherPtr rtps_dispatcher_ = nullptr;
}

和之前 Cyber RT 给人的感觉一样,一旦它要创建什么重要的东西,不调用个几层是根本不可能完成的。在这里,Transport 类的两个函数 CreateTransmitterCreateReceiver 都会根据传入的 mode ,去构造出对应的子类对象,分别对应我在这篇博客开头提到的四种不同场景下的传输实现类。哦,提醒一下,默认的 mode 是 Hybrid ,也就是三种模式混用。

template <typename M>
auto Transport::CreateTransmitter(const RoleAttributes& attr,
  const OptionalMode& mode) -> typename std::shared_ptr<Transmitter<M>> {
  /*  ....  */
 // 往 modified_attr 中加入 qos profile
 // 根据各种模式 创建相应的 Transmitter 子类
  switch (mode) {
    case OptionalMode::INTRA:
      transmitter = std::make_shared<IntraTransmitter<M>>(modified_attr);
      break;
    case OptionalMode::SHM:
      transmitter = std::make_shared<ShmTransmitter<M>>(modified_attr);
      break;
    case OptionalMode::RTPS:
      transmitter =
          std::make_shared<RtpsTransmitter<M>>(modified_attr, participant());
      break;
    default:
      transmitter =
          std::make_shared<HybridTransmitter<M>>(modified_attr, participant());
      break;
  }
  if (mode != OptionalMode::HYBRID)
    transmitter->Enable();
  return transmitter;
}

Transmitter 类是写消息,而 Receiver 类是读消息,因此 Receiver 类比 Transmitter 类多了一个参数 MessageListener ,其本质就是个回调函数:

using MessageListener = std::function<void(const MessagePtr&, const MessageInfo&, const RoleAttributes&)>;
template <typename M>
auto Transport::CreateReceiver(const RoleAttributes& attr,
    const typename Receiver<M>::MessageListener& msg_listener,
    const OptionalMode& mode) -> typename std::shared_ptr<Receiver<M>> {
 /*  ....   */
 // 往 modified_attr 中加入 qos profile
 // 根据各种模式 创建相应的 Receiver 子类
  switch (mode) {
    case OptionalMode::INTRA:
      receiver =
          std::make_shared<IntraReceiver<M>>(modified_attr, msg_listener);
      break;
    case OptionalMode::SHM:
      receiver = std::make_shared<ShmReceiver<M>>(modified_attr, msg_listener);
      break;
    case OptionalMode::RTPS:
      receiver = std::make_shared<RtpsReceiver<M>>(modified_attr, msg_listener);
      break;
    default:
      receiver = std::make_shared<HybridReceiver<M>>(
          modified_attr, msg_listener, participant());
      break;
  }
  if (mode != OptionalMode::HYBRID)
    receiver->Enable();
  return receiver;
}

最后一部分代码:在 Receiver 对象被创建时,只要模式不是 Hybrid,都会立刻调用 Receiver::Enable() 函数开启接收。

Receiver & TransmitterPermalink

这两个类是 EndPoint 的子类。关于 Endpoint 类5,那可就是整个架构的类继承的终点了,其内部有三个成员

  • bool enabled_ 用来标记是否被启用
  • Identity id_ 用于标识,对于每个 Endpoint 对象拥有唯一的 id 号,其子类也是用这个来进行标识
  • RoleAttributes attr_ 用来记录配置文件中的数据。

其中 Receiver 类只有一个回调函数 msg_listener_,该回调函数就是 Receiver 构造时传入的函数。在新消息到来时,会被调用👇:

template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
                               const MessageInfo& msg_info) {
  if (msg_listener_ != nullptr)
    msg_listener_(msg, msg_info, attr_);
}

ReceiverManager 那一小节中,已经说到该函数有三个步骤,其中5,调用 DataDispatcher::Dispatch 非常关键。因为从这步可以看出上层和底层最终完成了闭环。当底层 Receiver 的回调函数 msg_listener 收到消息被调用时,上层的分发器 DataDispatcher 会把来自底层的消息发到等待的消息缓存里,然后调用上层的通知器 DataNotifier::Notify() ,唤醒对应的 Component 的封装了 Process() 的协程,让协程处理这些消息。

再看看 Receiver 的四个子类,每个子类都包含了相应的 Dispatcher 的指针,例如,RtpsReceiver 类含有 RtpsDispatcherPtr 成员。这些 Dispatcher 的功能就是增删监听者,从而让 Receiver 关闭或开启,比如:

template <typename M>
void RtpsReceiver<M>::Enable() {
 /*  ....  */
  dispatcher_->AddListener<M>(
      this->attr_, std::bind(&RtpsReceiver<M>::OnNewMessage, this,
                             std::placeholders::_1, std::placeholders::_2));
  this->enabled_ = true;
}

template <typename M>
void RtpsReceiver<M>::Disable() {
  dispatcher_->RemoveListener<M>(this->attr_);
  this->enabled_ = false;
}

简单地介绍一下 Dispatcher 类(别和下面的 DataDispatcher 搞混了)。它主要负责记录一个 channel_id 和对应 ListenerHandlerBasePtr 的关系表。而AddListener()RemoveListener() 函数是从关系表中,拿到给定信道的对应 ListenerHandlerBase ,并在这上面连接(Connect)或不连接(Disconnect)相应的回调函数。由于时间精力有限,这边解释得比较混乱,若想具体了解其工作机制,可以参考文章最后的文献。总的来说,这有点像在 Qt 中实现的信号槽机制:信号在特定情况下被发射出去,对应的信号响应函数在槽中监听。信号与槽通过 Connect 函数关联,一个信号可以发射到多个槽,多个信号可以被一个槽监听。


再来看看 Transmitter 类,它是真正的数据写者的基类。它有两个成员,分别为序列号 seq_num_ 和消息信息 msg_info_

template <typename M>
class Transmitter : public Endpoint {
 public:
  explicit Transmitter(const RoleAttributes& attr);
  virtual bool Transmit(const MessagePtr& msg);
  virtual bool Transmit(const MessagePtr& msg, const MessageInfo& msg_info) = 0;

  uint64_t NextSeqNum() { return ++seq_num_; }
  uint64_t seq_num() const { return seq_num_; }
 protected:
  uint64_t seq_num_;
  MessageInfo msg_info_;
};

我主要研究其中的 Transmit() 函数,其四个子类都具体实现了 Transmit() 函数,如果你仔细看过前一篇博客,就知道这也是 Writer 类一直在调用的函数。那么这个 Transmit() 函数有什么具体步骤呢?

  • Writer 调用 Transmitter::Transmit() 函数
  • 设置 msg_info::seq_num = NextSeqNum 消息的序列号
  • 加入一个 Transport事件,类型为 Transmit Begin
  • 调用子类实现的 Transmit() 函数,该函数通过传入一条消息即可以完成数据写入任务。
template <typename M>
bool Transmitter<M>::Transmit(const MessagePtr& msg) {
  msg_info_.set_seq_num(NextSeqNum());
  PerfEventCache::Instance()->AddTransportEvent(
      TransPerf::TRANSMIT_BEGIN, attr_.channel_id(), msg_info_.seq_num());
  return Transmit(msg, msg_info_);
}

好,我对 ReceiverTransmitter 类的介绍就到这里了。我不打算再继续介绍它们的子类以及更加底层的设计是如何实现的,因为这边牵扯到非常多的技术细节,弄懂这些会消耗非常多的精力,而且这些东西也远远超出了课题组目前的要求。

Data 部分Permalink

通信架构的内容已经全部介绍完了(至少对于发布—订阅通信方式来说),但还是感觉缺了什么东西,把这两者有效地连接起来😅。没错,我至今还没有说清楚,Writer 发布的消息是如何让 Reader 看到的。而这就牵扯到 cyber/data 中实现的类了。

DataVisitorPermalink

先来说说数据访问类 DataVisitor ,它是 DataVisitorBase 的子类。它的主要成员和构造函数如下。先来说几个明显可以得到的结论:

  • DataVisitor 对象都会有若干个缓存类 ChannelBuffer ,还有一个 DataFusion 对象,融合了所有的消息类型
  • 在初始化的时候,首先构建这些 ChannelBuffer ,然后它们都会被加入到相应类型DataDispatcher 的管理中
  • data_notifier_ (存在于 DataVisitorBase 中),会向信道 0 加入一个 notifier_ ,类型为 struct Notifier { std::function<void()> callback; };,这表明信道 0 注定与其他信道不一般 :thinking:
  • data_fusion_ 会构建并指向 AllLatest 对象,从类型来看,它们整合了所有的消息类型,应当用于信息的最后收集发送
template <typename M0, typename M1>
class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {
 public:
  explicit DataVisitor(const std::vector<VisitorConfig>& configs)
      : buffer_m0_(configs[0].channel_id, new BufferType<M0>(configs[0].queue_size)),
     buffer_m1_(configs[1].channel_id, new BufferType<M1>(configs[1].queue_size)) {
    DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);
    DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);
    data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);
    data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);
  }
 private:
  fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;
  ChannelBuffer<M0> buffer_m0_;
  ChannelBuffer<M1> buffer_m1_;
};

/** DataVisitorBase 内有
 * 指向 DataNotifier 的指针
 * 封装为 Notifier 的回调函数  */
class DataVisitorBase {
 public:
  void RegisterNotifyCallback(std::function<void()>&& callback) {
    notifier_->callback = callback;
  }
 protected:
  uint64_t next_msg_index_ = 0;
  DataNotifier* data_notifier_ = DataNotifier::Instance();
  std::shared_ptr<Notifier> notifier_;
};

该类主要用于消息数据的访问,存放到来的消息数据,并提供接口供消息读取。事实上,我们之前在讨论组件的初始化的最后部分时见过它,也在订阅者初始化时遇见过它,但当时我都有意地略过了。现在我们重拾这部分内容(为了简单且不失一般性,我以两个信道的情况为例),把这部分彻底搞明白:

template<typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize() {
   /*   ....   */
  // 创建 DataVisitor 和 RoutineFactory   最后创建任务
  std::vector<data::VisitorConfig> config_list;
  for (auto& reader : readers_)
    config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
   // 创建了两个信道的 DataVisitor 并用在了协程工厂类
  auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
  croutine::RoutineFactory factory = croutine::CreateRoutineFactory<M0, M1>(func, dv);
  return sched->CreateTask(factory, node_->Name());
}

上面代码做了:

  • 收集了所有的 Reader 对象的所读信道 id 和消息队列大小,放入到 config_list 后就创建 DataVisitor 对象

  • 在上面 DataVisitor 类的构造函数中,根据传入的信道 id 和消息队列尺寸,DataVisitor 内为其创建了两个 ChannelBuffer 作为缓冲区

    • 调用数据分发器将它们加入到 DataDispatcher 的管理中
    • 调用 DataNotifier::AddNotifier() 函数,传入第 0 个读者的信道 id ,加入到 DataNotifier 的管理中。DataDispatcherDataNotifier 类均为单例,之后我们会对它们做详细介绍
    • 创建 DataFusion 对象,这也是之后要了解的╮(╯▽╰)╭
  • 创建协程工厂,并构建出要封装为协程的函数:

      factory.create_routine = [=]() {
      return [=]() {
        std::shared_ptr<M0> msg0;     std::shared_ptr<M1> msg1;
        for (;;) {
          CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
          if (dv->TryFetch(msg0, msg1)) {  // 取数据
            f(msg0, msg1);	// f 函数就是组件初始化时创建的 func 函数
            CRoutine::Yield(RoutineState::READY);
          } else
            CRoutine::Yield();
        }
      }; };
    

    协程做的就是调用 dv->TryFetch() 取数据(下文会详细说明),如果成功就调用组件的 Process() 函数,且协程的状态从等待数据转变为了就绪,而一旦协程就绪,就可以被 Processor 类运行

  • Scheduler 创建任务,在CreateTask() 函数中,调用 visitor->RegisterNotifyCallback() 函数

      visitor->RegisterNotifyCallback([this, task_id]() {
        if (cyber_unlikely(stop_.load()))
          return;
        this->NotifyProcessor(task_id);
      });
    

    上面的函数被赋值给了 DataVisitorBase::notifier_ ,用于唤醒相应的协程来处理该新消息

说完这部分过程后,我发现我们至少还要理解一下 DataDispatcherDataNotifierDataFusionAllLatest 类😂😂😂。

DataDispatcherPermalink

千里之行,始于足下。先来看看数据分发类 DataDispatcher ,顾名思义,它将底层传来的数据进行分发,具体来说,当新消息到来时,通过 Dispatch() 函数把它们放到该信道下的所有消息缓冲区中。它是个单例模式,但是个模板类,意味着每一个消息类型会有对应的一个唯一的 DataDispatcher 对象。类内记录了一个信道 id 与多个 CacheBuffer 对象对应的表。注意到:一个信道可以有多个订阅者 Reader ,每个订阅者拥有一个 CacheBuffer 缓冲区,而这个缓冲区就是之前 DataVisitor 类在构造时给每个消息类型创建的 ChannelBuffer

template <typename T>
class DataDispatcher {
 public:
  void AddBuffer(const ChannelBuffer<T>& channel_buffer);
  bool Dispatch(const uint64_t channel_id, const std::shared_ptr<T>& msg);
 private:
  DataNotifier* notifier_ = DataNotifier::Instance();
  AtomicHashMap<uint64_t, BufferVector> buffers_map_;
};

template <typename T>
bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,
                                 const std::shared_ptr<T>& msg) {
  BufferVector* buffers = nullptr;
  if (apollo::cyber::IsShutdown())
    return false;
  // 每次收到该信道的消息时,就会给所有缓冲区都放一份消息
  if (buffers_map_.Get(channel_id, &buffers)) {
    for (auto& buffer_wptr : *buffers)
      if (auto buffer = buffer_wptr.lock()) {
        std::lock_guard<std::mutex> lock(buffer->Mutex());
       // 向 CacheBuffer 填入数据
        buffer->Fill(msg);
      }
  } else
    return false;
  return notifier_->Notify(channel_id);
}

DataDispatcher::Dispatch() 函数非常重要,在 ReceiverManagerReceiver && Transmitter 中我们已经提到,他是连接上层和底层的最关键一环。在 Receiver 对象每次收到该信道的消息时,就会调用DataDispatcher::Dispatch() 函数分发刚收到的数据,函数会先从表中取出所有对应信道的缓冲区,然后调用 CacheBuffer::Fill() 函数来给缓冲区填数据(稍后介绍这个函数),最后调用 DataNotifier::Notify() 函数,唤醒它们对应的协程来取数据并运行。现在你应该明白,为何 DataVisitor 在构造时,需要把刚刚建立的缓冲区给 DataDispatcher 管理,不然的话,缓冲区拿不到消息啊。

DataNotifierPermalink

再来看看 DataNotifier 类。它是个单例模式,类内有一个信道 id 与多个 Notifer 对应的表,这是考虑到一个信道可以有多个订阅者。很显然,前文提到的在 DataVisitor 构造时,调用 AddNotifier() 就是要把自己的 Notifier 存到这个表中。

class DataNotifier {
 public:
  void AddNotifier(uint64_t channel_id, const std::shared_ptr<Notifier>& notifier);
  bool Notify(const uint64_t channel_id);
 private:
  AtomicHashMap<uint64_t, NotifyVector> notifies_map_;
};

至于 AddNotifier() 函数的实现,emm……很平凡,无非就是找到对应的信道 id ,然后将参数中的 Notifier 放入到数组里(如果没有数组,新建一个)。重要的是唤醒函数 Notify() ,该函数内部会调用 notifier->callback() ,回顾一下,这个 notifier 是在 Scheduler 创建任务时被设置的,内含有 NotifyProcessor() 函数,可以唤醒协程。在 Receiver && TransmitterReceiverManager 也提到,第二步的分发器最终会调用该函数,唤醒所有监听该信道的协程,来处理到来的消息。这样,你也就明白为什么 DataVisitor 类在构造时要把 notifier 加入进去了,不然的话信道来了个消息,就没法唤醒协程,Reader 就不知道了呀。

inline bool DataNotifier::Notify(const uint64_t channel_id) {
  NotifyVector* notifies = nullptr;
  if (notifies_map_.Get(channel_id, &notifies)) {
    for (auto& notifier : *notifies)
      if (notifier && notifier->callback)
        notifier->callback();
    return true;
  return false;
}

DataFusion & AllLatestPermalink

再来看看 DataVisitor 构造函数的最后一步,创建 DataFusion 对象,看看名字,就应该明白该对象用于信道数据的融合。DataFusionAllLatest 的基类,DataFusion 类十分简单,仅提供了一个 Fusion() 接口,具体由 AllLatest 实现。所以,我们重点看一下 AllLatest 类,哈,听名字就知道它会取所有信道中的最新值,再结合它是 DataFusion 的子类,所以主要功能应该是融合多个信道的最新数据

我还是以两个信道的情况为例,该类成员有几个 ChannelBuffer 类,其中一个比较特殊,类型是数据融合的 buffer_fusion_

template <typename M0, typename M1>
class AllLatest<M0, M1, NullType, NullType> : public DataFusion<M0, M1> {
     // 所谓融合消息,就是放在 tuple 里
 using FusionDataType = std::tuple<std::shared_ptr<M0>, std::shared_ptr<M1>>;
 private:
  ChannelBuffer<M0> buffer_m0_;
  ChannelBuffer<M1> buffer_m1_;
  ChannelBuffer<FusionDataType> buffer_fusion_;
};

在构造函数中,特殊的信道 0 的消息缓冲区会调用 SetFusionCallback() 来设置回调函数 :point_down:为方便表述,我给它起名 FusionFunc 。从下面的代码中看出, FusionFunc 先判断是否所有信道都有消息,并获取最新的消息,如果都有消息的话就将这些消息融合,即用 std::tuple 封装,再调用 Fill() 函数填入到 buffer_fusion_CacheBuffer 中。

AllLatest(const ChannelBuffer<M0>& buffer_0, const ChannelBuffer<M1>& buffer_1)
      : buffer_m0_(buffer_0), buffer_m1_(buffer_1),
        buffer_fusion_(buffer_m0_.channel_id(),
           new CacheBuffer<std::shared_ptr<FusionDataType>>(
              buffer_0.Buffer()->Capacity() - uint64_t(1))) {
    buffer_m0_.Buffer()->SetFusionCallback( // buffer0 设置融合的回调函数
        [this](const std::shared_ptr<M0>& m0) {
          std::shared_ptr<M1> m1;
          if (!buffer_m1_.Latest(m1)) // 信道内是否有消息 有的话取出最后一个
            return;
          auto data = std::make_shared<FusionDataType>(m0, m1);
          std::lock_guard<std::mutex> lg(buffer_fusion_.Buffer()->Mutex());
          // 填充到消息中
          buffer_fusion_.Buffer()->Fill(data);
        });
  }

何时调用 FusionFunc 呢?答案在这个 Fill() 函数(见下代码)中,它诡计多端——如果 CacheBuffer 有回调函数 FusionFunc,会调用回调函数;如果没有,会把接收到的数据放入缓冲区中。很显然在上面的构造函数中,只有信道 0 设置了回调函数 FusionFunc因此当信道 0 有数据到来, DataDispatcher::Dispatch() 被调用时(代码见 DataDispatcher 部分),进而调用 Fill() 函数时, FusionFunc 才会被调用,将最新的消息融合,并将融合的消息填入到 buffer_fusion_ 中。而其他信道的数据到来时, Fill() 函数只是单纯往对应的 CacheBuffer 中填数据。

  void CacheBuffer::Fill(const T& value) {
    if (fusion_callback_)
      fusion_callback_(value);  // buffer 0 运行这个
    else {
      if (Full()) {
        buffer_[GetIndex(head_)] = value;
        ++head_;     ++tail_;
      } else {
        buffer_[GetIndex(tail_ + 1)] = value;
        ++tail_;
      }
    }
  }

在协程的处理函数中(回顾一下, DataVisitor 的创建协程工厂中提到的)会调用 DataVisitor::TryFetch() 函数,再调用 Fusion() 函数(代码如下),它从融合数据的缓冲区 buffer_fusion_ 中拿走(Fetch)融合消息,这也就意味着同时拿多个信道上的最新消息,保证了每次给 Component::Process() 函数的参数都必须“全员到齐”,并且所有信息都是最新的。综上所述,只有信道 0 收到消息后,才会融合其他信道的消息,往往主导通信处理的节奏,因此信道 0 的选取就比较关键了。

bool DataVisitor::TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) {
  if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {
    next_msg_index_++;
    return true;
  }
  return false;
}

bool AllLatest::Fusion(uint64_t* index, std::shared_ptr<M0>& m0,
              std::shared_ptr<M1>& m1) override {
  std::shared_ptr<FusionDataType> fusion_data;
  // 从 fusion 缓冲区中取数据
  if (!buffer_fusion_.Fetch(index, fusion_data))
    return false;
  // 得到了数据 分别赋值给 m0 m1
  m0 = std::get<0>(*fusion_data);
  m1 = std::get<1>(*fusion_data);
  return true;
}

ChannelBuffer & CacheBufferPermalink

前文我一直提到一个词,叫做缓冲区。事实上这个词具体指向了两个类,ChannelBufferCacheBuffer 类。为了让读者更好地理解“缓冲区”这个词,我简要地介绍一下这两个类。ChannelBuffer 类包含了两个成员:信道 id 和 指向 CacheBuffer 的指针。它的函数 Fetch()Latest() 分别用于取对应索引的消息和取得最新消息。而 CacheBuffer 类,其实质就是一个循环队列,用来放置某个信道产生的数据。需要注意的是,CacheBuffer 占用的内存是恒定的,因为里面的数组长度一开始就被限定了,所以,一旦缓冲区装满了,它会毫不犹豫地丢弃最旧的消息,推入最新的消息。具体的队列实现在 cyber/data/cache_buffer.hcyber/data/channel_buffer.h,很简单,有兴趣可以直接读代码。

总结Permalink

(⊙o⊙)哦终于,我把 Cyber RT 所有的通信机制都看完了,哦,BTW,上篇通信链接在这里。现在,在把所有的内容都搞明白后,让我们理一理自己昏沉的头脑,跳脱出代码的边边框框,从上帝视角审视一下数据是如何流通的。

这里有两张非常好的图,感谢 [2],我可以不用自己动手,花费数小时画图了😀

在上面这张图中,仍然以两个信道的 Component 为例,从左上角出发:

  • Component 初始化建成了两个 Reader 对象,然后创建了一个 DataVisitor<M0, M1> 对象
  • 两个 Reader 对象也分别创建了一个 DataVisitor<> 对象,回顾一下上篇提到的 Reader 的初始化过程
  • 这些 DataVisitor 对象会分别创建出 ChannelBuffer ,并使用 DataDispatcher 管理这些缓冲区(注意看它内部的表)
  • 当接收到新消息后,DataDispatcher 会给对应的信道上的所有缓冲区进行分派,特别地,若信道 0 有新消息,还会对其他消息进行融合(注意看 AllLatest 对象)
  • 之后,DataVisitor 会使用 DataNotifier 对象,唤醒相应的协程,处理收到的数据

现在,我们再来看看底层的通信架构,虽然有些部分我略去未讲,但这张图我们还是可以看明白的🐶。这次,我们从最右边开始看起。

  • 可以看出两个 Reader 对象共用了一个 Receiver 。这是因为在同一个进程内,不同的 Reader 对象订阅同一个信道,就会使用同一个 Receiver
  • 默认选择的 Receiver 是 hybrid 的,因此需要三个底层接收类 IntraReceiverShmReceiverRtpsReceiver 配合
  • 在创建 Receiver 时,监听者处理函数 msg_listener_ 就已经“准备就绪”了。
  • Dispatcher 的表中记录了监听者和它们负责的信道,并把它们连接 Connect 起来,如同 Qt 中的信号槽机制
  • 一有新消息到达(最左边的函数是我陌生的),那么就会立刻触发信号槽机制,调用 msg_listener 函数,之后就是上层 DataDispatcher 的工作了

最后的最后,事实上我还是落了一个东西:服务—客户通信方式😓😓😂😂。的确,这篇博客的内容全是关于发布—订阅通信方式的,对于 ServiceClient ,几乎没有提及,之后,我就会补上这一部分服务发现的内容。

参考Permalink

[1] Dig into Apollo - Cyber

[2] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的通信传输

[3] 百度Apollo系统学习-Cyber RT 通信-上层

[4] 百度 Apollo Cyber RT简介、基本概念以及与 ROS 对照

[5] 百度Apollo系统学习-Cyber RT 通信-底层

[6] cyber-rt.readthedocs.io

[7] 自动驾驶汽车平台技术基础/杨世春等编著. —北京:清华大学出版社


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK