RabbitMQ 的通讯协议

发送消息流程:

< AMQP
> 10,10: Connection.start
< 10,11: Connection.start_ok
> 10,30: Connection.tune
< 10,31: Connection.tune_ok
< 10,40: Connection.open
> 10,41: Connection.open_ok
< 20,10: Channel.open
> 20,11: Channel.open_ok
< 85,10: Confirm.select
> 85,11: Confirm.select_ok
< 60,40: Basic.publish
< Message
> 60,80: Basic.ack
< 20,40: Channel.close
> 20,41: Channel.close_ok
< 10,50: Connection.close
> 10,51: Connection.close_ok

接收消息流程:

< AMQP
> 10,10: Connection.start
< 10,11: Connection.start_ok
> 10,30: Connection.tune
< 10,31: Connection.tune_ok
< 10,40: Connection.open
> 10,41: Connection.open_ok
< 20,10: Channel.open
> 20,11: Channel.open_ok
< 60,10: Basic.qos
> 60,11: Basic.qos_ok
< 60,20: Basic.consume
> 60,21: Basic.consume_ok
> 60,60: Basic.deliver
> Message
< 60,80: Basic.ack

RabbitMQ 中的保证组播的实现方式

RabbitMQ 的保证组播 Guaranteed Multicast 实现在 gm.erl 文件中。
保证组播是指:进程组中的进程可以动态添加和删除,发送到进程组的消息在消息的生命周期中,保证到达进程组中的每个进程。消息的生命周期从消息发送时算起,直到消息发送者了解到消息已经抵达了进程组中的所有进程为止。
这种保证指:

  1. 如果进程组中包含进程,消息就会发送到其中的每个进程;
  2. 如果 P 先后发送了 m 和 m’,对于组中所有先于 m 消息发送时加入进程组的进程 P’,如果它收到了消息 m’,则它必须要收到 m;
  3. 消息顺序保证,如果 P 先后发送了 m 和 m’,所有的成员要先后收到 m 和 m’;但是因果顺序并不强制要求,比如 P 收到 m 然后发送 m’,则其他成员不保证先收到 m,后收到 m’。

保证组播最简单的实现方式是由发送者发给组中每个成员,这需要组中所有成员都相互连接。存在的问题是假如发送者在消息发送中失效,谁负责消息发送成功的确认。并且由发送者发送消息给每个成员,对于发送者 CPU 和网络压力也会很大。
RabbitMQ 并不是这样实现的,它将所有成员组成一个链表,顺着链表发送。这样不需要所有成员之间建立连接,假如发送者失败,后继者可以接替它的地位,并且发送消息对于每个成员的开销接近一致,不会发生某个成员压力过大的情况。
消息的异步发送可以提高整个发送过程的性能,在链表 A -> B -> C -> D 中,如果 A 发送消息,它不需要和 C、D 建立连接,当 D 确认收到消息之后 A 才能确认消息已经发送到每个成员。

Backing Queue 的实现

Backing queue 即消息存储的具体形式和引擎。 RabbitMQ 的 backing queue 默认为 rabbit_variable_queue。为了提高内存和磁盘的利用率 rabbit_variable_queue 根据具体的情形将消息存储在内存或者磁盘中。rabbit_variable_queue 中消息的 4 种状态:

  • alpha 消息的内容和位置都在内存;
  • beta 消息内容在磁盘,位置在内存;
  • gamma 消息内容在磁盘,位置在内存和磁盘;
  • delta 消息内容和位置都在磁盘,表现为一组消息。

对于持久化消息 (delivery_mode = 2,并且 exchange、queue 都是持久化),消息和位置都存在磁盘,并且状态属于以上 4 状态之一。
大部分情况下,消息移动路线为 q1 -> q2 -> delta -> q3 -> q4, 4个状态大部分都被跳过。q1, q4 只包含 alpha 状态消息;q2,q3 包含 beta 和 gamma 状态。当新消息到来的时候,会确定归属于哪个状态,可能会跳过 q1 或者 其他 q。当消费者读取消息的时候,首先读取 q4 ,q4 为空的话则读取 q3 ,q3 为空的话 delta 中的下一组消息被读入 q3,并减小 delta 的大小。对于持久化的消息发送到持久化的队列,消息会立刻写入 msg_store 和 queue_index 。

RabbitMQ 中的消息索引 rabbit_queue_index

rabbit_queue_index 记录消息在磁盘文件系统的位置,启动的时候 RabbitMQ 会加载 index 文件, crush 后会进行恢复。
index 分为很多有序 index segment,index segment 从 0 开始递增,每个 index segment 文件 默认包含 16834 个 publish、develiver、ack。
所以 index segment 0 包含id为 0 – 16834-1 的消息, index segment 1 包含id为 16834 – 16834*2-1 的消息。message seq id % 16834 = 消息在 segment 中的位置。当消费速度很快,消息可能会不写入磁盘。当 segment 文件中的 publish num = ack number 时,就会删除这个 segment 。

Journal 文件

由于消息可能在生产时就被立即消费,为了避免多余的重复 IO 操作,还有一个固定长度的 journal 文件顺序记录每个动作。当 journal 文件满的时候,其中的操作再附加到对应的 segment 文件上,并把 journal 文件清空。journal 文件中的 seq id 是绝对 id,segment 中的 id 为相对 id。journal 文件也完全保存在内存中,其中还保存了 segment id 和 对应文件状态的映射。所有操作都附加到文件状态上,当 journal 文件 flush 到 segment 文件中时,发现 publish 数量和 ack 数量一致就不需要写操作了。当需要 sync 一致性的时候,可以 sync journal 文件。
消息在 Journal 文件中状态是这样的: {(‘no_pub’|{MsgId, MsgProps, IsPersistent}),(‘del’|’no_del’), (‘ack’|’no_ack’)}
Journal 文件名为 journal.jif
qistate 记录 dirty count (日志中累积的数量)和 segment 缓存信息和文件目录,每个动作发生 dirty count+1, segment 缓存信息则包含了 journal 中的消息信息 JEntries 和 Unack 的数量。
当新动作 append 到日志中时,JEntries 以 Array 的形式保存在内存中,RelSeq: {…},同时记录 Unack 的数量,新消息+1,ack-1 。
新消息publish:
假如不存在则保存 {MsgId, MsgProps, IsPersistent}
RelSeq: {{MsgId, MsgProps, IsPersistent}, no_del, no_ack}
消息删除:
原来为 {{MsgId, MsgProps, IsPersistent}, no_del, no_ack} 并且 action 为 del
RelSeq: {{MsgId, MsgProps, IsPersistent}, del, no_ack}
消息 ack:
原来为 {no_pub, del, no_ack} 并且 action 为 ack
RelSeq: {no_pub, no_del, ack}
消息 ack:
原来为 {{MsgId, MsgProps, IsPersistent}, del, no_ack} 并且 action 为 ack
则真正删除。
当 dirty count 大于日志最大限度的时候就开始将数据 flush 到 segment 中。如果 unack = 0,说明完全被消费,删除对应的 segment 文件,否则 append 到对应 segment 文件上,过程中需要跳过被 del 和 ack 以及 no_pub 的消息。

Segment 文件及其他目录文件

Segment 文件后缀 .idx。结构为 <<REL_SQL_ONLY_PREFIX, IsPersistent, RelSeq, Body, [DEL], [ACK]>> 其中 DEL 和 ACK 为 <<REL_SQL_ONLY_PREFIX, RelSeq>>。
RabbitMQ 会为在 queues 目录下每个 queue 创建一个 MD5 映射的文件夹,当正常停机的时候会在文件夹下创建 clean.dot 文件,并保存内存中的数据;非正常停机重启时,会从 journal 中恢复数据。journal 文件在 queue 空闲的时候也会 flush 到 segment 中。当加载 segment 的时候会采用 read ahead 方式读文件。

文件句柄缓存 file_handle_cache

这是 RabbitMQ 对 Erlang file API 的一个封装,无论是 index 还是消息体的存储都使用这些 API。
特点:

  • 每个文件支持一写多读
  • 写都是 append
  • 有 write buffer,没有 read buffer
  • 支持手动 sync
  • 都是调用 prim_file:* 函数

消息存储 rabbit_msg_store

Index,存储 MsgId 到消息位置的映射 {MsgId, RefCount, File, Offset, TotalSize}
FileSummary,ets 表存储文件到摘要的映射 {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
所有队列的消息存储在 1 个文件中,文件也是 append 模式,增大后分割新文件。当某些消息被消费的时候,文件就会产生空洞,ValidTotalSize 记录了真实空间占用,这个值用来计算是否执行文件 GC。文件 GC 通过将文件和其他文件合并实现,提高存储利用率和执行效率。非正常停机情况下,通过扫描 GC 过程影响的文件,重构 index 和 filesummary。默认情况下,一个文件中的空洞超过 50% 就会执行 GC。GC 过程为将右边的文件向左合并:左边的文件会重新写到临时文件构成没有空洞的文件,然后写回。右边文件中的有效数据 append 到左边文件中。然后更新 index 和 filesummary 。消息体是引用计数的,同一个 msgid 的消息只存储一次,并记录写的次数,当删除同样次数的时候才真正将消息删除。引用数并不存储在消息体中。读消息的时候只有引用数大于1的消息才会会读到 cache 中,减少了内存占用,提高了性能。删除消息的时候,即使引用计数等于0也不把消息真正删除,为了防止消息存储到多队列,一个队列写入删除完成而另一个队列写入还未开始。当文件 GC 的时候会锁定文件。将操作排队处理。文件有 write back cache,一个 client 写入的内容可以立刻被其他 client 读取,这消除了 sync 时对读取的阻塞。当写进程非常繁忙的时候 读写也不会有延迟。由于 msg_store 有一个缓冲区,有很多写和删除事件在排队。flying_ets 用来处理publish、remove到来在真正写入之前的情况,通过+1、-1计数抵消,这样可以避免一些写入。

rabbit_amqqueue

处理 queue 的声明和状态信息、高可用策略、权限验证、统计信息、以及提供 queue 操作接口。这些信息是存储在 mnesia #amqqueue 中。

rabbit_amqqueue_process

queue 操作处理进程。

去除 per-connection 流控

只需要修改:

%case {CS, (Throttle#throttle.conserve_resources orelse
%           credit_flow:blocked())} of

为:

case {CS, Throttle#throttle.conserve_resources} of

消费者负载均衡

rabbit_amqueue_process:deliver_msgs_to_consumers 中 queue:out 和 queue:in 实现了 RR 负载均衡。

根据消息 ID 从持久化文件中读取消息体

Ref = rabbit_guid:gen(),
MSCState = rabbit_msg_store:client_init(
msg_store_persistent, Ref, undefined, undefined),
MsgId = <<245,82,73,201,192,75,136,167,88,84,149,197,104,141,81,33>>,
MSCState1 = rabbit_msg_store:read(MsgId, MSCState).

推荐这些相关文章

订阅这个博客:

关注我的微博:

关注我的推特: