分布式消息中间件
在使用RabbitMQ
做消息分发时,主要有三个概念要注意:Exchange
,RoutingKey
,Queue
。
Exchange
可以理解为交换器,RoutingKey
可以理解为路由,Queue
作为真实存储消息的队列和某个Exchange
绑定,具体如何路由到感兴趣的Queue
则由Exchange
的三种模式决定:
fanout
:生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用topic
: 生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息direct
:生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)
生产者配置
|
|
源码分析
|
|
ConnectionFactory分析
org.springframework.amqp.rabbit.connection.ConnectionFactory
是 Spring AMQP 定义的连接工厂接口,负责创建连接.
|
|
CachingConnectionFactory
实现支持对这些通道的缓存,并且基于它们是否是事务来维护单独的通道高速缓存。如果要配置通道缓存的大小(默认值为25),可以调用setChannelCacheSize()
方法。
RabbitAdmin分析
org.springframework.amqp.core.AmqpAdmin
接口定义了AMQP基础管理操作,主要是对各种资源(交换机、队列、绑定)的申明和删除操作.org.springframework.amqp.rabbit.core.RabbitAdmin
实现了AmqpAdmin
接口.
|
|
如果使用xml配置:
|
|
当CachingConnectionFactory
缓存模式是CHANNEL(默认值)时,RabbitAdmin
实现会在同一个ApplicationContext
中声明的Queue
,Exchanges
和Bindings
自动延迟声明.
declareQueue
declareQueue
方法用来申明队列。org.springframework.amqp.core.Queue
是 Spring AMQP 对队列的封装,其属性与RabbitMQ Java client
中定义的Queue
的属性基本一致,new Queue("spring")
相当于RabbitMQ Java client
中 channel.queueDeclare("spring", true, false, false, null)
指定的队列特性,即队列是持久化、非排他性、非自动删除的.
|
|
|
|
declareQueue
调用rabbitTemplate
的execute(ChannelCallback)
方法,在ChannelCallback
的回调方法 doInRabbit(Channel channel)
中通过入參channel
调用RabbitMQ Java client
提供的channel.queueDeclare
方法申明队列。
RabbitTemplate分析
org.springframework.amqp.core.AmqpTemplate
接口定义了 AMQP 基础操作,主要为同步的消息收发方法。org.springframework.amqp.rabbit.core.RabbitTemplate
实现了 AmqpTemplate 接口。
RabbitTemplate
就是 RabbitMQ 收发消息的模板方法,所以,RabbitTemplate
要实现创建连接、获取channel、收发消息、消息格式转换、关闭信道与连接等模板代码。
消息发送
|
|
execute
方法中通过传入的ConnectionFactory
获取连接和信道,ChannelCallback
接口的doInRabbit(Channel channel)
方法作为回调函数,通过channel
参数接受execute
方法中获取的信道,完成消息发送的具体业务。
|
|
不管是生产者还是消费者都需要与RabbitMQ建立连接,这是一个TCP连接,在TCP连接上RabbitMQ会创建一个个虚拟的channel
,所有的RabbitMQ操作必须在channel
之上执行,使用channel
的好处是比TCP更节省系统资源。TCP连接就像电缆,AMQP channel就像纤维束.
doSend
方法实现消息发送这个具体操作。实际上调用RabbitMQ Java client提供的channel.basicPublish
方法发送消息。
|
|
通过将CachingConnectionFactory
的publisherConfirms
和publisherReturns
属性分别设置为“true”,支持确认和返回的消息。设置这些选项时,工厂创建的通道将被包装在PublisherCallbackChannel
中,该通道用于方法回调。当获得这样的频道时,客户端可以使用该channel
注册一个PublisherCallbackChannel.Listener
。 `PublisherCallbackChannel
实现包含将“确认”或“返回”路由到适当的监听器的逻辑。
业务操作完成后,execute
方法会回收连接和信道资源,整个消息发送模板功能完成。
Publisher的消息确认机制
当生产者发送消息的时候,需要考虑一个事情,就是这个消息是否提交成功, 是否送达到了队列中。
|
|
returnCallback
:用来判断消息是否正确到达绑定的队列,如果没有队列绑定到该exchange
,回调returnCallback接口
confirmCallback
:消息正确到达绑定的队列(如果设置了持久化,则消息必须持久化到磁盘),broker返回ack,回调confirmCallback
接口