RabbitMQ消费者分析

消息消费者有2种模式:pull或push,RabbitMQ通常使用push模式(也可以使用拉模式),Kafka采用pull模式。push更关注实时性,pull更关注消费者消费能力。

消费者配置

Java项目中,一般会结合spring-amqp框架来使用RabbitMQspring-amqp底层调用RabbitMQ的java client来和Broker交互,比如我们会用如下配置来建立RabbitMQ的连接池、声明Queue以及指明监听者的监听行为:

1
2
3
4
5
6
7
8
<rabbit:connection-factory id="connectionFactory" />
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
<!-- template非必须,主要用于生产者发送消息-->
<rabbit:template id="template" connection-factory="connectionFactory" />
<rabbit:queue name="test.queue" />
<rabbit:listener-container connection-factory="connectionFactory" concurrency="1" prefetch="1">
<rabbit:listener ref="listener" queue-names="test.queue" />
</rabbit:listener-container>

concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数,prefetch是每次从一次性从broker里面取的待消费的消息的个数(默认为1).

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
// Test
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new CachingConnectionFactory(5672);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setQueueNames("spring");
container.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
});
container.start();
}

MessageListenerContainer分析

org.springframework.amqp.rabbit.listener.MessageListenerContainer可以看作MessageLinstener的容器。MessageListenerContainer只包含一个MessageListener,可以生成多个线程使用相同的MessageListener同时消费消息.

MessageListenerContainer继承自SmartLifecycle接口,该接口是Spring容器提供的与生命周期管理相关的接口,实现该接口的类会由Spring容器负责启动与停止。Spring加载和初始化所有bean后,会接着回调实现该接口的类中对应的start()方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// SimpleMessageListenerContainer.class
protected void doStart() throws Exception {
//some code
synchronized (this.consumersMonitor) {
int newConsumers = initializeConsumers();
//some code
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// 将消费者线程放在线程池中,每一个消费者一个线程,并发执行
this.taskExecutor.execute(processor);
}
//some code
}
}

doStart()方法中调用initializeConsumers()来初始化所有的消费者,AsyncMessageProcessingConsumer作为真实的处理器包装了BlockingQueueConsumer,而且AsyncMessageProcessingConsumer其实实现了Runnable接口.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
cancellationLock.reset();
this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
for (int i = 0; i < this.concurrentConsumers; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
count++;
}
}
}
return count;
}

container启动的时候会根据设置的concurrency的值创建n个BlockingQueueConsumer

AsyncMessageProcessingConsumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.class
private final class AsyncMessageProcessingConsumer implements Runnable {
private final BlockingQueueConsumer consumer;
private final CountDownLatch start;
private volatile FatalListenerStartupException startupException;
private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
this.consumer = consumer;
this.start = new CountDownLatch(1);
}
public void run() {
...some code...
try {
// 消费者执行,每次从队列中获取prefetchCount数量的消息,放在消费者BlockingQueue中
consumer.start();
start.countDown();
} catch (FatalListenerStartupException ex) {
throw ex;
} catch (Throwable t) {
start.countDown();
handleStartupFailure(t);
throw t;
}
...some code...
boolean continuable = false;
while (isActive() || continuable) {
// isActive()表示MessageListenerContainer是否是活跃的,正常情况下都是true
try {
// 不停的尝试从消费者queue中获取Delivery实例,将之转化为Message,然后执行MessageListener 的onMessage回调方法
continuable = receiveAndExecute(consumer) && !isChannelTransacted();
} catch (ListenerExecutionFailedException ex) {
// 消息处理失败,不影响线程执行,继续获取消息进行处理
}
}
...some code...
}
}
获取消息:consumer.start()

BlockingQueueConsumer内部维护了一个阻塞队列BlockingQueue,这个queue不是对应RabbitMQ的队列,而是Consumer自己维护的内存级别的队列,用来暂时存储从RabbitMQ中取出来的消息.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// BlockingQueueConsumer.class
private final BlockingQueue<Delivery> queue = new LinkedBlockingQueue<Delivery>();
public void start() throws AmqpException {
this.consumer = new InternalConsumer(channel);
...some code...
int passiveDeclareTries = 3;
do {
try {
if (!acknowledgeMode.isAutoAck()) {
// 声明每次获取消息数量
channel.basicQos(prefetchCount);
}
for (int i = 0; i < queues.length; i++) {
// 声明每个队列,消息来源可能是多个队列
channel.queueDeclarePassive(queues[i]);
}
passiveDeclareTries = 0;
} catch (IOException e) {
...some code...
}
} while (passiveDeclareTries-- > 0);
try {
for (int i = 0; i < queues.length; i++) {
// 对每个队列,都会调用basicConsume方法让InternalConsumer监听当前队列
channel.basicConsume(queues[i], acknowledgeMode.isAutoAck(), consumer);
}
} catch (IOException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
}

调用channel.basicConsume后,broker就会不断往consumer投递message,每次prefetch条。consumer是通过异步方式来抓取message的,BlockingQueue的size是在异步地不断增长直到prefetch

当队列接受到消息时,amqp client会主动调用InternalConsumerhandleDelivery()方法。该方法调用 BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body))将消息放到BlockingQueueConsumerBlockingQueue<Delivery> queue中。

1
2
3
4
5
6
7
8
9
10
11
// BlockingQueueConsumer$InternalConsumer.class
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
...some code...
try {
queue.put(new Delivery(envelope, properties, body));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
处理消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.class
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
Channel channel = consumer.getChannel();
for (int i = 0; i < txSize; i++) {
// 从消费者队列中获取Delivery,将之转化为 Message
Message message = consumer.nextMessage(receiveTimeout);
if (message == null) {
break;
}
try {
// 执行MessageListener的onMessage回调方法
executeListener(channel, message);
} catch (ImmediateAcknowledgeAmqpException e) {
break;
} catch (Throwable ex) {
// 消息处理失败,执行channel.basicReject()
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));
}
  1. 如果执行成功,则调用 AMQP 信道的basicAck方法确认消息消费成功。
  2. 如果执行过程中发生异常,则将异常转化为ListenerExecutionFailedException抛出。默认情况下,Spring AMQP 处理用户自定义异常的逻辑非常简单:调用 AMQP 信道的basicReject方法将消息退回队列,但不会打破AsyncMessageProcessingConsumer线程的while循环,消息消费继续进行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
...some code...
try {
boolean ackRequired = !acknowledgeMode.isAutoAck() && !acknowledgeMode.isManual();
// 需要ack
if (ackRequired) {
...some code...
if (!deliveryTags.isEmpty()) {
long deliveryTag = new ArrayList<Long>(deliveryTags).get(deliveryTags.size() - 1);
// 发送broker消息确认,方便删除该条消息
channel.basicAck(deliveryTag, true);
}
}
...some code...
} finally {
deliveryTags.clear();
}
return true;
}

org.springframework.amqp.core.AcknowledgeMode定义了三种确认模式:

  • NONE:不确认,相当于 amqp client 中Channel.basicConsume方法中autoAck参数值设为“true”,如果是这种模式,不管业务执行是否成功,broker都会收到ack从而删除该消息
  • MANUAL:用户通过手动控制消息确认,由业务自己确定什么时候ack(手动调用channel.basicAck)
  • AUTO:Spring AMQP框架根据MessageListeneronMessage执行过程中是否抛出异常来决定是否确认消息消费

Spring创建CachingConnectionFactory的时候,AcknowledgeMode默认为 AUTO;也就是业务执行失败则消息回退队列,否则broker收到ack删除该消息。

总结

设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个RabbitMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrencyprefetch值。

参考

[Spring AMQP中文文档]

[Spring AMQP 源码分析 MessageListener]

RabbitMQ消费者的几个参数

Spring SmartLifecycle 在容器所有bean加载和初始化完毕执行

热评文章