如何解决消息丢失
生产者与消费者相互解耦,怎么保证生产者已将消息投递到 RabbitMQ
服务端,又如何确认消费者已经消费了该消息?
消息生产可靠性
RabbitMQ
为我们提供了两种方式:
- 通过
AMQP
事务机制实现,这也是AMQP
协议层面提供的解决方案; - 通过将
channel
设置成confirm
模式来实现;
RabbitMQ
事务机制
注意:使用事务模式会导致服务端吞吐量急剧下降,实际使用场景很少,这里简单介绍一下
RabbitMQ
中与事务机制有关的方法有三个:txSelect()
, txCommit()
以及txRollback()
, txSelect
用于将当前channel
设置成transaction
模式(通道事务),txCommit
用于提交事务,txRollback
用于回滚事务,在通过txSelect
开启事务之后,我们便可以发布消息给broker
代理服务器了,如果txCommit
提交成功了,则消息一定到达了broker
了,如果在txCommit
执行之前broker
异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback
回滚事务了。
|
|
Confirm模式(常用模式)
模仿了协议中已经存在的消费者ACK
确认机制,生产者将信道设置成confirm
模式,一旦信道进入confirm
模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker
就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出
confirm
模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息
|
|
Spring Amqp配置
|
|
当mandatory
标志设置为true时,如果exchange
根据自身类型和消息routingKey
无法找到一个合适的queue
存储消息,那么broker
会调用basic.return
方法将消息返还给生产者;当mandatory
设置为false时,出现上述情况broker会直接将消息丢弃
confirmCallback
回调方法
|
|
returnCallback
回调方法
|
|
注意:
confirm
主要是用来判断消息是否有正确到达交换机,如果有,那么就 ack 就返回 true;如果没有,则是 false。return
则表示如果你的消息已经正确到达交换机,但是后续处理出错了,那么就会回调 return,并且把信息送回给你(前提是需要设置了 Mandatory,不设置那么就丢弃);如果消息没有到达交换机,那么不会调用 return 的东西。
消息消费可靠性
为了保证消息从队列可靠地到达 Consumer
,RabbitMQ
提供消息确认机制。消费者在声明队列时,可以指定noAck
参数,当noAck=false
时,RabbitMQ
会等待消费者显式发回ack
信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ
会在队列中消息被消费后立即删除它
消息生产、消费的流程图:
ack
模式
NONE
该模式下, 当
broker
发送消息成功后, 会立即将此消息从消息队列中删除, 而不会等待消费者的ACK
回复
MANUAL
AUTO
(默认模式)当
broker
发送消息给消费者时, 不会立即将此消息删除, 而是需要等待消费者的 ACK 回复后才会删除消息. 因此在手动ACK
模式下, 当消费者收到消息并处理完成后, 需要向broker
显示地发送 ACK 指令.