发布于

消息队列的推拉模型

作者

消息队列的基本模型主要囊括三个角色:负责生产消息的 Producer、负责消息中转的 Broker 以及对消息进行消费的 Consumer,三者之间的交互模式如下图所示。

消息队列的基本交互模型

一般而言,消息是从 Producer 端主动推送到 Broker 的,而根据 Broker 到 Consumer 这条消息链路所采用的工作机制,可以分为推模型(Push)和拉模型(Pull Model)。推模型是指 Producer 所生产的消息由 Broker 主动推送给相应的 Consumer,而在拉模型中则是由 Consumer 主动向 Broker 拉取数据。

推拉模型

考虑到 Producer 生产消息后一般直接推送到 Broker,因此讨论一个消息队列采用的是推模型还是拉模型主要看 Broker 与 Consumer 之间的交互方式,如果是 Broker 主动推送则是推模型,反之则是拉模型。

拉模型

目前主流的消息队列实现中采用拉模型的代表非 Kafka 莫属,Kafka 官方对于采用拉模型的设计给出了如下理由

  • 拉模型可以由 Consumer 根据自身情况来控制消费速率,避免了推模型中 Consumer 端可能产生的消息堆积问题;
  • 在拉模型中,Consumer 可以一次批量拉取多个消息,在减少网络带宽的同时提高系统吞吐量;
  • Consumer 可以任意重放历史消息,在推模型中很难记录和控制消息的历史消费进度,而在模型中,客户端可以通过 consumer offset,commit log 等机制控制消费进度。

此外,从实际代码实现的角度上来看,拉模型的另外一个显著优势是 Consumer 可以按照自身需要设置缓冲区大小,然后成批次的拉取数据,而不是像推模型那样,客户端在接收消息时每次都要在本地新申请一个对应的消息缓冲区。

当然拉模型并不是只有优点,由于 Consumer 和 Broker 之间建立的是 TCP 长连接,因此 Consumer 在拉取数据时自然要以一种轮询(Polling)的方式来获取数据,比较容易造成 Consumer 端相关轮询线程空转。对于这个问题,主流的消息队列一般会采用长轮询(Long Polling)结合超时重试的方式实现其 Consumer API,从而避免轮询线程空转的问题。

推模型

对于 EMQXRabbitMQ 这类采用标准协议(MQTT、AMQP)实现的推模型消息队列而言,Broker 在收到 Producer 生产的消息后会立刻根据相应的规则依次转发给所有订阅该消息的 Consumer,因此消息推送的实时性比拉模型要强的多,这也是推模型相较拉模型的主要优势。

下面我们主要讨论采用推模型的消息队列所面临的的两个主要问题:Consumer 消息堆积和消息重放。对于消息堆积问题,当前主流推模型消息队列在协议和代码实现层面都有做相应的优化,比如 Broker 与 Consumer 在建立连接时会协商一个消费速率区间来避免消息堆积。

消息重放问题在本质上其实就是历史消息该如何持久化的问题:在 Consumer 离线的时间(一分钟,一天、一个月等)里,应当考虑是否应该为 Consumer 保留(持久化)离线时间段内应该推送给它的所有新消息?对于这个问题,一般会采用下面给出的几种解决方案:

  • 直接丢弃(Discard):最简单粗暴的做法,即不对历史消息做持久化,这也就意味着当 Consumer 重新上线时,它将无法接收到 Producer 在离线期间发布的消息。RabbitMQ 默认情况下采用这种处理方式,如果 Consumer 想要收到离线期间新产生的消息,可以通过设置队列的 durable 选项来解决;
  • 只保留最新消息(Retain):对于 EMQX 这类采用 MQTT 协议的消息队列,Producer 在发布消息时可以通过设置 publish message 的 retain 标志来发送保留消息(Retain Message),MQTT Broker 会为每个 Topic 存储最新一条保留消息,离线的 Consumer 在重连订阅该 Topic 后会收到对应的保留消息;
  • 持久化(Persistent):Broker 负责将 Consumer 离线期间产生的消息存储到本地,以便 Consumer 重连后进行消费。对于推模型消息队列而言,当 Consumer 数量较大时,为每个离线的 Consumer 维护历史消息的成本较高,一般会为持久化后的消息设置相应的超时时间(一天、一个月等)并定期删除过时消息。此外,还可以根据消息的服务等级(Quality of Service,QoS)来选择性的持久化重要的消息。比如 MQTT 协议中根据 Consumer 对于消息的预期接收情况规定了三个 QoS 等级,分别是至多收到一次(At most once,QoS 0)、至少收到一次(At least once,QoS 1)和只收到一次(Exactly once,QoS 2),我们可以只保留 QoS >= 1 的消息来节省磁盘空间,提高 Broker 消息推送效率。