7

AMQP协议学习

 3 years ago
source link: https://zhuanlan.zhihu.com/p/147675691
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

AMQP协议学习

Node.js/Python/Go

最近在阅读AMQP协议。AMQP协议算是消息队列里无法绕开的一个协议,通过阅读该协议来学习消息队列以及自有协议设计。该协议的阅读体验非常好,协议本身没有过于复杂,规范里也会解释各个地方的设计思路。

该文章是基于AMQP 0.9.1编写,AMQP 0.9.1规范发布于2008年。所以后面提到的很多特性在当时是先进的,但放到现在可能就习以为常了。比如Kafka于2011年发布第一版,RTMP协议1.0版本于2012年发布,HTTP/2于2015年发布,

AMQP协议中的各个概念和组件

AMQP的全称为:Advanced Message Queuing Protocol(高级消息队列协议)

AMQP所覆盖的内容包含了网络协议以及服务端服务

  • 一套被称作”高级消息队列协议模型(AMQ Model)“的消息能力定义。该模型涵盖了Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
  • 一个网络层协议AMQP。能够让客户端程序与实现了AMQ Model的服务端进行通信。

AMQP像是一个把东西连在一起的语言,而不是一个系统。其设计目标是:让服务端可通过协议编程。理解了AMQP的这个设计目标,也就能够理解其协议的设计思路了。

AMQP协议是一个二进制协议,具有一些现代特性:多通道(multi-channel),可协商(negotiated),异步、安全、便携、语言中立、高效的。其协议主要分成两层:

功能层(Functional Layer):定义了一系列的命令

传输层(Transport Layer):携带了从应用 → 服务端的方法,用于处理多路复用、分帧、编码、心跳、data-representation、错误处理。

这样分层之后,可以把传输层替换为其它传输协议,而不需要修改功能层。同样,也可以使用同样的传输层,基于此实现不同的上层协议。可能RabbitMQ也是因为类似的原因,能够比较容易的支持MQTT、STOMP等协议的吧。

AMQ Model的设计是由以下需求驱动的:

  • 确保符合标准的实现之间的互操作性。
  • 提供清晰且直接的方式控制QoS
  • 保持一致和明确的命名
  • 通过协议能够修改服务端的各种配置
  • 使用可以轻松映射到应用程序级API的命令符号
  • 清晰,每个操作只能做一件事。

AMQP传输层是由以下需求驱动的

  • 紧凑。能够快速封包和解包
  • 可以携带任意大小的消息,没有明显的限制
  • 同一个连接可以承载多个通道(Channel)
  • 长时间存活,没有显著的限制
  • 允许异步命令流水线
  • 容易扩展。易于处理新需求、或者变更需求
  • 使用强大的断言模型,可修复
  • 对编程语言保持中立
  • 适合代码生成过程

在设计过程中,希望能够支持不同的消息架构:

  • 先存后发模型。有多个Writer,只有一个Reader
  • 分散工作负载。有多个Writer和多个Reader
  • 发布订阅模型,多个Writer和多个reader
  • 基于消息内容的路由,多个Writer,多个Reader
  • 队列文件传输,多个Writer,多个Reader
  • 两个节点之间点对点连接
  • 市场数据(Market data)分发。多个数据源,多个Reader

AMQ Model

主要包含了三个主要的组件:

  • exchange(交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中
  • message queue(消息队列):存储消息。直到消息被安全的投递给了消费者。
  • binding :定义了 message queueexchange 之间的关系,提供了消息路由的规则。

AMQ Model的整体架构

可以把AMQP的架构理解为一个邮件服务:

  • 一个AMQP消息类似于一封邮件信息
  • 消息队列类似于一个邮箱(Mailbox)
  • 消费者类似一个邮件客户端,能够拉取和删除邮件。
  • 交换器类似一个MTA(邮件服务器)。检查邮件,基于邮件里的路由信息、路由表,来决定如何把邮件发送到一个或多个邮箱里。
  • Routing Key类似于邮件中的To:Cc:Bcc: 的地址。不包含服务端信息。
  • 每一个交换器实例,类似于各个MTA进程。用于处理不同子域名的邮件,或者特定类型的邮件。
  • Binding 类似于MTA中的路由表。

在AMQP里,生产者直接把消息发到服务端,服务端再把这些消息路由到邮箱中。消费者直接从邮箱里取消息。但在AMQP之前的很多中间件中,发布者是把消息直接发到对应的邮箱里(类似于存储发布队列),或者直接发到邮件列表里(类似topic订阅)。

这里的主要区别在于,用户可以控制消息队列和交换器的绑定规则,而不是依赖中间件自身的代码。这样就可以做很多有趣的事情。比如定义一个这样的规则:把所有包含这样和这样Header的消息,都复制一份到这个消息队列中。“

而这一点也是我认为AMQP和其他一些消息队列最重要的差异。

消息的生命周期

  1. 消息由生产者产生。生产者把内容放到消息里,并设置一些属性以及消息的路由。然后生产者把消息发给服务端。
  2. 服务端收到消息,交换器(大部分情况)把消息路由到若干个该服务器上的消息队列中。如果这个消息找不到路由,则会丢弃或者退回给生产者(生产者可自行决定)。
  3. 一条消息可以存在于许多消息队列中。 服务器可以通过复制消息,引用计数等方式来实现。这不会影响互操作性。 但是,将一条消息路由到多个消息队列时,每个消息队列上的消息都是相同的。 没有可以区分各种副本的唯一标识符。
  4. 消息到达消息队列。消息队列会立即尝试通过AMQP将其传递给消费者。 如果做不到,消息队列将消息存储(按生产者的要求存储在内存中或磁盘上),并等待消费者准备就绪。 如果没有消费者,则消息队列可以通过AMQP将消息返回给生产者(同样,如果生产者要求这样做)。
  5. 当消息队列可以将消息传递给消费者时,它将消息从其内部缓冲区中删除。以立即删除,也可以在使用者确认其已成功处理消息之后删除(ack)。 由消费者选择“确认”消息的方式和时间。消费者也可以拒绝消息(否定确认)。
  6. 生产者发消息与消费者确认,被分组成一个事务。当一个应用同时扮演多个角色时:发消息,发ack,commit或者回滚事务。消息从服务端投递给消费者这个过程不是事务的。消费者对消息进行确认就够了。

在这个过程中,生产者只能把所有消息发到一个单点(交换器),而不能直接把消息发到某个消息队列(message-queue)中。

交换器(exchange)的生命周期

每个AMQP服务端都会自己创建一些交换器,这些不能被销毁。AMQP程序也可以创建其自己的交换器。AMQP并不使用 create 这个方法,而是使用 declare 方法来表示:如果不存在,则创建,存在了则继续。程序可以创建交换器用于私有使用,并在任务完成后销毁它们。虽然AMQP提供了销毁交换器的方法,但一般来讲程序不需要销户它。

队列(queue)的生命周期

队列分为两种,

  • 持久化消息队列:由很多消费者共享。当消费者都退出后,队列依然存在,并会继续收集消息。
  • 临时消息队列:临时消息队列对于消费者是私有和绑定的。当消费者断开连接,则消息队列被删除。
临时消息队列的生命周期

绑定(Bindings)

绑定是交换器和消息队列之间的关系,告诉交换器如何路有消息。

// 绑定命令的伪代码
Queue.Bind <queue> TO <exchange> WHERE <condition>

几个经典的使用案例:共享队列、私有的回复队列、发布-订阅。

构造一个共享队列

Queue.Declare queue=app.svc01 // 声明一个叫做 app.svc01 的队列

// Comsumer
Basic.Consume queue=app.svc01 // 消费者消费该队列

// Producer
Basic.Publish routing-key=app.svc01 // 生产者发布消息。routingKey为队列名称
https://www.rabbitmq.com/tutorials/tutorial-two-python.html

构造一个私有回复队列

一般来讲,回复队列是私有的、临时的、由服务端命名、只有一个消费者。(没有直接使用AMQP协议中的例子,而是使用了RabbitMQ的例子)

Queue.Declare queue=rpc_queue // 调用的队列

// Server
Basic.Consume queue=rpc_queue

// Client
Queue.Declare queue=<empty> exclusive=TRUE
S:Queue.Declare-Ok queue=amq.gen-X... // AMQP服务端告诉队列名称
Basic.Publish queue=rpc_queue reply_to=amq_gen-X... // 客户端向服务端发送请求

// Server
handleMessage()
// 服务端处理好消息后,向消息列的reply-to字段中的队列发送响应
Basic.Publish exchange=<empty> routing-key={message.replay_to}
https://www.rabbitmq.com/tutorials/tutorial-six-python.html

构造一个发布-订阅队列

在传统的中间件中,术语 subscription 含糊不清。至少包含两个概念:匹配消息的条件集,和一个临时队列用于存放匹配的消息。AMQP把这两部分拆成:bindingmessage queus。在AMQP中,并没有一个实体叫做 subscription

AMQP的发布订阅模型为:

  • 给一个消费者保留消息(一些场景下是多个消费者)
  • 从多个源收集消息,比如匹配Topic、消息的字段、或者内容等方式

订阅队列与命名队列或回复队列之间的关键区别在于,订阅队列名称与路由目的无关,并且路由是根据抽象的匹配条件完成的,而不是路由键字段的一对一匹配。

// Consumer
Queue.Declare queue=<empty> exclusive=TRUE
// 这里是使用服务端下发的队列名称,并设置为独占。
// 也可以使用约定的队列名称。这样就相当于把发布-订阅模型与共享队列组合使用了
S:Queue.Declare-Ok queue=tmp.2
Queue.Bind queue=tmp.2 TO exchange=amq.topic WHERE routing-key=*.orange.*
Basic.Consume queue=tmp.2

// Producer
Basic.Publish exchange=amq.topic routing-key=quick.orange.rabbit
https://www.rabbitmq.com/tutorials/tutorial-five-python.html

AMQP命令架构

中间件复杂度很高,所以设计协议时的挑战是要驯服其复杂性。AMQP采用方法是基于类来建立传统API模型。类中包含方法,并定义了方法明确应该做什么。

AMQP中有两种不同的方式进行对话:

  • 同步请求-响应。一个节点发送请求,另一个阶段发送响应。适用于性能不重要的方法。发送同步请求时,该节点直到收到回复后,才能发送下一个请求
  • 异步通知。一个节点发送数据,但是不期待回复。一般用于性能很重要的地方。异步请求会尽可能快的发送消息,不等待确认。只在需要的时候在更上层(比如消费者层)实现限流等功能。AMQP中可以没有确认,要么成功,要么就会收到关闭Channel或者连接的异常。如果需要明确的追踪成功或者失败,那么应该使用事务。

AMQP中的类

Connection类

AMQP是一个长连接协议。Connection被设计为长期使用的,可以携带多个Channel。Connection的生命周期是:

  1. 客户端打开到服务端的TCP/IP连接,发送协议头。这是客户端发送的数据里,唯一不能被解析为方法的数据。
  2. 服务端返回其协议版本、属性(比如支持的安全机制列表)。 the Start method
  3. 客户端选择安全机制 Start-Ok
  4. 服务端开始认证过程, 它使用SASL的质询-响应模型(challenge-response model)。它向客户端发送一个质询 Secure
  5. 客户端向服务端发送一个认证响应Secure-Ok。比如,如果使用 plain 认证机制,则响应会包含登录名和密码
  6. 客户端重复质询Secure或转到协商步骤,发送一系列参数,如最大帧大小 Tune
  7. 客户端接受,或者调低这些参数 Tune-Ok
  8. 客户端正式打开连接,并选择一个Vhost Open
  9. 服务端确认VHost有效 Open-Ok
  10. 客户端可以按照预期使用连接
  11. 当一个节点打算结束连接 Close
  12. 另一个节点需要结束握手 Close-Ok
  13. 服务端和客户端关闭Socket连接。

如果在发送或者收到 Open 或者 Open-Ok 之前,某一个节点发现了一个错误,则必须直接关闭Socket,且不发送任何数据。

Channel类

AMQP是一个多通道协议。Channel提供了一种方式,在比较重的TCP/IP连接上建立多个轻量级的连接。这会让协议对防火墙更加友好,因为端口使用是可预知的。它也意味着很容易支持流量调整和其他QoS特性。

Channels相互是独立的,可以同步执行不同的功能。可用带宽会在当前活动之间共享。

这里期望也鼓励多线程客户端程序应该使用 每个线程一个channel 的模型。不过,一个客户端在一个或多个AMQP服务端上打开多个连接也是可以的。

Channel的生命周期为:

  1. 客户端打开一个新通道 Open
  2. 服务端确认新通道准备就绪 Open-Ok
  3. 客户端和服务端按预期来使用通道.
  4. 一个节点关闭了通道 Close
  5. 另一个节点对通道关闭进行握手 Close-Ok

Exchange类

Exchange类能够让应用操作服务端的交换器。这个类能够让程序自己设置路由,而不是通过某些配置。不过大部分程序并不需要这个级别的复杂度,过去的中间件也不只支持这个语义。

Exchange的生命周期为:

  1. 客户端让服务端确保该exchange存在Declare。客户端可以细化为:“如果交换器不存在则进行创建” 或 “如果交换器不存在,警告我,不需要创建”
  2. 客户端向Exchange发消息
  3. 客户端也可以选择删掉Exchange Delete

Queue类

该类用于让程序管理服务端上的消息队列。几乎所有的消费者应用都是基本步骤,至少要验证使用的消息队列是否存在。

一个持久化消息队列的生命周期非常简单

  1. 客户端断言这个消息队列存在 Declare(设置 passive 参数)
  2. 服务端确认消息队列存在 Declare-Ok
  3. 客户端消息队列中读消息

一个临时消息队列的生命周期会更有趣些:

  1. 客户端创建消息队列 Declare(不提供队列名称,服务器会分配一个名称)。服务端确认 Declare-Ok
  2. 客户端在消息队列上启动一个消费者
  3. 客户端取消消费,可以是显示取消,也可以是通过关闭通道或者连接连接隐式取消的
  4. 当最后一个消费者从消息队列中消失的时候,在过了礼貌性超时后,服务端会删除消息队列

AMQP实现了Topic订阅的分发模型。这可以让订阅在合作的订阅者间进行负载均衡。涉及到额外的绑定阶段的生命周期:

  1. 客户端创建一个队列Declare,服务端确认Declare-Ok
  2. 客户端绑定消息队列到一个topic exchange上Bind,服务端确认Bind-Ok
  3. 客户端像之前一样使用消息队列。

Basic类

Basic实现本规范中描述的消息功能。支持如下语义:

  • 从客户端→服务端发消息。异步Publish
  • 开始或者停止消费ConsumeCancel
  • 从服务端到客户端发消息。异步DeliverReturn
  • 确认消息AckReject
  • 同步的从消息队列中读取消息Get

事务类:

AMQP支持两种类型的事务:

  1. 自动事务。每个发布的消息和应答都处理为独立事务.
  2. 服务端本地事务:服务器会缓存发布的消息和应答,并会根据需要由client来提交它们.

Transaction 类(“tx”) 使应用程序可访问第二种类型,即服务器事务。这个类的语义是:

  1. 应用程序要求服务端事务,在需要的每个channel里Select
  2. 应用程序做一些工作PublishAck
  3. 应用程序提交或回滚工作CommitRoll-back
  4. 应用程序正常工作,循环往复。

事务包含发布消息和ack,不包含分发。所以,回滚并不能重入队列或者重新分发任何消息。客户端有权在事务中确认这些消息。

AMQP的功能描述,一定程度上也是RabbitMQ的功能描述,不过RabbitMQ基于AMQP做了一些扩展

消息和内容

消息会携带一些属性,以及具体内容(二进制数据)

消息是可被持久化的。持久化消息是可以安全的存在硬盘上的,即使发生了验证的网络错误、服务端崩溃溢出等情况,也可以确保被投递。

消息可以有优先级。同一个队列中,高优先级的消息会比低优先级的消息先被发送。当消息需要被丢弃时(比如服务端内存不足等),将会优先丢弃低优先级消息

服务端一定不能修改消息的内容。但服务端可能会在消息头上添加一些属性,但一定不会移除或者修改已经存在的属性。

虚拟主机(VHost)

虚拟主机是服务端的一个数据分区。在多租户使用是,可以方便进行管理。

虚拟主机有自己的命名空间、交换器、消息队列等等。所有连接,只可能和一个虚拟主机建立。

交换器(Exchange)

交换器是一个虚拟主机内的消息路由Agent。用于处理消息的路由信息(一般是Routing-Key),然后将其发送到消息队列或者内部服务中。交换器可能是持久化的、临时的、自动删除的。交换器把消息路由到消息队列时可以是并行的。这会创建一个消息的多个实例。

Direct 交换器

  1. 一个消息队列使用RoutingKey K 绑定到交换器
  2. 生产者向交换器发送RoutingKey为R的消息
  3. K=R时,消息被转发到该消息队列中

Fanout 交换器

  1. 一个消息队列没有使用任何参数绑定交换器
  2. 生产者向交换器发了一条消息
  3. 这个消息无条件的发送到该消息队列

Topic 交换器

  1. 消息队列使用路由规则 P 绑定到交换器
  2. 生产者使用RoutingKey R 发送消息到交换器
  3. 如果R 能够匹配 P,则把消息发到该消息队列。

RoutingKey必须由若干个被点.分隔的单词组成。每个单词只能包含字母和数字。其中 * 匹配一个单词,# 匹配0个或者多个单词。比如 *.stock.# 匹配 usd.stock eur.stock.db 但是不匹配 stock.nasdaq

Headers 交换器

  1. 消息队列使用Header的参数表来绑定。不适用RoutingKey
  2. 生产者向交换器发送消息,Header中包含了指定的键值对
  3. 如果匹配,则传给消息队列。
format=json,type=log,x-match=all
format=line,type=log,x-match=any

如果 x-matchall,则必须都匹配才行。如果x-matchany,则有任意一个header匹配即可。

系统交换器

这个平时应该用不到,这里略过。感兴趣的可以直接查看AMQP0.9.1的3.1.3.5章节。

AMQP的传输架构

解释了命令如何映射到传输层的。在设计自有协议时,可以参考一下它的设计思路,以及中间需要注意的问题。

AMQP是一个二进制协议。有不同类型的帧frame 构成。帧会携带协议的方法以及其他信息。所有的帧都有相同的基本结构,即:帧头,payload,帧尾。payload格式取决于帧的类型。

我们假设使用的是面向流的可靠网络层(比如TCP/IP)。单个Socket连接上可以有多个独立的控制线程,也就是通道Channel。不同的通道共享一个连接,每个通道上的帧都是按严格的顺序排列,这样可以用一个状态机来解析协议。

传输层(wire-level)的格式被设计为扩展性强、且足够通用,可以用于任何更高层的协议(不仅仅是AMQP)。我们假设AMQP是会被扩展、优化的。

主要涉及这几个部分:数据类型、协议协商、分帧方式、帧细节、方法帧、内容帧、心跳帧、错误处理、通道与连接的关闭。

AMQP的数据类型用于方法帧中,他们有

  • 整数(1-8个字节),表示大小,数量,范围等。全都是无符号整数
  • Bits。用于表示为开/关值,会被封包为字节。
  • 短字符串。用于存放短的文本属性。最多255字节,解析时不用担心缓冲区溢出。
  • 长字符串:用于存放二进制数据块
  • 字段表(Field Table),用于存放键值对

客户端连接时,和服务端协商可接受的配置。当两个节点达成一致后,连接才能继续使用。通过协商,可以让我们断言假设和前提条件。主要协商这几方面的信息

  • 实现的协议和版本。服务端可能会在同一端口提供多种协议的支持
  • 加密参数和验证
  • 最大帧尺寸、Channel的数量、某些操作的限制。

如果协商达成一致,双方会根据协商预分配缓冲区避免死锁。传入的帧如果满足协商条件,则认为其实安全的。如果超过了,那么另一方必须断开连接。

TCP/IP是流协议。没有内置的分帧机制。现有的协议一般有这几种方式进行分帧:

  • 每个连接只发送一个帧。简单,但是慢。
  • 在流中加入分隔符来分帧。简单,但是解析较慢(因为需要不断的读取,去寻找分隔符)
  • 计算帧的尺寸,并在每个帧之前发送尺寸。简单且快速。也是AMQP的选择

帧头包括:帧类型、通道、尺寸。帧尾包含错误检测信息。

处理一个帧的步骤:

  1. 读帧头,检查帧类型和Channel
  2. 根据帧类型,读取payload并处理
  3. 读帧尾校验

在实现时,性能很重要的时候,我们会使用 read-ahead buffering 或者 gathering reads 去避免读帧时进行三次系统调用。

方法帧

处理方式:

  1. 读取方法帧的payload
  2. 解包为结构
  3. 检查方法在当前上下文中是否允许
  4. 检查参数是否有效
  5. 执行方法。

方法帧是由AMQP数据字段组成。编码代码可以直接从协议规范中生成,速度非常快。

内容帧

内容是端到端直接发送的应用数据。内容由一系列属性和二进制数据组成。其中一系列的属性组成了 ”内容帧的帧头“。而二进制数据,可以是任意大小,它可能被拆分成多个块发送,每个块是一个 content-body帧

一些方法(比如 Basic.PublishBasic.Deliver)是会携带内容的。一个内容帧的帧头如下结构:

这里把 content-body 作为单独的帧,这样就可以支持Zero-copy技术,这部分内容就不需要被编码。把内容属性放到自己的帧里,这样收件人就可以选择性的丢弃不想处理的内容。

通道与连接的关闭

对于客户端,只要发送了 Open 就认为连接和通道是打开的。对于服务端则是Open-Ok。如果一个节点想要关闭通道和连接必须要进行握手。

如果突然或者意外关闭,没办法立刻被检测到,可能会导致丢失返回值。所以需要在关闭之前进行握手。在一个节点发送 Close 后,另一个节点必须发送 Close-Ok 来回复。然后双方可以关闭通道或者连接。如果节点忽略了 Close 操作,当双方同时发送 Close 时,可能会导致死锁。

最后,更多细节可以查看AMQP的官方规范,以及RabbitMQ在其基础上扩充的其它特性。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK