RabbitMQ生产者分析

分布式消息中间件

在使用RabbitMQ做消息分发时,主要有三个概念要注意:ExchangeRoutingKeyQueue

Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:

  • fanout:生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用
  • topic : 生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息
  • direct:生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)

生产者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<rabbit:connection-factory id="connectionFactory"
host="${rabbit.connect.host}" port="${rabbit.connect.port}" username="${rabbit.connect.username}"
password="${rabbit.connect.password}" channel-cache-size="${rabbit.connect.channelCacheSize}"
publisher-returns="true" publisher-confirms="true" />
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />
<bean id="messageConverter"
class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
<bean id="returnCallback"
class="com.zsr.test.callback.MessageReturnCallback" />
<bean id="confirmCallback"
class="com.zsr.test.callback.MessageConfirmCallback" />
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory" confirm-callback="confirmCallback"
return-callback="returnCallback" message-converter="messageConverter" />

源码分析

1
2
3
4
5
6
7
8
9
10
// Test
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new CachingConnectionFactory(5672);
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("test.queue"));
RabbitTemplate template = admin.getRabbitTemplate();
template.convertAndSend("spring", "test");
String foo = (String) template.receiveAndConvert("spring");
System.out.println(foo);
}

ConnectionFactory分析

org.springframework.amqp.rabbit.connection.ConnectionFactory 是 Spring AMQP 定义的连接工厂接口,负责创建连接.

1
2
3
4
5
6
7
8
9
10
// CachingConnectionFactory.class
public CachingConnectionFactory(String hostname, int port) {
// 真正的RabbitMQ连接工厂, 实际连接都是通过这个rabbitConnectionFactory创建的
super(new com.rabbitmq.client.ConnectionFactory());
if (!StringUtils.hasText(hostname)) {
hostname = getDefaultHostName();
}
setHost(hostname);
setPort(port);
}

CachingConnectionFactory实现支持对这些通道的缓存,并且基于它们是否是事务来维护单独的通道高速缓存。如果要配置通道缓存的大小(默认值为25),可以调用setChannelCacheSize()方法。

RabbitAdmin分析

org.springframework.amqp.core.AmqpAdmin接口定义了AMQP基础管理操作,主要是对各种资源(交换机、队列、绑定)的申明和删除操作.org.springframework.amqp.rabbit.core.RabbitAdmin实现了AmqpAdmin接口.

1
2
3
4
5
6
7
// RabbitAdmin.class
public RabbitAdmin(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
// 创建了一个RabbitTemplate实例
this.rabbitTemplate = new RabbitTemplate(connectionFactory);
}

如果使用xml配置:

1
2
3
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory缓存模式是CHANNEL(默认值)时,RabbitAdmin实现会在同一个ApplicationContext中声明的QueueExchangesBindings自动延迟声明.

declareQueue

declareQueue方法用来申明队列。org.springframework.amqp.core.Queue是 Spring AMQP 对队列的封装,其属性与RabbitMQ Java client中定义的Queue的属性基本一致,new Queue("spring")相当于RabbitMQ Java clientchannel.queueDeclare("spring", true, false, false, null)指定的队列特性,即队列是持久化、非排他性、非自动删除的.

1
2
3
4
5
6
7
8
9
// Queue.class
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
Assert.notNull(name, "'name' cannot be null");
this.name = name;
this.durable = durable;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
this.arguments = arguments;
}
1
2
3
4
5
6
7
8
9
// RabbitAdmin.class
public String declareQueue(final Queue queue) {
return this.rabbitTemplate.execute(new ChannelCallback<String>() {
@Override
public String doInRabbit(Channel channel) throws Exception {
return declareQueues(channel, queue)[0].getQueue();
}
});
}

declareQueue调用rabbitTemplateexecute(ChannelCallback)方法,在ChannelCallback的回调方法 doInRabbit(Channel channel) 中通过入參channel调用RabbitMQ Java client提供的channel.queueDeclare方法申明队列。

RabbitTemplate分析

org.springframework.amqp.core.AmqpTemplate接口定义了 AMQP 基础操作,主要为同步的消息收发方法。org.springframework.amqp.rabbit.core.RabbitTemplate实现了 AmqpTemplate 接口。

RabbitTemplate就是 RabbitMQ 收发消息的模板方法,所以,RabbitTemplate要实现创建连接、获取channel、收发消息、消息格式转换、关闭信道与连接等模板代码。

消息发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// RabbitTemplate.class
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
// convertMessageIfNecessary方法将Object转化为Message实例
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
public void send(final String exchange, final String routingKey,
final Message message, final CorrelationData correlationData)
throws AmqpException {
// 实际在ChannelCallback接口的匿名实现类的doInRabbit(Channel)方法中实现发送消息功能
execute(new ChannelCallback<Object>() {
@Override
public Object doInRabbit(Channel channel) throws Exception {
doSend(channel, exchange, routingKey, message, correlationData);
return null;
}
});
}
// Message类是Spring AMQP对消息的封装
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
// 获取在RabbitTemplate构造函数中创建的MessageConverter,默认是SimpleMessageConverter
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}

execute方法中通过传入的ConnectionFactory获取连接和信道,ChannelCallback接口的doInRabbit(Channel channel)方法作为回调函数,通过channel参数接受execute方法中获取的信道,完成消息发送的具体业务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// RabbitTemplate.class
private <T> T doExecute(ChannelCallback<T> action) {
Assert.notNull(action, "Callback object must not be null");
// resourceHolder包装了connection和channel
RabbitResourceHolder resourceHolder = getTransactionalResourceHolder();
// 获取channel
Channel channel = resourceHolder.getChannel();
// 如果有confirmCallback或者returnCallback,则添加监听器,用于发送结果异步回调
if (this.confirmCallback != null || this.returnCallback != null) {
addListener(channel);
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Executing callback on RabbitMQ Channel: " + channel);
}
// 调用doSend方法,通过channel执行消息发送
return action.doInRabbit(channel);
}
catch (Exception ex) {
if (isChannelLocallyTransacted(channel)) {
resourceHolder.rollbackAll();
}
throw convertRabbitAccessException(ex);
}
finally {
// 回收连接和信道资源
ConnectionFactoryUtils.releaseResources(resourceHolder);
}
}

不管是生产者还是消费者都需要与RabbitMQ建立连接,这是一个TCP连接,在TCP连接上RabbitMQ会创建一个个虚拟的channel,所有的RabbitMQ操作必须在channel之上执行,使用channel的好处是比TCP更节省系统资源。TCP连接就像电缆,AMQP channel就像纤维束.

doSend方法实现消息发送这个具体操作。实际上调用RabbitMQ Java client提供的channel.basicPublish方法发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected void doSend(Channel channel, String exchange, String routingKey, Message message,
CorrelationData correlationData) throws Exception {
...some code...
if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) {
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(),
new PendingConfirm(correlationData, System.currentTimeMillis()));
}
boolean mandatory = this.returnCallback != null && this.mandatory;
MessageProperties messageProperties = message.getMessageProperties();
if (mandatory) {
messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION, this.uuid);
}
BasicProperties convertedMessageProperties = this.messagePropertiesConverter
.fromMessageProperties(messageProperties, encoding);
// 发送消息
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}

通过将CachingConnectionFactorypublisherConfirmspublisherReturns属性分别设置为“true”,支持确认和返回的消息。设置这些选项时,工厂创建的通道将被包装在PublisherCallbackChannel中,该通道用于方法回调。当获得这样的频道时,客户端可以使用该channel注册一个PublisherCallbackChannel.Listener。 `PublisherCallbackChannel实现包含将“确认”或“返回”路由到适当的监听器的逻辑。

业务操作完成后,execute方法会回收连接和信道资源,整个消息发送模板功能完成。

Publisher的消息确认机制

当生产者发送消息的时候,需要考虑一个事情,就是这个消息是否提交成功, 是否送达到了队列中。

1
2
3
Returns are when the broker returns a message because it's undeliverable (no matching bindings on the exchange to which the message was published, and the mandatory bit is set).
Confirms are when the broker sends an ack back to the publisher, indicating that a message was successfully routed.

returnCallback:用来判断消息是否正确到达绑定的队列,如果没有队列绑定到该exchange,回调returnCallback接口

confirmCallback:消息正确到达绑定的队列(如果设置了持久化,则消息必须持久化到磁盘),broker返回ack,回调confirmCallback接口

参考

Spring AMQP 源码分析

Spring AMQP中文文档

ConfirmCallback vs ReturnCallback

热评文章