Rabbitmq总结

消息中间件的作用

异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
日志处理 - 解决大量日志传输。
消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

Rabbitmq特点

可靠性:Rabbitmq使用一些机制来保证可靠性,如持久化、传输确认以及发布确认等

灵活的路由:消息进入队列之前,通过交换器来路由消息

扩展性:多个节点组成集群,根据业务情况动态扩展集群中的节点

高可用性: 队列可以设置镜像,使得部分节点出问题时队列仍然可用

多种协议:支持amqp、mqtt等多种消息中间件协议

管理界面:提供了易用的管理界面,方便监控和管理

插件机制:提供了许多插件,可以进行扩展

使用场景

架构模型

屏幕快照png
上图可以看到三部分:Producer(生产者)、Consumer(消费者)、Broker(消息中间件)。

图中的Broker部分就是消息中间件的服务节点,对于Rabbitmq来说可以看作是一个Rabbitmq服务实例。

核心概念

Queue: 队列,用于存储消息。Rabbitmq中消息都只能存储在队列中。多个消费者可以消费同一个队列,此时队列中的消息会被平均分摊(Round-Robin),而不是每个消费者都收到所有的消息

Exchange: 交换器,生产者奖消息发送到交换器,由交换器将消息路由到一个或者多个队列中

Routing Key: 路由键,生产者将消息发送给交换器的时候,会指定一个路由键,用来表示这个消息的路由规则,路由键需要与交换器类型以及绑定键联合使用而生效

Binding: 绑定,通过绑定将交换器与队列关联起来

Binding Key: RabbitMQ会将交换器与Queue绑定起来,在绑定的时候一般会指定一个BindingKey,这样RabbitMQ就可以正确地将消息路由到队列

Virtual Host: 在Rabbitmq的实例上实现逻辑的隔离

Connection: 一个TCP连接

Channel: 引入Channel可以复用TCP连接以节约资源,一个Connection可以创建多个Channel实例

交换器类型

常用的有direct、topic、fanout、headers四种。

fanout: 会把所有发送到该交换器的消息发送到所有与其绑定的队列中。

direct: 把消息发送到路由键和绑定键完全匹配的队列中。

topic: direct是路由键和绑定键的完全匹配,topic类型则是模糊匹配

headers: 根据发送消息内容中的headers属性来进行路由,性能较差,使用的不多。

运转流程

生产者流程:
生产者连接到RabbitMQ Broker并创建连接和开启一个Channel

生产者声明一个交换器并设置相关属性,比如类型、是否持久化等

生产者声明一个队列并设置相关属性,比如排他、是否持久化、是否自动删除

生产者通过路由键将交换器和队列绑定起来

生产者发送消息值RabbitMQ Broker,其中包含路由键、交换器等信息

相应的交换器根据接收到的路由键查找匹配的队列

如果找到则将消息存入匹配到的队列

如果没有找到则根据生产者配置的属性选择丢弃还是退回给生产者

关闭Channel和连接

消费者流程

消费者连接到RabbitMQ Broker并创建连接和开启一个Channel

消费者向RabbitMQ Broker请求消费相应队列中的消息,设置相关信息

等待RabbitMQ Broker回应并投递相应队列的消息,消费者接收消息

消费者确认接收到的消息(ACK)

RabbitMQ Broker从队列中删除已经确认的消息

关闭Channel和连接

消费消息

消费消息分为推模式和拉模式。建议使用推模式。

消费者在订阅队列或者拉取消息时,都可以指定autoAck参数,当其为true时RabbitMQ会自动把发送出去的消息置为已确认状态,然后从内存或磁盘中删除,而不管消费者是否消费这些消息;若autoAck为false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存或磁盘中移除消息。

当autoAck为false时,对于RabbitMQ服务端来说,队列中的消息有两种类型,一种是等待投递给消费者的消息,一种是已经投递给消费者但未收到消费者确认的消息。如果一直未收到消费者的确认信号,RabbitMQ会等到消费者断开连接才会安排消息重新进入待投递的队列,等待投递给下一个消费者。

Rabbitmq不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是Rabbitmq允许消费者花很久的时间消费一条消息。

python示例

todo.

发送消息不可达

发送消息时有个参数mandatory, 用于在消息传递过程中不可达目的地时将消息返回给生产者。Rabbitmq提供的备份交换器可以将未能被交换器路由的消息存储起来,而不用返回给客户端。

mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么Rabbitmq会将消息返回飞生产者;当mandatory为false时,则消息直接被丢弃。

生产者想要获取被Rabbitmq返回的消息需要添加返回监听器来实现。

备份交换器

生产者在发送消息的时候如果设置mandatory参数为false,那么消息在未被路由到queue的情况下将会丢失,如果设置了mandatory参数为true,那么需要添加返回监听器的逻辑,生产者的代码将变得复杂。

如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备用交换器,这样可以将未被路由到queue的消息存储在RabbitMQ中,在需要的时候去处理这些消息。

过期时间

设置消息的TTL有两种方式:在消息上设置TTL(方法1)和在队列上设置队列中每条消息的TTL(方法2)。对于方法1,即使消息过期也不会马上从队列中抹去,每条消息是否过期是在投递到消费者之前才会进行判定;而在方法2中,一旦消息过期就会马上从队列中抹去。

方法1队列中每条消息的过期时间不同,如果要删除所有过期消息势必要不停的扫描整个队列,这种严重影响性能的方式是不可取的,所有RabbitMQ采用一种懒过期的方式,即等到消息即将被消费是才去判定是否过期,如果过期则将该消息作为死信处理而不投递给消费者;而对于方法2,整个队列内消息的过期时间一样,先投递的一定比后投递的早一步过期,RabbitMQ只要定期扫描队列头部是否有过期的消息即可。

对于方法1和方法2并用来设置消息TTL的情况,则TTL以两者中最小的值为准。

设置整个队列的过期时间后,当队列在给定的时间内没有被使用就会被删除,没被使用的意思是该队列上没有消费者在消费,并且没有被重新声明。

死信队列

一个消息因为以下情况会变成死信:

消息被拒绝,并且requeue参数为false
消息过期
队列达到最大长度

DLX,即死信队列交换器。当消息变成死信后,若消息所在队列有指定一个DLX,则会将该死信投递到该DLX,与这个DLX绑定队列的则被成为死信队列。

DLX也是一个正常的交换器,和一般的交换器没有区别,能在任何的队列上被绑定,实际就是设置某个队列的属性。当这个队列中存在死信时,Rabbitmq会自动就这个消息重新发布到设置的DLX上去,进而被路由到一个队列,即死信队列,可以监听这个队列中的消息以进行相应的处理。

一个队列的死信队列交换器可以通过在channel.queueDeclare方法中设置拓展属性x-dead-letter-exchange来制定。

DLX可以处理异常情况下,消息不能够悲消费者正确消费(被Nack或Reject)而置入死信队列的情况,后续可以通过消费死信队列的内容来分析当时遇到的异常情况。

延迟队列

Rabbitmq本身没有直接支持延迟队列的功能,但可以通过DLX和TTL模拟出延迟队列的功能。设置消息的TTL为延迟的时间,当消息成为死信时则表示该消息可以消费了,所以我们的消费者只要监听队列绑定的死信队列即可。

需要注意的是,设置消息TTL的方式有两种:在消息上设置TTL和在队列上设置队列中每条消息的TTL。根据RabbitMQ对两种方式的处理,我们一般会选择在队列上设置队列中每条消息的TTL,原因是若出现TTL较长的消息会导致其后的过期消息无法投递到死信队列,从而导致后面的消息无法按预期的延迟时间来处理。

优先级队列

优先级队列可以通过在声明队列方法中设置拓展属性x-max-priority来设置。

RPC实现

持久化

Rabbitmq的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。

1.交换器持久化
持久化的交换器元数据不会再RabbitMQ服务重启之后丢失,可以在声明交换器时将durable参数设置为true来声明一个持久化的交换器

2.队列持久化
持久化的队列元数据不会再RabbitMQ服务重启之后丢失,可以在声明队列时将durable参数设置为true来声明一个持久化的队列

3.消息持久化
交换器和队列的持久化并不能保证内部消息也持久化,要将消息持久化需要在发送时指定消息是持久化消息。通过在消息发送时将投递模式(deliveryMode)设置为2即可实现消息的持久化。

可以将所有的消息都设置为持久化,但这样会严重影响Rabbitmq的性能,写入磁盘的速度比写入内存的速度慢许多。在选择是否将消息持久化时,需要在可靠性和吞吐量之间进行权衡。

如果要做到数据的可靠性传递,还需要:

  1. 消费者订阅消费队列时,将autoAck设为false,并进行手动确认
  2. 持久化的消息存入Rabbitmq后,可能需要一段时间才能存入磁盘(不是每条消息都进行同步存盘), 这期间发生宕机会丢失消息,可以引入镜像队列机制,在节点挂掉时自动切换到从节点。这样可靠性提升许多,除非整个集群都挂掉。
  3. 生产端引入事务机制或者生产者确认机制来保证消息正确的投递到Rabbitmq中。

生产端-如何可靠性投递

生产者把消息发出之后,如何保证到达了服务器呢?

有两种解决方式:事务机制和发送方确认机制。

事务机制由于对性能影响特别大,使用较少。

下面说一下发送方确认机制:

  1. 将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。

  2. 一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

  3. 如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。

  4. 发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

消费端-要点

消费者通过推或拉的方式获取并消费消息,完成业务逻辑后手动确认消息已被接收,这样Rabbitmq把当前消息从队列中标记清楚。消费者无法处理的消息可以通过Nack或Reject拒绝掉。

消息的分发是以round-robin的方式进行分发的,每条消息只会给订阅列表里的一个消费者,这样非常适合扩展,负载过重时只需要创建更多的消费者即可。可以通过Qos限制信道上的消费者所能保持的最大未确认消息的数量,以控制单个消费者的负载。

Qos即服务端限流,对于拉模式的消费方式无效,以下是Qos的好处:

  • 提高服务稳定性。假设消费端有一段时间不可用,导致队列中有上万条未处理的消息,如果开启客户端,巨量的消息推送过来,可能会导致消费端变卡,也有可能直接不可用,所以服务端限流很重要。

  • 提高吞吐量。当队列有多个消费者时,队列收到的消息以轮询的方式发送给消费者。但由于机器性能等的原因,每个消费者的消费能力不一样,
    这就会导致一些消费者处理完了消费的消息,而另一些则还堆积了一些消息,会造成整体应用吞吐量的下降。

另外,Rabbitmq不保证消息的顺序性,一般需要业务方使用全局有序标识等方法来实现。

消息传输保障

消息中间件的消息传输保障一般分为三个层次:

At most once: 最多一次,消息可能丢失,但绝不会重复传输
At least once: 最少一次,消息绝不会丢失,但可能会重复传输
Exactly once: 恰好一次,每条消息都肯定会被传输而且仅传输一次

Rabbitmq支持最多一次和最少一次,其中最少一次需要注意如下方面:

  1. 消息生产者需要开启事务机制或生产者确认机制,以确保消息传输到Rabbitmq中
  2. 消息生产者需要配合使用mandatory参数或备份交换器来确保消息能够从交换器路由到队列中,进而被保存下来而不会丢弃
  3. 消息和队列都需要进行持久化处理,以确保在Rabbitmq服务器遇到异常时不会丢失消息
  4. 消费者需要将autoAck设置为false,然后通过手动确认来确认消息的消费,以避免在消费端引起不必要的消费丢失

消息的重复问题

消息去重机制一般要业务客户端来实现,例如引入全局唯一标志符。