如何解决消息丢失
生产者与消费者相互解耦,怎么保证生产者已将消息投递到 RabbitMQ 服务端,又如何确认消费者已经消费了该消息?
消息生产可靠性
RabbitMQ为我们提供了两种方式:
- 通过
AMQP事务机制实现,这也是AMQP协议层面提供的解决方案; - 通过将
channel设置成confirm模式来实现;
RabbitMQ事务机制
注意:使用事务模式会导致服务端吞吐量急剧下降,实际使用场景很少,这里简单介绍一下
RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式(通道事务),txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
|
|
Confirm模式(常用模式)
模仿了协议中已经存在的消费者ACK确认机制,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息
|
|
Spring Amqp配置
|
|
当mandatory标志设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
confirmCallback回调方法
|
|
returnCallback回调方法
|
|
注意:
confirm主要是用来判断消息是否有正确到达交换机,如果有,那么就 ack 就返回 true;如果没有,则是 false。return则表示如果你的消息已经正确到达交换机,但是后续处理出错了,那么就会回调 return,并且把信息送回给你(前提是需要设置了 Mandatory,不设置那么就丢弃);如果消息没有到达交换机,那么不会调用 return 的东西。
消息消费可靠性
为了保证消息从队列可靠地到达 Consumer,RabbitMQ提供消息确认机制。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它
消息生产、消费的流程图:

ack 模式
NONE该模式下, 当
broker发送消息成功后, 会立即将此消息从消息队列中删除, 而不会等待消费者的ACK回复
MANUAL
AUTO(默认模式)当
broker发送消息给消费者时, 不会立即将此消息删除, 而是需要等待消费者的 ACK 回复后才会删除消息. 因此在手动ACK模式下, 当消费者收到消息并处理完成后, 需要向broker显示地发送 ACK 指令.