分布式消息中间件
在使用RabbitMQ做消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。
Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:
fanout:生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用topic: 生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息direct:生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)
生产者配置
|
|
源码分析
|
|
ConnectionFactory分析
org.springframework.amqp.rabbit.connection.ConnectionFactory 是 Spring AMQP 定义的连接工厂接口,负责创建连接.
|
|
CachingConnectionFactory实现支持对这些通道的缓存,并且基于它们是否是事务来维护单独的通道高速缓存。如果要配置通道缓存的大小(默认值为25),可以调用setChannelCacheSize()方法。
RabbitAdmin分析
org.springframework.amqp.core.AmqpAdmin接口定义了AMQP基础管理操作,主要是对各种资源(交换机、队列、绑定)的申明和删除操作.org.springframework.amqp.rabbit.core.RabbitAdmin实现了AmqpAdmin接口.
|
|
如果使用xml配置:
|
|
当CachingConnectionFactory缓存模式是CHANNEL(默认值)时,RabbitAdmin实现会在同一个ApplicationContext中声明的Queue,Exchanges和Bindings自动延迟声明.
declareQueue
declareQueue方法用来申明队列。org.springframework.amqp.core.Queue是 Spring AMQP 对队列的封装,其属性与RabbitMQ Java client中定义的Queue的属性基本一致,new Queue("spring")相当于RabbitMQ Java client中 channel.queueDeclare("spring", true, false, false, null)指定的队列特性,即队列是持久化、非排他性、非自动删除的.
|
|
|
|
declareQueue调用rabbitTemplate的execute(ChannelCallback)方法,在ChannelCallback的回调方法 doInRabbit(Channel channel) 中通过入參channel调用RabbitMQ Java client提供的channel.queueDeclare方法申明队列。
RabbitTemplate分析
org.springframework.amqp.core.AmqpTemplate接口定义了 AMQP 基础操作,主要为同步的消息收发方法。org.springframework.amqp.rabbit.core.RabbitTemplate实现了 AmqpTemplate 接口。
RabbitTemplate就是 RabbitMQ 收发消息的模板方法,所以,RabbitTemplate要实现创建连接、获取channel、收发消息、消息格式转换、关闭信道与连接等模板代码。
消息发送
|
|
execute方法中通过传入的ConnectionFactory获取连接和信道,ChannelCallback接口的doInRabbit(Channel channel)方法作为回调函数,通过channel参数接受execute方法中获取的信道,完成消息发送的具体业务。
|
|
不管是生产者还是消费者都需要与RabbitMQ建立连接,这是一个TCP连接,在TCP连接上RabbitMQ会创建一个个虚拟的channel,所有的RabbitMQ操作必须在channel之上执行,使用channel的好处是比TCP更节省系统资源。TCP连接就像电缆,AMQP channel就像纤维束.
doSend方法实现消息发送这个具体操作。实际上调用RabbitMQ Java client提供的channel.basicPublish方法发送消息。
|
|
通过将CachingConnectionFactory的publisherConfirms和publisherReturns属性分别设置为“true”,支持确认和返回的消息。设置这些选项时,工厂创建的通道将被包装在PublisherCallbackChannel中,该通道用于方法回调。当获得这样的频道时,客户端可以使用该channel注册一个PublisherCallbackChannel.Listener。 `PublisherCallbackChannel实现包含将“确认”或“返回”路由到适当的监听器的逻辑。
业务操作完成后,execute方法会回收连接和信道资源,整个消息发送模板功能完成。
Publisher的消息确认机制
当生产者发送消息的时候,需要考虑一个事情,就是这个消息是否提交成功, 是否送达到了队列中。
|
|
returnCallback:用来判断消息是否正确到达绑定的队列,如果没有队列绑定到该exchange,回调returnCallback接口
confirmCallback:消息正确到达绑定的队列(如果设置了持久化,则消息必须持久化到磁盘),broker返回ack,回调confirmCallback接口