消息消费者有2种模式:pull或push,RabbitMQ通常使用push模式(也可以使用拉模式),Kafka采用pull模式。push更关注实时性,pull更关注消费者消费能力。
消费者配置
Java项目中,一般会结合spring-amqp框架来使用RabbitMQ,spring-amqp底层调用RabbitMQ的java client来和Broker交互,比如我们会用如下配置来建立RabbitMQ的连接池、声明Queue以及指明监听者的监听行为:
|
|
concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数,prefetch是每次从一次性从broker里面取的待消费的消息的个数(默认为1).
源码分析
|
|
MessageListenerContainer分析
org.springframework.amqp.rabbit.listener.MessageListenerContainer可以看作MessageLinstener的容器。MessageListenerContainer只包含一个MessageListener,可以生成多个线程使用相同的MessageListener同时消费消息.
MessageListenerContainer继承自SmartLifecycle接口,该接口是Spring容器提供的与生命周期管理相关的接口,实现该接口的类会由Spring容器负责启动与停止。Spring加载和初始化所有bean后,会接着回调实现该接口的类中对应的start()方法.
|
|
在doStart()方法中调用initializeConsumers()来初始化所有的消费者,AsyncMessageProcessingConsumer作为真实的处理器包装了BlockingQueueConsumer,而且AsyncMessageProcessingConsumer其实实现了Runnable接口.
|
|
container启动的时候会根据设置的concurrency的值创建n个BlockingQueueConsumer
AsyncMessageProcessingConsumer
|
|
获取消息:consumer.start()
BlockingQueueConsumer内部维护了一个阻塞队列BlockingQueue,这个queue不是对应RabbitMQ的队列,而是Consumer自己维护的内存级别的队列,用来暂时存储从RabbitMQ中取出来的消息.
|
|
调用channel.basicConsume后,broker就会不断往consumer投递message,每次prefetch条。consumer是通过异步方式来抓取message的,BlockingQueue的size是在异步地不断增长直到prefetch。
当队列接受到消息时,amqp client会主动调用InternalConsumer的handleDelivery()方法。该方法调用 BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body))将消息放到BlockingQueueConsumer的BlockingQueue<Delivery> queue中。
|
|
处理消息
|
|
- 如果执行成功,则调用 AMQP 信道的
basicAck方法确认消息消费成功。 - 如果执行过程中发生异常,则将异常转化为
ListenerExecutionFailedException抛出。默认情况下,Spring AMQP 处理用户自定义异常的逻辑非常简单:调用 AMQP 信道的basicReject方法将消息退回队列,但不会打破AsyncMessageProcessingConsumer线程的while循环,消息消费继续进行。
|
|
org.springframework.amqp.core.AcknowledgeMode定义了三种确认模式:
NONE:不确认,相当于 amqp client 中Channel.basicConsume方法中autoAck参数值设为“true”,如果是这种模式,不管业务执行是否成功,broker都会收到ack从而删除该消息MANUAL:用户通过手动控制消息确认,由业务自己确定什么时候ack(手动调用channel.basicAck)AUTO:Spring AMQP框架根据MessageListener的onMessage执行过程中是否抛出异常来决定是否确认消息消费
Spring创建CachingConnectionFactory的时候,AcknowledgeMode默认为 AUTO;也就是业务执行失败则消息回退队列,否则broker收到ack删除该消息。
总结
设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个RabbitMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency和prefetch值。