消息消费者有2种模式:pull或push,RabbitMQ通常使用push模式(也可以使用拉模式),Kafka采用pull模式。push更关注实时性,pull更关注消费者消费能力。
消费者配置
Java
项目中,一般会结合spring-amqp
框架来使用RabbitMQ
,spring-amqp
底层调用RabbitMQ
的java client来和Broker
交互,比如我们会用如下配置来建立RabbitMQ
的连接池、声明Queue以及指明监听者的监听行为:
|
|
concurrency
设置的是对每个listener
在初始化的时候设置的并发消费者的个数,prefetch
是每次从一次性从broker里面取的待消费的消息的个数(默认为1).
源码分析
|
|
MessageListenerContainer分析
org.springframework.amqp.rabbit.listener.MessageListenerContainer
可以看作MessageLinstener
的容器。MessageListenerContainer
只包含一个MessageListener
,可以生成多个线程使用相同的MessageListener
同时消费消息.
MessageListenerContainer
继承自SmartLifecycle
接口,该接口是Spring容器提供的与生命周期管理相关的接口,实现该接口的类会由Spring容器负责启动与停止。Spring加载和初始化所有bean后,会接着回调实现该接口的类中对应的start()
方法.
|
|
在doStart()
方法中调用initializeConsumers()
来初始化所有的消费者,AsyncMessageProcessingConsumer
作为真实的处理器包装了BlockingQueueConsumer
,而且AsyncMessageProcessingConsumer
其实实现了Runnable
接口.
|
|
container
启动的时候会根据设置的concurrency
的值创建n个BlockingQueueConsumer
AsyncMessageProcessingConsumer
|
|
获取消息:consumer.start()
BlockingQueueConsumer
内部维护了一个阻塞队列BlockingQueue
,这个queue不是对应RabbitMQ的队列,而是Consumer自己维护的内存级别的队列,用来暂时存储从RabbitMQ中取出来的消息.
|
|
调用channel.basicConsume
后,broker
就会不断往consumer
投递message
,每次prefetch
条。consumer
是通过异步方式来抓取message
的,BlockingQueue
的size是在异步地不断增长直到prefetch
。
当队列接受到消息时,amqp client
会主动调用InternalConsumer
的handleDelivery()
方法。该方法调用 BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body))
将消息放到BlockingQueueConsumer
的BlockingQueue<Delivery> queue
中。
|
|
处理消息
|
|
- 如果执行成功,则调用 AMQP 信道的
basicAck
方法确认消息消费成功。 - 如果执行过程中发生异常,则将异常转化为
ListenerExecutionFailedException
抛出。默认情况下,Spring AMQP 处理用户自定义异常的逻辑非常简单:调用 AMQP 信道的basicReject
方法将消息退回队列,但不会打破AsyncMessageProcessingConsumer
线程的while循环,消息消费继续进行。
|
|
org.springframework.amqp.core.AcknowledgeMode
定义了三种确认模式:
NONE
:不确认,相当于 amqp client 中Channel.basicConsume
方法中autoAck
参数值设为“true”,如果是这种模式,不管业务执行是否成功,broker都会收到ack从而删除该消息MANUAL
:用户通过手动控制消息确认,由业务自己确定什么时候ack(手动调用channel.basicAck
)AUTO
:Spring AMQP框架根据MessageListener
的onMessage
执行过程中是否抛出异常来决定是否确认消息消费
Spring创建CachingConnectionFactory
的时候,AcknowledgeMode
默认为 AUTO;也就是业务执行失败则消息回退队列,否则broker收到ack删除该消息。
总结
设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue
的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch
个数目的消息消费有问题,而不至于单消费者情况下整个RabbitMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency
和prefetch
值。