Message Broker
Last updated
Last updated
消息队列相关,如何保证order
RabbitMQ
RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息,生产者发送消息的时候,同一个订单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。
Message Broker(消息中介)因为其用途丰富,在各大公司的服务中都起到不可替代的地位。Message Broker 有时也被称为 Message Queue,在本文中对两个名称不作区分。
Message Broker 使得不同应用之间可以按照顺序不丢失地传递消息,并且在传递过程中提供暂时的存储。
值得注意的是,这里的 Message 指的是广义的消息,可以用来发布一个指令(比如启动某一个任务),回传一个已完成任务的结果,或者是单纯的文本消息。
想象一下我们有两个相关的服务,其中B接收A的指令做进一步处理。实现上的选择如下:
方案一 A 直接发一个 API Call 给 B
方案二 A 发指令给消息队列,通过中转给 B
如果我们使用方案二,消息队列起到了Async (异步)的作用,B不需要实时完成指令,当然 A 也无法立刻根据 B 的回复做进一步的处理。在实际操作中,Aysnc 给我们带来一个附带的优势,即 Traffic Smoothing (削峰),使得流量突然增长情况下,使服务保持可用性。
在这个基础之上,我们的用途有如下变化:
有多个服务 B,C,D 接收 A 服务的同样的指令
指令需要按照顺序执行
如果我们使用方案一,那么我们需要在 A-B, A-C, A-D 之间进行类似的 API Call,并且 B, C, D 的应用层面需要保证能够按照 API Call 的顺序去进行处理(这在分布式环境下是不容易保证的)。
如果我们使用方案二,消息中介会为我们做这个中转,保证顺序,简化 A, B, C, D 服务应用层的逻辑,使得它们不需要依赖彼此的接口,不需要担心某一个接口变化产生的麻烦。
用术语来说,相对于方案一,方案二遵循了 Decoupling (解耦) 的原则,使得服务之间可以尽可能地互相独立,方便部署升级。
总结起来,引入这层抽象的作用主要两点:异步和解耦。
RabbitMQ 提供一个 Message Broker 的通用方案,在工业界被广泛使用。RabbitMQ 相对后来者 Kafka,功能更加单一,是一款典型的“只做好一件事”的软件。
其核心抽象是一个队列 (Queue) ,用户通过 RabbiMQ 定义一组队列用于将内容在不同应用之间传递。
Message Broker 由 Exchange 和队列 (Queue) 组成。信息传递的流程如下:
发信方(Producer) 将信息传递给Exchange
Exchange 根据 Binding 逻辑将信息传递到队列 (Queue) 里
队列根据与收信方 (Consumer) 的订阅协议 (Subscribe) 把信息 推送 (Push) 给收信方
Message Broker 内部为什么需要引入 Exchange 这层抽象呢?
Exchange 抽象出了相对复杂的消息分发逻辑,使得 Queue只需要存储及发送信息。
Exchange 分成四种:
直接绑定 (Direct) - routing key 完美匹配 binding key
话题 (Topic) - routing key wildcard 匹配 binding key
Fanout - 发到所有有 binding 的 queue
Header - 根据消息 Header 里 binding key 匹配
我们看一看使用之前提到的 Exchange 可以支持什么样的消息模式 (Messaging Pattern)。
采用两个 Exchange (Request/Response Exchange),两个 Queue (Request/Response Queue)。请求方发布 Request 消息到 Request Exchange,再到 Request Queue。回复方读取 Request Queue,处理并发布 Response 消息到 Response Exchange, 再到 Response Queue,由请求方读取。
对应上一节的直接绑定。在一对一完美匹配 binding key 是传递消息。
对应上一个的 Fanout Exchange,把消息分发到所有的订阅 Queue 中。
有两种方式。
一是使用 Topic Exchange Wildcard 匹配,对应 Topic Exchange。二是使用 Header Exchange 根据消息 Header 匹配,对应 Header Exchange。
一个程序通常需要建立跟 Broker 之间建立多条逻辑上的通信频道 (Channel),比如该程序可以即是发信方也是收信方。
在这种情况下,最直接的方法是对于每一个逻辑上的通信频道都建立一条 TCP Connection。这样做法的缺点显而易见:
操作系统会限制每个程序能够创建的 TCP Connection 数量
建立,销毁,维持 TCP Connection 会消耗系统资源
于是 RabbitMQ 采用的协议之一 AMQP 0-9-1 就提供了一层 AMQP 0-9-1 connection 的抽象,使得多个通信频道可以共用一条 TCP Connection。协议内部会给每一个通信频道一个 ID,确保频道之间互不干扰。
6.2.1 RabbitMQ 如何保证信息的传递?
RabbitMQ 提供“接收确认”(Acknowledgement)的机制。这个机制既可以用在发信方,也可以用于收信方,发信方会收到 Broker 发送的接收确认,而 Broker 会收到收信方的接收确认。这样 Broker 可以很明确地知道什么时候消息传递已经完成。在网络出现问题或者其他失败出现时,一旦接收确认的回应没有被收到,RabbitMQ可以重新发送消息,实现至少一次的信息传递。
如果用户选择不使用这个机制,RabbitMQ 会保证至多一次的信息传递。
6.2.2 Broker 如何在解决单点故障问题?
一个 RabbitMQ 集群可以包括多台 RabbitMQ 机器。Exchange, binding 以及用户的信息会被复制到每一台机器上。而队列 (Queue) 本身默认只存在在单机上。此情况下,为了实现在该机器重启时信息不丢失,RabbitMQ 可以设置队列为 Durable 以及消息为 Persistent,这样队列的元信息和其中每一条消息都会被写到硬盘上,在重启时自动恢复。
另一个选择时复制队列到多台机器,称为队列镜像 (Mirrored Queue)。每个镜像会包含所有消息,如果主镜像宕机,剩下的镜像会重现选出一个主镜像。
Lovisa Johansson. (2019). Part 1: RabbitMQ for beginners - What is RabbitMQ?
RabbitMQ documentation. Reliability Guide
RabbitMQ documentation. Channels
图片取自 Lovisa Johansson. (2019). Part 1: RabbitMQ for beginners - What is RabbitMQ?
图片取自 Lovisa Johansson. (2019). Part 1: RabbitMQ for beginners - What is RabbitMQ?