# Spring Boot 集成 RabbitMQ
本文主要介绍如何在 Spring Boot 项目中使用 RabbitMQ,源代码请参考:
https://github.com/davenkin/spring-amqp-learning (opens new window)
# 1. docker方式启动RabbitMQ容器
创建一个 docker-compose.yml 文件:
version: "2"
services:
rabbitmq:
restart: always
container_name: spring-amqp-learning-rabbitmq
image: rabbitmq:3-management
environment:
- "RABBITMQ_DEFAULT_USER=user"
- "RABBITMQ_DEFAULT_PASS=password"
ports:
- "15673:15672"
- "5673:5672"
此时,切换到在与docker-compose.yml文件相同目录,命令行运行docker-compose up后,通过http://localhost:15673/ (opens new window)可以访问 RabbitMQ 的管理页面:

# 2. 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.fengye</groupId>
<artifactId>spring-boot-rabbitmq</artifactId>
<version>1.0.0</version>
<name>spring-boot-rabbitmq</name>
<description>Spring Boot 集成 RabbitMQ</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
# 3. application.yml文件中配置RabbitMQ
spring:
rabbitmq:
host: localhost
port: 5673
virtual-host: /
username: rabbitmq-user
password: rabbitmq-password
# 4. 配置 Queue
创建RabbitHelloWorldConfiguration.java:
@Configuration
public class RabbitHelloWorldConfiguration {
@Bean
public Queue helloWorldQueue() {
return new Queue("HelloWorldQueue", false, false, false);
}
}
# 5. 发送消息
创建RabbitHelloWorldController.java:
@RestController
@RequestMapping(value = "/rabbitmq")
public class RabbitHelloWorldController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/hello")
public void helloWorld() {
rabbitTemplate.convertAndSend("HelloWorldQueue", "HelloWorld!" + LocalDateTime.now().toString());
}
}
# 6. 接收消息
创建RabbitHelloWorldListener.java:
@Component
@RabbitListener(queues = "HelloWorldQueue")
@Slf4j
public class RabbitHelloWorldListener {
@RabbitHandler
public void receiveHelloWorld(String queueMessage) {
log.info("Received message:{}", queueMessage);
}
}
# 7. 验证
发送消息curl http://localhost:8080/rabbitmq/hello,此时可以在命令行中看到日志输出:
2019-07-25 09:17:59.871 -- INFO [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.e.o.r.h.RabbitHelloWorldListener : Received message:HelloWorld!2019-07-25T09:17:59.869
表明消息已经成功发送并接收。
在对上文的“Hello World”做出解释之前,先介绍一下 Spring AMQP 中的主要对象类以及 Spring Boot 对 RabbitMQ 的默认配置。
# Spring AMQP 的主要对象
Spring AMQP 主要对象类如下:
| 类 | 作用 |
|---|---|
| Queue | 对应 RabbitMQ 中 Queue |
| AmqpTemplate | 接口,用于向 RabbitMQ 发送和接收 Message |
| RabbitTemplate | AmqpTemplate 的实现类 |
| @RabbitListener | 指定消息接收方,可以配置在类和方法上 |
| @RabbitHandler | 指定消息接收方,只能配置在方法上,可以与@RabbitListener 一起使用 |
| Message | 对 RabbitMQ 消息的封装 |
| Exchange | 对 RabbitMQ 的 Exchange 的封装,子类有 TopicExchange、FanoutExchange 和 DirectExchange 等 |
| Binding | 将一个 Queue 绑定到某个 Exchange,本身只是一个声明,并不做实际绑定操作 |
| AmqpAdmin | 接口,用于 Exchange 和 Queue 的管理,比如创建/删除/绑定等,自动检查 Binding 类并完成绑定操作 |
| RabbitAdmin | AmqpAdmin 的实现类 |
| ConnectionFactory | 创建 Connection 的工厂类,RabbitMQ 也有一个名为 ConnectionFactory 的类但二者没有继承关系,Spring ConnectionFactory 可以认为是对 RabbitMQ ConnectionFactory 的封装 |
| CachingConnectionFactory | Spring ConnectionFactory 的实现类,可以用于缓存 Channel 和 Connection |
| Connection | Spring 中用于创建 Channel 的连接类,RabbitMQ 也有一个名为 Connection 的类,但二者没有继承关系,Spring Connection 是对 RabbitMQ Connection 的封装 |
| SimpleConnection | Spring Connection 的实现类,将实际工作代理给 RabbitMQ 的 Connection 类 |
| MessageListenerContainer | 接口,消费端负责与 RabbitMQ 服务器保持连接并将 Message 传递给实际的@RabbitListener/@RabbitHandler 处理 |
| RabbitListenerContainerFactory | 接口,用于创建 MessageListenerContainer |
| SimpleMessageListenerContainer | MessageListenerContainer 的实现类 |
| SimpleRabbitListenerContainerFactory | RabbitListenerContainerFactory 的实现类 |
| RabbitProperties | 用于配置 Spring AMQP 的 Property 类 |
对于发送方而言,需要做以下配置:
- 配置 CachingConnectionFactory
- 配置 Exchange/Queue/Binding
- 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
- 配置 RabbitTemplate 用于发送消息,RabbitTemplate 通过 CachingConnectionFactory 获取到 Connection,然后想指定 Exchange 发送
对于消费方而言,需要做以下配置:
- 配置 CachingConnectionFactory
- 配置 Exchange/Queue/Binding
- 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
- 配置 RabbitListenerContainerFactory
- 配置@RabbitListener/@RabbitHandler 用于接收消息
可以看到,在 Spring 中使用 RabbitMQ 的配置还不少,但是 Spring Boot 已经帮我们做了很多默认配置,才使得我们刚才的“Hello World”可以如此简单的完成。
# Spring Boot 的默认 RabbitMQ 设置
在默认情况下,Spring Boot 为我们配置了一下 RabbitMQ 的对象:
- 一个 CachingConnectionFactory
- 一个名为
rabbitListenerContainerFactory的SimpleRabbitListenerContainerFactory,默认自动查找 IoC 容器中的 MessageConverter - 一个 RabbitTemplate
- 一个 AmqpAdmin
因此,通常情况下,我们只需要配置自己的 Exchange/Queue/Binding 即可。
# 对 Hello World 的解释
有了以上知识,再让我们来看看刚才的“Hello World”背后发生了什么:
- 首先,我们配置了一个名为“HelloWorldQueue”的 Queue,但是并没有配置 Exchange,也没有配置“HelloWorldQueue”的 Binding,这其实使用了 RabbitMQ 的默认行为,即所有 Queue 都以其自身的名称为 routingKey 绑定到了一个默认的 Exchange 上,该默认 Exchange 的名称为
“”。 - 由于 Spring Boot 自动配置了 AmqpAdmin,该 AmqpAdmin 将自动向 RabbitMQ 创建名为“HelloWorldQueue”的 Queue。
- 在发送消息的时候,我们直接使用了 RabbitTemplate,这个 RabbitTemplate 是 Spring Boot 自动为我们配置好的,RabbitTemplate 所依赖的 CachingConnectionFactory 也由 Spring Boot 自动配置。
- 发送消息时指定了 routingKey 为“HelloWorldQueue”,但是没有指定 Exchange,此时消息将会发送到 RabbitMQ 默认的 Exchange,又由于名为“HelloWorldQueue”的 Queue 向默认 Exchange 绑定的 routingKey 正是“HelloWorldQueue”,因此消息将由默认 Exchange 转发到名为“HelloWorldQueue” 的 Queue。至此发送方任务结束。
- 在消息接收方,由于 Spring Boot 默认为我们配置了
SimpleRabbitListenerContainerFactory,因此只需要配置@RabbitListener和@RabbitHandler接收消息即可。
另外,Spring Boot 采用了很多默认配置,通过统一的RabbitProperties同时完成消费方和生产方的配置,在默认情况下主要的配置如下:
| 配置项 | 默认值 | 作用 |
|---|---|---|
| host | localhost | RabbitMQ 服务器地址 |
| port | 5672 | RabbitMQ 服务器端口 |
| username | 账户名 | guest |
| password | 密码 | guest |
| virtualHost | RabbitMQ 虚拟主机名 | / |
| publisherConfirms | false | 设置是否启用生产方确认 |
| publisherReturns | false | 设置是否启用生产方消息返回 |
| ssl | 对象 | 配置 SSL,默认停用 |
| template | 对象 | 设置 RabbitTemplate |
| template.retry | 默认停用 | 设置 RabbitTemplate 发送消息时的重试,主要用于 RabbitTemplate 与 RabbitMQ 之间的网络连接 |
| template.mandatory | false | 设置发送消息失败时(无接收 queue)是否 return 消息,与 return callback 一并使用 |
| template.exchange | "" | 默认发送的 exchange |
| template.routingKey | "" | 默认发送消息时的 routing key |
| template.defaultReceiveQueue | null | 默认接收消息的 queue |
| listener.simple | 对象 | 设置 SimpleRabbitListenerContainerFactory |
| listener.direct | 对象 | 设置 DirectRabbitListenerContainerFactory |
| listener.simple.concurrency | null | 并发消费方数量 |
| listener.simple.acknowledgeMode | AUTO | 设置消费方确认模式,这里的 AUTO 与 RabbitMQ 的自动确认不是一回事 |
| listener.simple.prefetch | 250 | 设置消费方一次性接收消息的条数 |
| listener.simple.defaultRequeueRejected | true | 当 Listener 发生异常时是否 requeue |
| listener.simple.retry | 对象 | 设置 Listener 的重试机制,默认停用,当启用时,Listener 对于消息处理过程中的异常将进行 requeue 重试,超过重试次数再抛弃,此时 AmqpRejectAndDontRequeueException 异常也会被重试 |
所有默认配置如下:
spring:
rabbitmq:
host: localhost
port: 5673
virtual-host: /
username: rabbitmq-user
password: rabbitmq-password
ssl:
enabled: false
keyStore:
keyStoreType: PKCS12
keyStorePassword:
trustStore:
trustStoreType: JKS
trustStorePassword:
algorithm:
validateServerCertificate: true
verifyHostname: true
addresses:
requestedHeartbeat:
publisherConfirms: false
publisherReturns: false
connectionTimeout:
cache:
channel:
size:
checkoutTimeout:
connection:
mode: CHANNEL
size:
listener:
type: SIMPLE
simple:
autoStartup: true
acknowledgeMode:
prefetch:
defaultRequeueRejected:
idleEventInterval:
retry:
enabled: false
maxAttempts: 3
initialInterval: PT1S
multiplier: 1
maxInterval: PT10S
stateless: true
concurrency:
maxConcurrency:
transactionSize:
missingQueuesFatal: true
direct:
autoStartup: true
acknowledgeMode:
prefetch:
defaultRequeueRejected:
idleEventInterval:
retry:
enabled: false
maxAttempts: 3
initialInterval: PT1S
multiplier: 1
maxInterval: PT10S
stateless: true
consumersPerQueue:
missingQueuesFatal: false
template:
retry:
enabled: false
maxAttempts: 3
initialInterval: PT1S
multiplier: 1
maxInterval: PT10S
mandatory:
receiveTimeout:
replyTimeout:
exchange: ''
routingKey: ''
defaultReceiveQueue:
parsedAddresses:
# Exchange 和 Queue 的定义
在使用 RabbitMQ 时,Exchange 和 Queue 的定义是需要精心设计的rabbitmq-best-practices-for-designing-exchanges-queues-and-bindings (opens new window),在笔者的RabbitMQ 最佳实践 (opens new window)文章中,我建议每个微服务对应一个 Exchange 等方式。
另外,除了定义正常情况下的 Exchange 和 Queue 之外,我们还需要考虑到异常情况,因此我们还需要配置死信交换(Dead Letter Exchange, DLX)和死信队列(Dead Letter Queue, DLQ)用于对异常情况的处理。
接下来,我们围绕以下业务场景来定义相应的 Exchange 和 Queue:
在电商系统中,当订单(Order)创建之后,需要给用于发送短信通知。
我们将采用 RabbitMQ 来完成这个过程,即下单之后发送消息,短信服务监听该消息然后完成短信发送。
在消息发送方,首先需要定义一个 Exchange 作为接收消息的第一站,在OrderRabbitMqConfig.java中配置:
//"发送方Exchange"
@Bean
public TopicExchange orderPublishExchange() {
return new TopicExchange(ORDER_PUBLISH_EXCHANGE, true, false, ImmutableMap.of("alternate-exchange", ORDER_PUBLISH_DLX));
}
对于发送方而言,我们只需要关系消息能够成功发送到 Exchange 即可,而不用关心该消息实际是否到达了某个 Queue 以及哪个 Queue。因此,发送方并不具备定义 Queue 的职责,定义一个 Exchange 即可。
然而,有时发送到 Exchange 的消息没有任何 Queue 可以接收,要么因为根本就没有绑定 Queue,要么消息的 RoutingKey 设置导致无法路由到任何 Queue。严格意义上讲,这些问题也不是发送发需要关心的事情,然而在很多时候,消息的发送方和接收方事实上都是关系密切的系统,为了更好的支持接收方,笔者建议在发送方将那些无法路由到任何 Queue 的消息先保存下来,以便后续处理。在 RabbitMQ 中,可以通过定义 Exchange 时的alternate-exchange参数指定当消息无法路由时应该发到的 Exchange。
在上面定义的ORDER_PUBLISH_EXCHANGE中,我们其实已经指定了该 Exchange 的alternate-exchange为ORDER_PUBLISH_DLX,ORDER_PUBLISH_DLX的定义如下:
//"发送方DLX",消息发送失败时传到该DLX
@Bean
public TopicExchange orderPublishDlx() {
return new TopicExchange(ORDER_PUBLISH_DLX, true, false, null);
}
另外,定义一个 Queue(ORDER_PUBLISH_DLQ)用于存储ORDER_PUBLISH_DLX中的消息:
//"发送方DLQ",所有发到"发送DLX"的消息都将路由到该DLQ
@Bean
public Queue orderPublishDlq() {
return new Queue(ORDER_PUBLISH_DLQ, true, false, false, ImmutableMap.of("x-queue-mode", "lazy"));
}
//"发送方DLQ"绑定到"发送方DLX"
@Bean
public Binding orderPublishDlqBinding() {
return BindingBuilder.bind(orderPublishDlq()).to(orderPublishDlx()).with("#");
}
此时,当发送方的消息到达了ORDER_PUBLISH_EXCHANGE而无法路由到任何 Queue 时,该消息会被 RabbitMQ 发送到ORDER_PUBLISH_DLX,进而路由到ORDER_PUBLISH_DLQ。
至此,发送发的 RabbitMQ 配置完成。
在消息接收方,首先定义用于接收消息的 Queue(ORDER_RECEIVE_QUEUE):
//接收方的所有消息都发送到该"接收方Queue",即"接收方queue"可以绑定多个"发送方Exchange"
@Bean
public Queue orderReceiveQueue() {
ImmutableMap<String, Object> args = ImmutableMap.of(
"x-dead-letter-exchange",
ORDER_RECEIVE_DLX,
"x-overflow",
"drop-head",
"x-max-length",
300000,
"x-message-ttl",
24 * 60 * 60 * 1000);
return new Queue(ORDER_RECEIVE_QUEUE, true, false, false, args);
}
//"接收方queue"绑定到"发送方exchange"
@Bean
public Binding orderReceiveBinding() {
return BindingBuilder.bind(orderReceiveQueue()).to(orderPublishExchange()).with("order.#");
}
在消息处理的过程中,可能发生异常,此时需要将异常消息记录下来,同样采用 DLX 和 DLQ 机制。在定义ORDER_RECEIVE_QUEUE时,事实上我们已经指定了 DLX 为ORDER_RECEIVE_DLX,定义该 DLX 和对应的 DLQ(ORDER_RECEIVE_DLQ)为:
//"接收方DLX",消息处理失败时传到该DLX
@Bean
public TopicExchange orderReceiveDlx() {
return new TopicExchange(ORDER_RECEIVE_DLX, true, false, null);
}
//"接收方DLQ",所有发到"接收DLX"的消息都将路由到该DLQ
@Bean
public Queue orderReceiveDlq() {
return new Queue(ORDER_RECEIVE_DLQ, true, false, false, ImmutableMap.of("x-queue-mode", "lazy"));
}
//"接收方DLQ"绑定到"接收方DLX"
@Bean
public Binding orderReceiveDlqBinding() {
return BindingBuilder.bind(orderReceiveDlq()).to(orderReceiveDlx()).with("#");
}
此外,对于发送到ORDER_RECEIVE_DLQ的死信消息,我们需要对其进行手动处理,一种处理方式便是在后台定期运行一个 Job 将死信消息重新处理。为此,我们需要定义一个用于手动回复的 Exchange(ORDER_RECEIVE_RECOVER_EXCHANGE),并将接收方的所有的消费 Queue 绑定到该 Exchange:
//"接收方恢复Exchange",用于手动将"接收方DLQ"中的消息发到该DLX进行重试
@Bean
public TopicExchange orderReceiveRecoverExchange() {
return new TopicExchange(ORDER_RECEIVE_RECOVER_EXCHANGE, true, false, null);
}
//"接收方Queue"绑定到"接收方恢复Exchange"
@Bean
public Binding orderReceiveRecoverBinding() {
return BindingBuilder.bind(orderReceiveQueue()).to(orderReceiveRecoverExchange()).with("#");
}
以上关于 Exchange/Queue 和 DLX/DLQ 的配置可以成为一种模式,使得我们一方面能够尽量少地配置 RabbitMQ 的基础设施,另一方面又能对异常情况进行处理。在该模式下存在以下概念:
| 概念 | 类型 | 解释 | 命名 | 示例 |
|---|---|---|---|---|
| 发送方 Exchange | Exchange | 用于接收一个系统(比如微服务)中所有消息的 Exchange,一个系统只有一个发送方Exchange | xxx-publish-x | order-publish-x |
| 发送方 DLX | Exchange | 用于接收发送方无法路由的消息 | xxx-publish-dlx | order-publish-dlx |
| 发送方 DLQ | Queue | 用于存放发送方DLX的消息 | xxx-publish-dlq | order-publish-dlq |
| 接收方 Queue | Queue | 用于接收发送方Exchange的消息,一个系统只有一个接收方Queue | xxx-receive-q | product-receive-q |
| 接收方 DLX | Exchange | 用于接收消费失败的消息 | xxx-receive-dlx | product-receive-dlx |
| 接收方 DLQ | Queue | 用于存放接收方DLX的消息 | xxx-receive-dlq | product-receive-dlq |
| 接收方恢复 Exchange | Exchange | 用于接收从接收方DLQ中手动恢复的消息,接收方Queue应该绑定到接收方恢复Exchange | xxx-receive-recover-x | product-receive-recover-x |
该模式的 RabbitMQ 配置架构图如下:

一种 RabbitMQ 配置架构
# 消息序列化
RabbitMQ 中所有消息都是二进制的,在 Spring 中我们需要在 Java 对象与 RabbitMQ 消息体之间进行相互转换。在默认情况下,Spring Boot 使用了SimpleMessageConverter用于转换 String,Java 的 Serializable 对象以及字节数组。通常情况下,我们希望使用 Json 格式,此时可以采用以下配置:
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(objectMapper);
messageConverter.setClassMapper(classMapper());
return messageConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
classMapper.setTrustedPackages("*");
return classMapper;
}
Spring 的RabbitTemplate和RabbitListenerContainerFactory会自动找到以上配置的MessageConverter并使用之。
此时,在RabbitTemplate发送消息时,它会在消息的头部加入 Java 对象的类型信息,然后在RabbitListener接收到消息时,通过该头部的类型信息指导应该发序列化为哪个类的对象。
在消费消息时,配置@RabbitListener:
@Component
@RabbitListener(queues = {ORDER_RECEIVE_QUEUE})
public class OrderEventNotificationListener {
private Logger logger = AutoNamingLoggerFactory.getLogger();
@RabbitHandler
public void onOrderCreated(OrderCreatedEvent event) {
// send notification
//sendSmsNotification(event);
logger.info("Notification sent for OrderCreatedEvent:{}", event.getOrderId());
throw new RuntimeException("");
}
}
其中,@RabbitListener可以用于类和方法,而@RabbitHandler只能用于方法,通常的用法是:@RabbitListener用在类上,接收不同 Queue 的不同类型消息,然后在方法上使用@RabbitHandler来处理特定类型的消息。
需要指出的是,不同的处理方法之间必须没有歧义,即我们需要保证对于一种类型的消息,只有一个方法可以接收。比如:对于 OrderCreatedEvent,如果它还有个父类叫 OrderEvent,那么如果再添加一个方法用于处理 OrderEvent,那么将会报错:
@Component
@RabbitListener(queues = ${ORDER_RECEIVE_QUEUE})
public class OrderEventNotificationListener {
private Logger logger = AutoNamingLoggerFactory.getLogger();
@RabbitHandler
public void onOrderCreated(OrderCreatedEvent event) {
// send notification
//sendSmsNotification(event);
logger.info("Notification sent for OrderCreatedEvent:{}", event.getOrderId());
}
@RabbitHandler
public void onOrderEvent(OrderEvent event) {
logger.info("Received order event");
}
}
此时报错为:
Caused by: org.springframework.amqp.AmqpException: Ambiguous methods for payload type: class com.ecommerce.order.order.model.OrderCreatedEvent: onOrderCreated and onOrderEvent
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.findHandlerForPayload(DelegatingInvocableHandler.java:206)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:129)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:60)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
... 26 common frames omitted
# 发送方确认
要使用发送方确认需要:
- 设置
publisherConfirms = true - 在发送消息时,加入
CorrelationData:
rabbitTemplate.convertAndSend(ORDER_PUBLISH_EXCHANGE,
"order.created",
event,
new CorrelationData(event.getOrderId()));
- 在 RabbitTemplate 中注册确认 Callback
setConfirmCallback(),并通过CorrelationData确定是哪条消息被确认了:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String eventId = correlationData.getId();
if (ack) {
logger.info("Publish confirmed event[{}].", eventId);
} else {
logger.warn("Domain event[{}] is nacked while publish:{}.", eventId, cause);
}
});
在正常情况下,运行OrderApiTest.should_create_order(),输出的日志为:
2019-07-27 14:09:13.517 -- INFO [AMQP Connection 127.0.0.1:5673] c.e.o.order.OrderApplicationService : Publish confirmed event[89cacd65a550493c95957729c4f24066].
表明消息被 RabbitMQ 确认已经收到。在 RabbitMQ 无法确保消息成功投递时,将作 nack 确认,比如发送方Exchange不存在,此时输出的日志为:
2019-07-27 14:14:12.731 -- WARN [rabbitConnectionFactory1] c.e.o.order.OrderApplicationService : Domain event[d27e1afdd4ea449d827af272209e9125] is nacked while publish:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dfd' in vhost '/', class-id=60, method-id=40).
要使用发送方返回需要:
- 设置
publisherReturns = true - 设置
template.mandatory = true - 在 RabbitTemplate 中注册返回 Callback
setReturnCallback()
通常来讲,我们没有必要对返回消息进行处理,因此这里不展开讲解。
# 消费方确认
Spring AMQP 在默认情况下是启用了消费方确认的,它会根据消费方是否抛出异常来决定是 ack 还是 nack。
AcknowledgeMode用于配置 Spring AMQP 的消费方确认行为,其值有以下几种:
- NONE:无消费方确认,即表示消费方自动确认,等同于 RabbitMQ 的
autoAck=true - MANNUAL:消费方手动确认,用户需要自行调用
ack/nack操作 - AUTO:默认值,Spring 根据 Listener 中是否抛出异常来决定是 ack 还是 nack,需要注意的是,这里的
AUTO与 RabbitMQ 的autoAck是不同的东西。
除了AcknowledgeMode外,消费方确认的行为还受defaultRequeueRejected参数的影响,当defaultRequeueRejected=true(默认值)时,表示如果消费方拒绝了消息,那么消息将被重新放入 Queue 中以便下次重新消费,当为false时,被拒绝的消息将不会被重新放入 Queue 中,而是要么直接抛弃,要么放入 DLX。
另外,通过抛出指定异常可以绕开defaultRequeueRejected的影响:
AmqpRejectAndDontRequeueException:表示直接将消息抛弃或者 DLX,而不是 Requeue,无论defaultRequeueRejected的值是什么。ImmediateRequeueAmqpException:与AmqpRejectAndDontRequeueException正好相反,表示立即将消息进行 Requeue 处理,无论defaultRequeueRejected的值是什么。
当 Listener 中抛出异常时,Spring 默认采用ConditionalRejectingErrorHandler来判断哪些异常(通常是不能恢复的异常)可以对消息进行 Requeue 操作,哪些不能。
在消费消息时,还可以配置重试机制,Spring 直接支持消费方的重试(spring-retry),只是在默认情况下是禁用的,可以通过以下方式启用消费方重试:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
需要注意的是,这里的重试并没有 RabbitMQ 的参与,只是 Spring 内部的重试。也就是说,重试时并不是讲消息做 Requeue 操作,而是拿到同一条消息在 Spring 内部做重复处理。默认情况下,重试最大此时为 3,当达到最大重试次数时,Spring 默认将使用RejectAndDontRequeueRecoverer对消息进行恢复处理,该类将不会对消息做 Requeue 操作,而是直接抛弃/DLX 掉。
在没有启用重试机制的情况下,并且defaultRequeueRejected=true,此时如果业务处理过程中一直抛出异常,那么将会导致消息被不断 Requeue 的情况,为了避免这种情况,要么启用重试机制;要么将defaultRequeueRejected设置为false;或者同时启动二者。笔者推荐同时启用二者。
综上,在使用 RabbitMQ 时,我们希望采用以下实践:
- 默认开启生产方确认
- 默认设置
defaultRequeueRejected=false - 默认设置消费方重试
为此,除了直接在 application.yml 中配置之外,还可以将这些默认配置固化到代码中,创建一个ConnectionFactory并且默认开启生产方确认(setPublisherConfirms(true)):
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses(rabbitProperties.getAddresses());
factory.setUsername(rabbitProperties.getUsername());
factory.setPassword(rabbitProperties.getPassword());
factory.setPort(rabbitProperties.getPort());
factory.setVirtualHost(rabbitProperties.getVirtualHost());
factory.setPublisherConfirms(true);
return factory;
}
默认配置消费方的defaultRequeueRejected=false以及重试:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
MessageConverter messageConverter,
TaskExecutor taskExecutor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDefaultRequeueRejected(false);
factory.setPrefetchCount(rabbitProperties.getListener().getSimple().getPrefetch());
factory.setConcurrentConsumers(rabbitProperties.getListener().getSimple().getConcurrency());
factory.setMaxConcurrentConsumers(rabbitProperties.getListener().getSimple().getMaxConcurrency());
factory.setMessageConverter(messageConverter);
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
RetryOperationsInterceptor build = RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffPolicy(fixedBackOffPolicy)
.recoverer(new RejectAndDontRequeueRecoverer())
.build();
factory.setAdviceChain(build);
factory.setTaskExecutor(taskExecutor);
return factory;
}