在Rabbitmq中,一个消息从生产者发布到消费者消费完成会经历四个部分:
生产者投递消息给exchange
exchange路由消息到队列
消息在队列中存储
消费者订阅队列取出消息并进行消费
这四部分都有丢失的可能, 下面来分别对这四部分可能丢失的情况提出解决方案。
生产者投递消息
这个过程应对消息丢失有两个方案:
- amqp协议提供了事务机制,Rabbitmq也实现了事务机制
- 使用生产者确认模式
事务机制可以看作是一种同步阻塞的方案,生产者发送消息后处于阻塞状态,等待Rabbitmq的回应,由于会极大影响性能,在生产环境中用的不多。
生产者确认机制是异步的,发送一条消息后不必等待回应就可以继续发送下一条消息,消息最后被成功接收或丢失都会通过回调方法来进行处理。
exchange路由消息
消息到达了exchange也可能会由于没有匹配的队列而丢失。
解决方案也有两种:
- 发送消息时mandatory参数设置为true,exchange在无法找到匹配的队列时会将消息返回给生产者
- 使用备份交换器
发送消息时设置mandatory参数为true, 并且没有匹配到队列时,消息会被返回给生产者,但是需要生产者添加监听的业务逻辑来接收。
如果不想增加生产者进行监听的业务逻辑,可以设置备份交换器,对所有无法路由的消息发送给备份交换器,然后由一个绑定了备份交换器的队列接收消息,后续通过消费这个队列对这些消息进行处理。
消息存储在队列阶段
默认情况下消息是存储在内存中的,因此可能会宕机然后消息丢失,所以需要设置持久化。需要注意的是设置持久化需要将消息以及队列都开启持久化。
持久化的消息也并不是每一条都落盘了,需要一段很短的时间才能存入磁盘,如果是采用了事务机制或者生产者确认机制的话,则确认时是已经落盘的了。
不过这里还存在一个单点的问题,如果单机故障并且磁盘损毁也会消息丢失,因此一般为了高可用会设置镜像队列,镜像队列是一个数据副本,在主节点挂掉后可以自动切换到从节点。
消息的消费阶段
消息在被消费者接收后,还没来得及处理就宕机了,也算是丢失的情况。类似生产者确认机制,Rabbitmq提供了消费者确认机制。
消费者在订阅队列时,可以设置只有显式回复确认时,消息才会被Rabbitmq移除。
消息消费失败时也可以进行拒绝而不是确认,拒绝时可以指定requeue参数,设置为true的情况下消息会被重新存入队列,以便发送给下一个订阅的消费者。
requeue的消息会被放入队列头部,如果再次消费时又是拒绝然后requeue, 可能会出现无限循环的情况,推荐放到一个死信队列,然后对死信队列的消息进行消费和分析。