spring-rabbit配置多connect-factory

多connect-factory时配置exchange, queue需使用declared-by

背景:

项目中需要监听多个rabbitmq服务器或者发消息到多个rabbitmq服务器,项目启动报错。结果如下:

1
2
3
4
5
6
7
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot redeclare exchange 'lamia.live.start' in vhost '/' with different type, durable, internal or autodelete value, class-id=40, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

问题原因:从错误来看,大概意思是重复声明了“exchange”。研究了下spring-rabbit中“exchange”的配置。

基本信息:

Broker:简单来说就是消息队列服务器实体,可以把一个rabbitmq server当作一个broker。

vhost虚拟主机:一个broker里可以开设多个vhost,用作不同用户的权限分离,也可以把他看作明名空间。vhost之间相互完全隔离,不同Vhost之间无法共享Exchange和Queue。(如果不指定vhost,默认是”/“)

Exchange:Exchange是属于Vhost的。同一个vhost不能有重复的Exchange名称(这个就是错误原因)。

Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

项目配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<!-- 声明消息转换器为SimpleMessageConverter -->
<bean id="messageConverter"
class="org.springframework.amqp.support.converter.SimpleMessageConverter">
</bean>
<!-- 2) 推送APN -->
<rabbit:connection-factory id="notificationConnectionFactory"
host="${push.apn.rabbit.connect.host}"
port="${push.apn.rabbit.connect.port}"
username="${push.apn.rabbit.connect.username}"
password="${push.apn.rabbit.connect.password}"
channel-cache-size="${push.apn.rabbitmq.connect.channelCacheSize}" />
<rabbit:admin id="notificationRabbitAdmin"
connection-factory="notificationConnectionFactory" />
<rabbit:template id="notificationRabbitTemplate"
connection-factory="notificationConnectionFactory"
exchange="${pns-push.notification.topic}" />
<rabbit:template id="clockedNotificationRabbitTemplate"
connection-factory="notificationConnectionFactory"
routing-key="${pns-push.clockedMsg.queue}" />
<!-- 3) 推送微信服务号 -->
<rabbit:connection-factory id="wechatConnectionFactory"
host="${push.wx.rabbit.connect.host}"
port="${push.wx.rabbit.connect.port}"
username="${push.wx.rabbit.connect.username}"
password="${push.wx.rabbit.connect.password}" />
<rabbit:template id="wechatRabbitTemplate"
connection-factory="wechatConnectionFactory" />
<rabbit:admin id="wechatRabbitAdmin" connection-factory="wechatConnectionFactory" />
<!-- 微信 推送预约提醒topic -->
<rabbit:topic-exchange name="lamia.live.start" durable="true" />
  • rabbit:connection-factory

spring-rabbit中,管理消息协商器(broker)连接的核心组件是ConnectionFactory这个接口。 ConnectionFactory提供了 org.springframework.amqp.rabbit.connection.Connection(com.rabbitmq.client.Connection的包装类)实例的连接与管理。

CachingConnectionFactory:是ConnectionFactory的在Spring AMQP中唯一实现,它创建一个连接代理,使程序可以共享连接。

Connection 提供一个createChannel的方法。CachingConnectionFactory 的实现能支持channels的缓存,并且能根据区分是事务性或非事务性各自独立。同时,CachingConnectionFactory也提供hostname的构造函数,并且可以设置usernamepasswordsetChannelCacheSize等方法。CachingConnectionFactory 默认channel cache 大小为1,如果想改变可以用setChannelCacheSize设置 channel-cache-size的大小。

  • rabbit:template

Spring AMQP提供了一个发送和接收消息的操作模板类AmqpTemplate。 AmqpTemplate它定义包含了发送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一个实现。

RabbitTemplate支持消息的确认与返回,为了返回消息,RabbitTemplate 需要设置mandatory 属性为true,并且CachingConnectionFactorypublisherReturns属性也需要设置为true。返回的消息会根据它注册的RabbitTemplate.ReturnCallback setReturnCallback 回调发送到给客户端,一个RabbitTemplate仅能支持一个ReturnCallback

为了确认Confirms消息, CachingConnectionFactorypublisherConfirms 属性也需要设置为true,确认的消息会根据它注册的RabbitTemplate.ConfirmCallback setConfirmCallback回调发送到给客户端。一个RabbitTemplate也仅能支持一个ConfirmCallback.

  • messageConverter

AmqpTemplate 定义提供了各种发送和接收委拖给MessageConverter转化对象消息的方法。MessageConverter 本身比较简单,它提供了消息对象的转化,可将object转化成Message 对象,或者将Message 对象转化成Object对象。它提供了默认的SimpleMessageConverter实现,以及第三方的MessageConverter,如Jackson2JsonMessageConverterMarshallingMessageConverter等,来处理消息与对象之间的转换。

  • rabbit:admin

CachingConnectionFactory 缓存模式是CHANNEL 时(默认的), RabbitAdmin 实现会在同一个ApplicationContext中自动延迟声明 Queues,ExchangesBindings.

默认情况下,所有queues, exchanges,和bindings 都可通过应用程序上下文中所有RabbitAdmin 实例来声明(设置了auto-startup="true").

从1.2版本开始,可以有条件地声明元素.当程序连接了多个brokers,并需要在哪些brokers上声明特定元素时,特别有用.

默认情况下,如果没有提供declared-by(或是空的), auto-declare 属性则为true,那么所有RabbitAdmin将声明对象(只要admin的auto-startup 属性为true,默认值)

错误原因:在这个例子中RabbitAdmin有2个,并且exchange‘lamia.live.start' 没有提供declared-by声明。故此,该exchange被这2个RabbitAdmin都声明了一次。而且刚好这2个RabbitAdmin使用的是同一个服务器上面的同一个vhost. 同一个vhost不能有重复的Exchange`名称。所以,项目启动报错。

正确姿势

exchangequeue上面加上declared-by.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<!-- 2) 推送APN -->
<rabbit:connection-factory id="notificationConnectionFactory"
host="${push.apn.rabbit.connect.host}"
port="${push.apn.rabbit.connect.port}"
username="${push.apn.rabbit.connect.username}"
password="${push.apn.rabbit.connect.password}"
channel-cache-size="${push.apn.rabbitmq.connect.channelCacheSize}" />
<rabbit:admin id="notificationRabbitAdmin"
connection-factory="notificationConnectionFactory" />
<rabbit:template id="notificationRabbitTemplate"
connection-factory="notificationConnectionFactory" exchange="${pns-push.notification.topic}" />
<rabbit:template id="clockedNotificationRabbitTemplate"
connection-factory="notificationConnectionFactory" routing-key="${pns-push.clockedMsg.queue}" />
<bean id="produceService" class="com.ximalaya.pns.device.service.impl.ProduceService"></bean>
<!-- 3) 推送微信服务号 -->
<rabbit:connection-factory id="wechatConnectionFactory"
host="${push.wx.rabbit.connect.host}"
port="${push.wx.rabbit.connect.port}"
username="${push.wx.rabbit.connect.username}"
password="${push.wx.rabbit.connect.password}" />
<rabbit:template id="wechatRabbitTemplate"
connection-factory="wechatConnectionFactory" />
<rabbit:admin id="wechatRabbitAdmin" connection-factory="wechatConnectionFactory" />
<!-- 微信 推送预约提醒topic -->
<rabbit:topic-exchange name="${push.wx.live.start.topic}" durable="true" declared-by="wechatRabbitAdmin" />

参考

spring-rabbit配置多vhost,多connect-factory

[Spring AMQP]

Spring AMQP 1.6完整参考指南-第二部分

Spring AMQP 1.6完整参考指南-第六部分

热评文章