# Spring Boot 集成 RabbitMQ

本文主要介绍如何在 Spring Boot 项目中使用 RabbitMQ,源代码请参考:

https://github.com/davenkin/spring-amqp-learning (opens new window)

# 1. docker方式启动RabbitMQ容器

创建一个 docker-compose.yml 文件:

version: "2"
services:
  rabbitmq:
    restart: always
    container_name: spring-amqp-learning-rabbitmq
    image: rabbitmq:3-management
    environment:
      - "RABBITMQ_DEFAULT_USER=user"
      - "RABBITMQ_DEFAULT_PASS=password"
    ports:
      - "15673:15672"
      - "5673:5672"

此时,切换到在与docker-compose.yml文件相同目录,命令行运行docker-compose up后,通过http://localhost:15673/ (opens new window)可以访问 RabbitMQ 的管理页面:

img

# 2. 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.fengye</groupId>
    <artifactId>spring-boot-rabbitmq</artifactId>
    <version>1.0.0</version>
    <name>spring-boot-rabbitmq</name>
    <description>Spring Boot 集成 RabbitMQ</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

# 3. application.yml文件中配置RabbitMQ

spring:
  rabbitmq:
    host: localhost
    port: 5673
    virtual-host: /
    username: rabbitmq-user
    password: rabbitmq-password

# 4. 配置 Queue

创建RabbitHelloWorldConfiguration.java

@Configuration
public class RabbitHelloWorldConfiguration {
    @Bean
    public Queue helloWorldQueue() {
        return new Queue("HelloWorldQueue", false, false, false);
    }
}

# 5. 发送消息

创建RabbitHelloWorldController.java

@RestController
@RequestMapping(value = "/rabbitmq")
public class RabbitHelloWorldController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/hello")
    public void helloWorld() {
        rabbitTemplate.convertAndSend("HelloWorldQueue", "HelloWorld!" + LocalDateTime.now().toString());
    }
}

# 6. 接收消息

创建RabbitHelloWorldListener.java

@Component
@RabbitListener(queues = "HelloWorldQueue")
@Slf4j
public class RabbitHelloWorldListener {

    @RabbitHandler
    public void receiveHelloWorld(String queueMessage) {
        log.info("Received message:{}", queueMessage);
    }
}

# 7. 验证

发送消息curl http://localhost:8080/rabbitmq/hello,此时可以在命令行中看到日志输出:

2019-07-25 09:17:59.871 -- INFO  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.e.o.r.h.RabbitHelloWorldListener : Received message:HelloWorld!2019-07-25T09:17:59.869

表明消息已经成功发送并接收。

在对上文的“Hello World”做出解释之前,先介绍一下 Spring AMQP 中的主要对象类以及 Spring Boot 对 RabbitMQ 的默认配置。

# Spring AMQP 的主要对象

Spring AMQP 主要对象类如下:

作用
Queue 对应 RabbitMQ 中 Queue
AmqpTemplate 接口,用于向 RabbitMQ 发送和接收 Message
RabbitTemplate AmqpTemplate 的实现类
@RabbitListener 指定消息接收方,可以配置在类和方法上
@RabbitHandler 指定消息接收方,只能配置在方法上,可以与@RabbitListener 一起使用
Message 对 RabbitMQ 消息的封装
Exchange 对 RabbitMQ 的 Exchange 的封装,子类有 TopicExchange、FanoutExchange 和 DirectExchange 等
Binding 将一个 Queue 绑定到某个 Exchange,本身只是一个声明,并不做实际绑定操作
AmqpAdmin 接口,用于 Exchange 和 Queue 的管理,比如创建/删除/绑定等,自动检查 Binding 类并完成绑定操作
RabbitAdmin AmqpAdmin 的实现类
ConnectionFactory 创建 Connection 的工厂类,RabbitMQ 也有一个名为 ConnectionFactory 的类但二者没有继承关系,Spring ConnectionFactory 可以认为是对 RabbitMQ ConnectionFactory 的封装
CachingConnectionFactory Spring ConnectionFactory 的实现类,可以用于缓存 Channel 和 Connection
Connection Spring 中用于创建 Channel 的连接类,RabbitMQ 也有一个名为 Connection 的类,但二者没有继承关系,Spring Connection 是对 RabbitMQ Connection 的封装
SimpleConnection Spring Connection 的实现类,将实际工作代理给 RabbitMQ 的 Connection 类
MessageListenerContainer 接口,消费端负责与 RabbitMQ 服务器保持连接并将 Message 传递给实际的@RabbitListener/@RabbitHandler 处理
RabbitListenerContainerFactory 接口,用于创建 MessageListenerContainer
SimpleMessageListenerContainer MessageListenerContainer 的实现类
SimpleRabbitListenerContainerFactory RabbitListenerContainerFactory 的实现类
RabbitProperties 用于配置 Spring AMQP 的 Property 类

对于发送方而言,需要做以下配置:

  • 配置 CachingConnectionFactory
  • 配置 Exchange/Queue/Binding
  • 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
  • 配置 RabbitTemplate 用于发送消息,RabbitTemplate 通过 CachingConnectionFactory 获取到 Connection,然后想指定 Exchange 发送

对于消费方而言,需要做以下配置:

  • 配置 CachingConnectionFactory
  • 配置 Exchange/Queue/Binding
  • 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
  • 配置 RabbitListenerContainerFactory
  • 配置@RabbitListener/@RabbitHandler 用于接收消息

可以看到,在 Spring 中使用 RabbitMQ 的配置还不少,但是 Spring Boot 已经帮我们做了很多默认配置,才使得我们刚才的“Hello World”可以如此简单的完成。

# Spring Boot 的默认 RabbitMQ 设置

在默认情况下,Spring Boot 为我们配置了一下 RabbitMQ 的对象:

  • 一个 CachingConnectionFactory
  • 一个名为rabbitListenerContainerFactorySimpleRabbitListenerContainerFactory,默认自动查找 IoC 容器中的 MessageConverter
  • 一个 RabbitTemplate
  • 一个 AmqpAdmin

因此,通常情况下,我们只需要配置自己的 Exchange/Queue/Binding 即可。

# 对 Hello World 的解释

有了以上知识,再让我们来看看刚才的“Hello World”背后发生了什么:

  • 首先,我们配置了一个名为“HelloWorldQueue”的 Queue,但是并没有配置 Exchange,也没有配置“HelloWorldQueue”的 Binding,这其实使用了 RabbitMQ 的默认行为,即所有 Queue 都以其自身的名称为 routingKey 绑定到了一个默认的 Exchange 上,该默认 Exchange 的名称为“”
  • 由于 Spring Boot 自动配置了 AmqpAdmin,该 AmqpAdmin 将自动向 RabbitMQ 创建名为“HelloWorldQueue”的 Queue。
  • 在发送消息的时候,我们直接使用了 RabbitTemplate,这个 RabbitTemplate 是 Spring Boot 自动为我们配置好的,RabbitTemplate 所依赖的 CachingConnectionFactory 也由 Spring Boot 自动配置。
  • 发送消息时指定了 routingKey 为“HelloWorldQueue”,但是没有指定 Exchange,此时消息将会发送到 RabbitMQ 默认的 Exchange,又由于名为“HelloWorldQueue”的 Queue 向默认 Exchange 绑定的 routingKey 正是“HelloWorldQueue”,因此消息将由默认 Exchange 转发到名为“HelloWorldQueue” 的 Queue。至此发送方任务结束。
  • 在消息接收方,由于 Spring Boot 默认为我们配置了 SimpleRabbitListenerContainerFactory,因此只需要配置@RabbitListener@RabbitHandler 接收消息即可。

另外,Spring Boot 采用了很多默认配置,通过统一的RabbitProperties同时完成消费方和生产方的配置,在默认情况下主要的配置如下:

配置项 默认值 作用
host localhost RabbitMQ 服务器地址
port 5672 RabbitMQ 服务器端口
username 账户名 guest
password 密码 guest
virtualHost RabbitMQ 虚拟主机名 /
publisherConfirms false 设置是否启用生产方确认
publisherReturns false 设置是否启用生产方消息返回
ssl 对象 配置 SSL,默认停用
template 对象 设置 RabbitTemplate
template.retry 默认停用 设置 RabbitTemplate 发送消息时的重试,主要用于 RabbitTemplate 与 RabbitMQ 之间的网络连接
template.mandatory false 设置发送消息失败时(无接收 queue)是否 return 消息,与 return callback 一并使用
template.exchange "" 默认发送的 exchange
template.routingKey "" 默认发送消息时的 routing key
template.defaultReceiveQueue null 默认接收消息的 queue
listener.simple 对象 设置 SimpleRabbitListenerContainerFactory
listener.direct 对象 设置 DirectRabbitListenerContainerFactory
listener.simple.concurrency null 并发消费方数量
listener.simple.acknowledgeMode AUTO 设置消费方确认模式,这里的 AUTO 与 RabbitMQ 的自动确认不是一回事
listener.simple.prefetch 250 设置消费方一次性接收消息的条数
listener.simple.defaultRequeueRejected true 当 Listener 发生异常时是否 requeue
listener.simple.retry 对象 设置 Listener 的重试机制,默认停用,当启用时,Listener 对于消息处理过程中的异常将进行 requeue 重试,超过重试次数再抛弃,此时 AmqpRejectAndDontRequeueException 异常也会被重试

所有默认配置如下:

spring:
  rabbitmq:
    host: localhost
    port: 5673
    virtual-host: /
    username: rabbitmq-user
    password: rabbitmq-password
    ssl:
      enabled: false
      keyStore:
      keyStoreType: PKCS12
      keyStorePassword:
      trustStore:
      trustStoreType: JKS
      trustStorePassword:
      algorithm:
      validateServerCertificate: true
      verifyHostname: true
    addresses:
    requestedHeartbeat:
    publisherConfirms: false
    publisherReturns: false
    connectionTimeout:
    cache:
      channel:
        size:
        checkoutTimeout:
      connection:
        mode: CHANNEL
        size:
    listener:
      type: SIMPLE
      simple:
        autoStartup: true
        acknowledgeMode:
        prefetch:
        defaultRequeueRejected:
        idleEventInterval:
        retry:
          enabled: false
          maxAttempts: 3
          initialInterval: PT1S
          multiplier: 1
          maxInterval: PT10S
          stateless: true
        concurrency:
        maxConcurrency:
        transactionSize:
        missingQueuesFatal: true
      direct:
        autoStartup: true
        acknowledgeMode:
        prefetch:
        defaultRequeueRejected:
        idleEventInterval:
        retry:
          enabled: false
          maxAttempts: 3
          initialInterval: PT1S
          multiplier: 1
          maxInterval: PT10S
          stateless: true
        consumersPerQueue:
        missingQueuesFatal: false
    template:
      retry:
        enabled: false
        maxAttempts: 3
        initialInterval: PT1S
        multiplier: 1
        maxInterval: PT10S
      mandatory:
      receiveTimeout:
      replyTimeout:
      exchange: ''
      routingKey: ''
      defaultReceiveQueue:
    parsedAddresses:

# Exchange 和 Queue 的定义

在使用 RabbitMQ 时,Exchange 和 Queue 的定义是需要精心设计的rabbitmq-best-practices-for-designing-exchanges-queues-and-bindings (opens new window),在笔者的RabbitMQ 最佳实践 (opens new window)文章中,我建议每个微服务对应一个 Exchange 等方式。

另外,除了定义正常情况下的 Exchange 和 Queue 之外,我们还需要考虑到异常情况,因此我们还需要配置死信交换(Dead Letter Exchange, DLX)和死信队列(Dead Letter Queue, DLQ)用于对异常情况的处理。

接下来,我们围绕以下业务场景来定义相应的 Exchange 和 Queue:

在电商系统中,当订单(Order)创建之后,需要给用于发送短信通知。

我们将采用 RabbitMQ 来完成这个过程,即下单之后发送消息,短信服务监听该消息然后完成短信发送。

在消息发送方,首先需要定义一个 Exchange 作为接收消息的第一站,在OrderRabbitMqConfig.java中配置:

//"发送方Exchange"
@Bean
public TopicExchange orderPublishExchange() {
  return new TopicExchange(ORDER_PUBLISH_EXCHANGE, true, false, ImmutableMap.of("alternate-exchange", ORDER_PUBLISH_DLX));
}

对于发送方而言,我们只需要关系消息能够成功发送到 Exchange 即可,而不用关心该消息实际是否到达了某个 Queue 以及哪个 Queue。因此,发送方并不具备定义 Queue 的职责,定义一个 Exchange 即可。

然而,有时发送到 Exchange 的消息没有任何 Queue 可以接收,要么因为根本就没有绑定 Queue,要么消息的 RoutingKey 设置导致无法路由到任何 Queue。严格意义上讲,这些问题也不是发送发需要关心的事情,然而在很多时候,消息的发送方和接收方事实上都是关系密切的系统,为了更好的支持接收方,笔者建议在发送方将那些无法路由到任何 Queue 的消息先保存下来,以便后续处理。在 RabbitMQ 中,可以通过定义 Exchange 时的alternate-exchange参数指定当消息无法路由时应该发到的 Exchange。

在上面定义的ORDER_PUBLISH_EXCHANGE中,我们其实已经指定了该 Exchange 的alternate-exchangeORDER_PUBLISH_DLXORDER_PUBLISH_DLX的定义如下:

    //"发送方DLX",消息发送失败时传到该DLX
    @Bean
    public TopicExchange orderPublishDlx() {
        return new TopicExchange(ORDER_PUBLISH_DLX, true, false, null);
    }

另外,定义一个 Queue(ORDER_PUBLISH_DLQ)用于存储ORDER_PUBLISH_DLX中的消息:

    //"发送方DLQ",所有发到"发送DLX"的消息都将路由到该DLQ
    @Bean
    public Queue orderPublishDlq() {
        return new Queue(ORDER_PUBLISH_DLQ, true, false, false, ImmutableMap.of("x-queue-mode", "lazy"));
    }

    //"发送方DLQ"绑定到"发送方DLX"
    @Bean
    public Binding orderPublishDlqBinding() {
        return BindingBuilder.bind(orderPublishDlq()).to(orderPublishDlx()).with("#");
    }

此时,当发送方的消息到达了ORDER_PUBLISH_EXCHANGE而无法路由到任何 Queue 时,该消息会被 RabbitMQ 发送到ORDER_PUBLISH_DLX,进而路由到ORDER_PUBLISH_DLQ

至此,发送发的 RabbitMQ 配置完成。

在消息接收方,首先定义用于接收消息的 Queue(ORDER_RECEIVE_QUEUE):

//接收方的所有消息都发送到该"接收方Queue",即"接收方queue"可以绑定多个"发送方Exchange"
@Bean
public Queue orderReceiveQueue() {
  ImmutableMap<String, Object> args = ImmutableMap.of(
    "x-dead-letter-exchange",
    ORDER_RECEIVE_DLX,
    "x-overflow",
    "drop-head",
    "x-max-length",
    300000,
    "x-message-ttl",
    24 * 60 * 60 * 1000);
  return new Queue(ORDER_RECEIVE_QUEUE, true, false, false, args);
}

//"接收方queue"绑定到"发送方exchange"
@Bean
public Binding orderReceiveBinding() {
  return BindingBuilder.bind(orderReceiveQueue()).to(orderPublishExchange()).with("order.#");
}

在消息处理的过程中,可能发生异常,此时需要将异常消息记录下来,同样采用 DLX 和 DLQ 机制。在定义ORDER_RECEIVE_QUEUE时,事实上我们已经指定了 DLX 为ORDER_RECEIVE_DLX,定义该 DLX 和对应的 DLQ(ORDER_RECEIVE_DLQ)为:

//"接收方DLX",消息处理失败时传到该DLX
@Bean
public TopicExchange orderReceiveDlx() {
  return new TopicExchange(ORDER_RECEIVE_DLX, true, false, null);
}

//"接收方DLQ",所有发到"接收DLX"的消息都将路由到该DLQ
@Bean
public Queue orderReceiveDlq() {
  return new Queue(ORDER_RECEIVE_DLQ, true, false, false, ImmutableMap.of("x-queue-mode", "lazy"));
}

//"接收方DLQ"绑定到"接收方DLX"
@Bean
public Binding orderReceiveDlqBinding() {
  return BindingBuilder.bind(orderReceiveDlq()).to(orderReceiveDlx()).with("#");
}

此外,对于发送到ORDER_RECEIVE_DLQ的死信消息,我们需要对其进行手动处理,一种处理方式便是在后台定期运行一个 Job 将死信消息重新处理。为此,我们需要定义一个用于手动回复的 Exchange(ORDER_RECEIVE_RECOVER_EXCHANGE),并将接收方的所有的消费 Queue 绑定到该 Exchange:

//"接收方恢复Exchange",用于手动将"接收方DLQ"中的消息发到该DLX进行重试
@Bean
public TopicExchange orderReceiveRecoverExchange() {
  return new TopicExchange(ORDER_RECEIVE_RECOVER_EXCHANGE, true, false, null);
}

//"接收方Queue"绑定到"接收方恢复Exchange"
@Bean
public Binding orderReceiveRecoverBinding() {
  return BindingBuilder.bind(orderReceiveQueue()).to(orderReceiveRecoverExchange()).with("#");
}

以上关于 Exchange/Queue 和 DLX/DLQ 的配置可以成为一种模式,使得我们一方面能够尽量少地配置 RabbitMQ 的基础设施,另一方面又能对异常情况进行处理。在该模式下存在以下概念:

概念 类型 解释 命名 示例
发送方 Exchange Exchange 用于接收一个系统(比如微服务)中所有消息的 Exchange,一个系统只有一个发送方Exchange xxx-publish-x order-publish-x
发送方 DLX Exchange 用于接收发送方无法路由的消息 xxx-publish-dlx order-publish-dlx
发送方 DLQ Queue 用于存放发送方DLX的消息 xxx-publish-dlq order-publish-dlq
接收方 Queue Queue 用于接收发送方Exchange的消息,一个系统只有一个接收方Queue xxx-receive-q product-receive-q
接收方 DLX Exchange 用于接收消费失败的消息 xxx-receive-dlx product-receive-dlx
接收方 DLQ Queue 用于存放接收方DLX的消息 xxx-receive-dlq product-receive-dlq
接收方恢复 Exchange Exchange 用于接收从接收方DLQ中手动恢复的消息,接收方Queue应该绑定到接收方恢复Exchange xxx-receive-recover-x product-receive-recover-x

该模式的 RabbitMQ 配置架构图如下:

img

一种 RabbitMQ 配置架构

# 消息序列化

RabbitMQ 中所有消息都是二进制的,在 Spring 中我们需要在 Java 对象与 RabbitMQ 消息体之间进行相互转换。在默认情况下,Spring Boot 使用了SimpleMessageConverter用于转换 String,Java 的 Serializable 对象以及字节数组。通常情况下,我们希望使用 Json 格式,此时可以采用以下配置:

@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
  Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(objectMapper);
  messageConverter.setClassMapper(classMapper());
  return messageConverter;
}

@Bean
public DefaultClassMapper classMapper() {
  DefaultClassMapper classMapper = new DefaultClassMapper();
  classMapper.setTrustedPackages("*");
  return classMapper;
}

Spring 的RabbitTemplateRabbitListenerContainerFactory会自动找到以上配置的MessageConverter并使用之。

此时,在RabbitTemplate发送消息时,它会在消息的头部加入 Java 对象的类型信息,然后在RabbitListener接收到消息时,通过该头部的类型信息指导应该发序列化为哪个类的对象。

在消费消息时,配置@RabbitListener

@Component
@RabbitListener(queues = {ORDER_RECEIVE_QUEUE})
public class OrderEventNotificationListener {
    private Logger logger = AutoNamingLoggerFactory.getLogger();

    @RabbitHandler
    public void onOrderCreated(OrderCreatedEvent event) {
        // send notification
        //sendSmsNotification(event);

        logger.info("Notification sent for OrderCreatedEvent:{}", event.getOrderId());
        throw new RuntimeException("");
    }

}

其中,@RabbitListener可以用于类和方法,而@RabbitHandler只能用于方法,通常的用法是:@RabbitListener用在类上,接收不同 Queue 的不同类型消息,然后在方法上使用@RabbitHandler来处理特定类型的消息。

需要指出的是,不同的处理方法之间必须没有歧义,即我们需要保证对于一种类型的消息,只有一个方法可以接收。比如:对于 OrderCreatedEvent,如果它还有个父类叫 OrderEvent,那么如果再添加一个方法用于处理 OrderEvent,那么将会报错:

@Component
@RabbitListener(queues = ${ORDER_RECEIVE_QUEUE})
public class OrderEventNotificationListener {
    private Logger logger = AutoNamingLoggerFactory.getLogger();

    @RabbitHandler
    public void onOrderCreated(OrderCreatedEvent event) {
        // send notification
        //sendSmsNotification(event);

        logger.info("Notification sent for OrderCreatedEvent:{}", event.getOrderId());
    }

    @RabbitHandler
    public void onOrderEvent(OrderEvent event) {
        logger.info("Received order event");
    }

}

此时报错为:

Caused by: org.springframework.amqp.AmqpException: Ambiguous methods for payload type: class com.ecommerce.order.order.model.OrderCreatedEvent: onOrderCreated and onOrderEvent
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.findHandlerForPayload(DelegatingInvocableHandler.java:206)
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:129)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:60)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
    ... 26 common frames omitted

# 发送方确认

要使用发送方确认需要:

  1. 设置publisherConfirms = true
  2. 在发送消息时,加入CorrelationData
rabbitTemplate.convertAndSend(ORDER_PUBLISH_EXCHANGE,
        "order.created",
        event,
        new CorrelationData(event.getOrderId()));
  1. 在 RabbitTemplate 中注册确认 CallbacksetConfirmCallback(),并通过CorrelationData确定是哪条消息被确认了:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    String eventId = correlationData.getId();
    if (ack) {
        logger.info("Publish confirmed event[{}].", eventId);
    } else {
        logger.warn("Domain event[{}] is nacked while publish:{}.", eventId, cause);
    }

});

在正常情况下,运行OrderApiTest.should_create_order(),输出的日志为:

2019-07-27 14:09:13.517 -- INFO  [AMQP Connection 127.0.0.1:5673] c.e.o.order.OrderApplicationService : Publish confirmed event[89cacd65a550493c95957729c4f24066].

表明消息被 RabbitMQ 确认已经收到。在 RabbitMQ 无法确保消息成功投递时,将作 nack 确认,比如发送方Exchange不存在,此时输出的日志为:

2019-07-27 14:14:12.731 -- WARN  [rabbitConnectionFactory1] c.e.o.order.OrderApplicationService : Domain event[d27e1afdd4ea449d827af272209e9125] is nacked while publish:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dfd' in vhost '/', class-id=60, method-id=40).

要使用发送方返回需要:

  1. 设置publisherReturns = true
  2. 设置template.mandatory = true
  3. 在 RabbitTemplate 中注册返回 CallbacksetReturnCallback()

通常来讲,我们没有必要对返回消息进行处理,因此这里不展开讲解。

# 消费方确认

Spring AMQP 在默认情况下是启用了消费方确认的,它会根据消费方是否抛出异常来决定是 ack 还是 nack。

AcknowledgeMode用于配置 Spring AMQP 的消费方确认行为,其值有以下几种:

  • NONE:无消费方确认,即表示消费方自动确认,等同于 RabbitMQ 的autoAck=true
  • MANNUAL:消费方手动确认,用户需要自行调用ack/nack操作
  • AUTO:默认值,Spring 根据 Listener 中是否抛出异常来决定是 ack 还是 nack,需要注意的是,这里的AUTO与 RabbitMQ 的autoAck是不同的东西。

除了AcknowledgeMode外,消费方确认的行为还受defaultRequeueRejected参数的影响,当defaultRequeueRejected=true(默认值)时,表示如果消费方拒绝了消息,那么消息将被重新放入 Queue 中以便下次重新消费,当为false时,被拒绝的消息将不会被重新放入 Queue 中,而是要么直接抛弃,要么放入 DLX。

另外,通过抛出指定异常可以绕开defaultRequeueRejected的影响:

  • AmqpRejectAndDontRequeueException:表示直接将消息抛弃或者 DLX,而不是 Requeue,无论defaultRequeueRejected的值是什么。
  • ImmediateRequeueAmqpException:与AmqpRejectAndDontRequeueException正好相反,表示立即将消息进行 Requeue 处理,无论defaultRequeueRejected的值是什么。

当 Listener 中抛出异常时,Spring 默认采用ConditionalRejectingErrorHandler来判断哪些异常(通常是不能恢复的异常)可以对消息进行 Requeue 操作,哪些不能。

在消费消息时,还可以配置重试机制,Spring 直接支持消费方的重试(spring-retry),只是在默认情况下是禁用的,可以通过以下方式启用消费方重试:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true

需要注意的是,这里的重试并没有 RabbitMQ 的参与,只是 Spring 内部的重试。也就是说,重试时并不是讲消息做 Requeue 操作,而是拿到同一条消息在 Spring 内部做重复处理。默认情况下,重试最大此时为 3,当达到最大重试次数时,Spring 默认将使用RejectAndDontRequeueRecoverer对消息进行恢复处理,该类将不会对消息做 Requeue 操作,而是直接抛弃/DLX 掉。

在没有启用重试机制的情况下,并且defaultRequeueRejected=true,此时如果业务处理过程中一直抛出异常,那么将会导致消息被不断 Requeue 的情况,为了避免这种情况,要么启用重试机制;要么将defaultRequeueRejected设置为false;或者同时启动二者。笔者推荐同时启用二者。

综上,在使用 RabbitMQ 时,我们希望采用以下实践:

  • 默认开启生产方确认
  • 默认设置defaultRequeueRejected=false
  • 默认设置消费方重试

为此,除了直接在 application.yml 中配置之外,还可以将这些默认配置固化到代码中,创建一个ConnectionFactory并且默认开启生产方确认(setPublisherConfirms(true)):

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses(rabbitProperties.getAddresses());
factory.setUsername(rabbitProperties.getUsername());
factory.setPassword(rabbitProperties.getPassword());
factory.setPort(rabbitProperties.getPort());
factory.setVirtualHost(rabbitProperties.getVirtualHost());
factory.setPublisherConfirms(true);
return factory;
}

默认配置消费方的defaultRequeueRejected=false以及重试:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
      ConnectionFactory connectionFactory,
      MessageConverter messageConverter,
      TaskExecutor taskExecutor) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDefaultRequeueRejected(false);
        factory.setPrefetchCount(rabbitProperties.getListener().getSimple().getPrefetch());
        factory.setConcurrentConsumers(rabbitProperties.getListener().getSimple().getConcurrency());
        factory.setMaxConcurrentConsumers(rabbitProperties.getListener().getSimple().getMaxConcurrency());
        factory.setMessageConverter(messageConverter);

        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000L);
        RetryOperationsInterceptor build = RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                .backOffPolicy(fixedBackOffPolicy)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
        factory.setAdviceChain(build);

        factory.setTaskExecutor(taskExecutor);
        return factory;
    }