rabbitmq消息防丢失

如何解决消息丢失

生产者与消费者相互解耦,怎么保证生产者已将消息投递到 RabbitMQ 服务端,又如何确认消费者已经消费了该消息?

消息生产可靠性

RabbitMQ为我们提供了两种方式:

  1. 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  2. 通过将channel设置成confirm模式来实现;
RabbitMQ事务机制

注意:使用事务模式会导致服务端吞吐量急剧下降,实际使用场景很少,这里简单介绍一下

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式(通道事务),txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

1
2
3
4
5
6
7
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
Confirm模式(常用模式)

模仿了协议中已经存在的消费者ACK确认机制,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息

1
2
3
4
5
6
1. 对于可路由的消息,当所有的队列接收到消息后,broker向client发送basic.ack确认通知;
2. 对于路由到持久队列的持久化消息,当消息持久化到磁盘后,broker向client发送basic.ack确认通知;
3. 对于路由到镜像队列的消息,当所有的镜像队列都接收到消息后,broker向client发送basic.ack确认通知;
4. 对于不可路由的消息,broker一旦确认该消息不可路由时,则向client发送basic.ack确认通知;
5. 对于不可路由且mandatory强制投递的消息,broker一旦确认该消息不可路由时,先向client发送basic.return通知,然后发送basic.ack确认通知;
  • Spring Amqp配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<rabbit:connection-factory id="connectionFactory"
host="${rabbit.connect.host}"
port="${rabbit.connect.port}"
username="${rabbit.connect.username}"
password="${rabbit.connect.password}"
channel-cache-size="${rabbit.connect.channelCacheSize}"
publisher-returns="true"
publisher-confirms="true" />
<bean id="returnCallback"
class="com.ximalaya.trump.business.message.callback.MessageReturnCallback" />
<bean id="confirmCallback"
class="com.ximalaya.trump.business.message.callback.MessageConfirmCallback" />
<!-- mandatory必须设置true,return-callback才生效 -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
confirm-callback="confirmCallback"
return-callback="returnCallback"
message-converter="messageConverter"
mandatory="true" />

mandatory标志设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃

  • confirmCallback回调方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MessageConfirmCallback implements ConfirmCallback {
private static final Logger LOG = LoggerFactory.getLogger(MessageConfirmCallback.class);
@Override
public void confirm(CorrelationData correlationData, boolean ack) {
if (ack) {
// 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true(这是一个坑,需要注意), 这种情况一般需要在returnCallBack中重新投递
LOG.info("消息确认成功");
} else {
// 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
// 处理丢失的消息(nack)
LOG.warn("消息确认失败");
}
}
}
  • returnCallback回调方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MessageReturnCallback implements ReturnCallback {
private static final Logger LOG = LoggerFactory.getLogger(MessageReturnCallback.class);
private static Charset charset = Charset.forName("UTF-8");
@Autowired
private RabbitTemplate rabbitTemplate;
private JsonParser jsonParser;
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String messageContent = new String(message.getBody(), charset);
LOG.warn(
"Return message from exchange {} and routing {}, reply-code is {}, reply-text is {}, message content is {}",
exchange, routingKey, replyCode, replyCode, messageContent);
Map<String, Object> map = jsonParser.toMap(messageContent);
String correlationId = (String) map.get("id");
if (null == correlationId) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
} else {
CorrelationData correlationData = new CorrelationData(correlationId);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
LOG.info("retry send message {} to exchange {} succeed.", messageContent, exchange);
}
}

注意:

  • confirm 主要是用来判断消息是否有正确到达交换机,如果有,那么就 ack 就返回 true;如果没有,则是 false。
  • return 则表示如果你的消息已经正确到达交换机,但是后续处理出错了,那么就会回调 return,并且把信息送回给你(前提是需要设置了 Mandatory,不设置那么就丢弃);如果消息没有到达交换机,那么不会调用 return 的东西。

消息消费可靠性

为了保证消息从队列可靠地到达 ConsumerRabbitMQ提供消息确认机制。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它

消息生产、消费的流程图:

ack 模式
  • NONE

    该模式下, 当 broker 发送消息成功后, 会立即将此消息从消息队列中删除, 而不会等待消费者的 ACK 回复

  • MANUAL
  • AUTO(默认模式)

    broker 发送消息给消费者时, 不会立即将此消息删除, 而是需要等待消费者的 ACK 回复后才会删除消息. 因此在手动 ACK 模式下, 当消费者收到消息并处理完成后, 需要向 broker 显示地发送 ACK 指令.

参考

RabbitMQ 实战(四)消费者 ack 以及 生产者 confirms

rabbitmq 消息可靠投递及消费机制

rabbitmq的发布确认和事务

RabbitMQ之消息确认机制(事务+Confirm)

有货RabbitMQ双活实践

Spring Boot集成RabbitMQ与ACK确认模式

热评文章