rabbitmq延时队列

概述

rabbitmq 是目前使用最为普及的消息队列组件,基于 AMQP 的 rabbitmq 在各方面设计都比较完善,同时,它具有非常丰富的功能与特性,可以支持各种实际的适用场景。

但是rabbitmq并不直接支持延时队列的功能,本文我们就来介绍一下,如何使用 rabbitmq 实现一个延时队列。

延时队列的简易实现

使用redis集群来实现了这个功能,redis中存储了下单时间,以分钟为粒度扫描相应的key,即可扫出所有下单时间超过指定时间间隔的数据.

rabbitmq 与消息过期时间 – TTL

为队列设置消息过期时间

rabbitmq 支持在创建队列时对队列设置消息过期时间:

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

失效消息转发队列 – DLX

一旦上述消息过期时间设置生效,某条消息达到消息过期时间,那么他将会成为一条“dead-lettered”,此外,被拒绝的消息如果requeue属性为 false,或者消息所在队列已达到最大长度,那么他也将成为“dead-lettered”.

如果设置了DLX规则,即失效消息转发规则,那么失效的消息就会被转发到相应的exchange和queue.

通过代码设置失效消息转发队列

1
2
3
4
5
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

这样,一旦消息失效,则消息会被自动转发到你设置的x-dead-letter-exchange上的同名队列.

也可以通过下面的代码指定具体转发的目标 routing-key:

1
args.put("x-dead-letter-routing-key", "some-routing-key");

spring-rabbit 配置

可以使用spring-rabbit配置替代代码方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<rabbit:queue name="q.with.dlx">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dlx"/>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="dlq"/>
<rabbit:direct-exchange name="dlx">
<rabbit:bindings>
<rabbit:binding key="q.with.dlx" queue="dlq"/>
</rabbit:bindings>
</rabbit:direct-exchange>

This assumes you routed the original message using the default direct exchange (routing by queue name). Hence the dead letter routing uses the same routing key (queue name). If you route using an explicit routing key, you would use that.

参考链接

RabbitMQ TTL

RabbitMQ DLX

Spring AMQP

热评文章