i@yujinyan.me

Blog

Notes on RabbitMQ with Spring Boot

Introduction

Suppose our application organizes asynchronous domain logic inside DomainEventListener’s like the following code snippet.

interface DomainEventListener {
  val topic: String
  fun handle(event: DomainEvent)
}

@Component
class UserRegistered: DomainEventListener {
  override val topic = "user:registered"

  // How deserialization is performed is left out.

  override fun handle(event: DomainEvent) {
    // perform business logic
  }
}

This post shows how to integrate these listeners with Spring AMQP by taking advantage of Spring’s infrastructure. We also briefly discuss several important topics on using messaging middleware.

Up and running

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

The web management UI is available at http://localhost:15672. The default username and password are both guest.

Messaging with RabbitMQ is a very basic Hello World example for Spring Boot integration.

For a more production-ready setting, it’s mandatory to refer to the Spring AMQP reference documentation.

AMQP

AMQP (Advanced Message Queuing Protocol) is an application layer protocol for messaging middleware. RabbitMQ primarily supports the AMQP 0-9-1 model. It mainly revolves around the following three AMQP entities:

  • exchange
  • queue
  • binding

Exchange, queue, and binding

Messages are published to an exchange and consumed from a queue. A queue can subscribe to exchanges with bindings. When a message arrives at an exchange, it will be routed to different queues based on:

  • the routing key in the message, and
  • the type of the addressed exchange.

Routing

The four basic types of exchange are:

  • Direct exchange
  • Fanout exchange
  • Topic exchange
  • Headers exchange

The routing mechanism further decouples the publisher and subscriber, making it easy to adapt to different workflows. It may look overwhelming at first sight, so we are sticking with the simplest Direct Exchange. In this case, the message will be routed to subscribing queues with the same routing key.

To learn more about RabbitMQ and AMQP, I would recommend RabbitMQ in Depth. Its clear explanation and illustration make it easier to grasp these concepts.

Declarative configuration with Spring AMQP

With Spring AMQP, we can configure the required AMQP entities in the message broker by registering them as beans in the application context. At runtime, Spring AMQP will issue requests to the broker and create them.

I like this declarative approach. It feels like Kubernetes without nasty YAML configurations.

@Configuration
class RabbitMqConfig {
  @Bean
  fun domainEventExchange() =
    DirectExchange("domain_events")

  @Bean
  fun queues(listeners: List<DomainEventListener>) =
    Declarables( 
      listeners.map {
        QueueBuilder
          .durable(it::class.simpleName) 
          .build()
      }
    )

  @Bean
  fun bindings(listeners: List<DomainEventListener>) =
    Declarables(
      queues.zip(listeners) { queue, listener ->
        BindingBuilder
          .bind(queue)
          .to(domainEventExchange())
          .with(listener.topic)
      }
    )
}

In this code snippet, we first declare an application-wide domainEventExchange. Then, we loop over the DomainEventListener’s in the application context and create the necessary queues and bindings.

Note that:

  1. We can declare multiple AMQP entities in a Declarables. They don’t need to be the same type.
  2. durable means the created entity will survive broker restart.

Listener container

So far, we have declared desired entities in the broker, but haven’t yet wired up our DomainEventListener’s to consume messages from it and perform business logic. To do so, we need to wrap our event handlers inside a MessageListenerContainer.

A MessageListenerContainer represents an active or hot component. It handles the connection to the message broker. When the connection is broken, SimpleMessageListenerConainer will try to restart the listener. As a lifecycle component, it provides methods for starting and stopping.

To programmatically register our domain listeners, we can implement RabbitListenerConfigurer and use the RabbitListenerEndpointRegistrar like this:

@Configuration
@EnableRabbit
class ContainerConfig(
  val listeners: List<DomainEventListener>,
  val containerFactory: SimpleRabbitListenerContainerFactory
): RabbitListenerConfigurer {

  override fun configureRabbitListeners(
    registrar: RabbitListenerEndpointRegistrar
  ) {
    registrar.setContainerFactory(containerFactory) 
    listeners.forEach { listener ->
      val endpoint = SimpleRabbitListenerEndpoint().apply {
        id = listener::class.simpleName!!
        setQueueNames(listener::class.simpleName)
        setMessageListener { message ->
          listener.handle(message.toDomainEvent())
        }
      }
      registrar.registerEndpoint(endpoint) 
    }
  }
}

Spring AMQP will take care of creating the listener containers at runtime. Note that we passed an instance of RabbitListenerContainerFactory to the registrar. We will see that we can configure common properties of the containers through the container factory.

Check out more details at:

Messages are ephemeral

It’s important to realize that in the AMQP model, the message broker just acts as a postman between sender and receiver. It holds on to the messages only temporarily. After successful delivery to all consumers, the message is usually removed from the broker.

In contrast, Redis Stream and Kafka are more like databases that store the messages until explicitly deleted.

Some repercussions:

  • It’s easy to lose messages if something is misconfigured. For example, a message published to an exchange with no bound queues will be dropped. Conceptually, it seems only queues in AMQP have memory.
  • No first-class support for doing CRUD on messages. For example, the web UI does not have a list screen to page through the messages. It’s possible to retrieve a few ones in the queue detail page, but it has a warning that says “Getting messages from a queue is a destructive action.” Although not necessarily always “destructive”, the action will probably cause side effects on the message.
  • Spring AMQP shares this kind of mindset. For example, it is important to configure a MessageRecoverer in a RetryInterceptor. By default, Spring AMQP will drop the message after retrying for configured times and issue a warning. Retry is discussed in the following section.

Resilience

Spring AMQP Reference: Resilience: Recovering from Errors and Broker Failures

Acknowledgment

Message consumers can fail at any time (due to business exceptions, dropped connections, application crash, etc). To prevent losing messages in this way, message brokers use acknowledgments: a message is removed from the broker only after the client explicitly acknowledges that it has processed it.

💡

In Laravel, when a job is taken from the queue, it’s placed at myqueue:reserved key at the same time. These two steps form an atomic operation by using Lua scripting. This can also be seen as a form of acknowledgment.

Dead letter exchange

RabbitMQ Dead Letter Documentation

Suppose there is a mal-formed message. In our Spring application, if our message handlers throw an exception due to this message, by default, the message will be re-queued and delivered again, resulting in an infinite loop. A viable solution would be sending the bad message to other places for inspection after a few retries.

RabbitMQ can handle this situation with dead letter exchanges. When our consumer negatively acknowledges a message, the queue will “dead-letter” the message (annotate with some information about the failure) and route it to the configured dead letter exchange. This configuration can be applied globally in the broker with a policy or specified when creating a queue.

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

You can also apply a policy in the RabbitMQ web UI under the “admin” tag.

Note that the dead letter exchange is just a regular exchange. If you specify a non-existent exchange, RabbitMQ does not create it automatically. This is a type of misconfiguration that can cause lost messages.

AMQP entity declaration in Spring for configuring dead letter exchanges:

@Bean
fun deadLetterExchange() = DirectExchange("dlx")

@Bean
fun deadLetterQueues(
  listeners: List<DomainEventListener>
) = Declarables(
  val name = it::class.simpleName
  listeners.flatMap { 
    val deadLetterQueue = QueueBuilder.durable("$name.dlq").build()
    val deadLetterBinding = BindingBuilder
      .bind(deadLetterQueue)
      .to(deadLetterExchange())
      .with(it.topic)

    listOf(deadLetterQueue, deadLetterBinding) 
  }
)

For each listener, we create:

  • a corresponding dead letter queue, and
  • its binding to the application-wide deadLetterExchange().

By properly configuring listening queues and bindings, we ensure dead letters won’t get lost.

Then, update our queues() code to configure the queues’ dead letter exchange.

@Bean
fun deadLetterExchange() = DirectExchange("dlx")

@Bean
fun queues(listeners: List<DomainEventListener>) = Declarables(
  listeners.map {
    QueueBuilder.durable(it::class.simpleName)
      .withDeadLetterExchange(deadLetterExchange()) 
      .build()
  }
)

private fun QueueBuilder.withDeadLetterExchange(exchange: Exchange): QueueBuilder =
  withArgument("x-dead-letter-exchange", exchange.name) 

We also need to set up our listener containers to instruct RabbitMQ not to requeue rejected messages. If a dead letter exchange is configured on the queue, the message will be routed to it. Otherwise, the message will be dropped.

SimpleRabbitListenerContainerFactory().apply {
  setDefaultRequeueRejected(false)
  setConnectionFactory(connectionFactory)
}
💡

Laravel’s failed jobs table is the equivalent concept.

Retry

It may be helpful to retry a few times before routing a failed message to the dead letter exchange. Spring provides some retry helpers, and we can configure them in the container factory like this:

@Bean
fun retryInterceptor() = RetryInterceptorBuilder()
  .stateless()
  .maxAttemtps(2)
  .backOffOptions(1000, 2.0, 10_000)
  .recoverer(RejectAndDontRequeueRecoverer()) 
  .build()

SimpleRabbitListenerContainerFactory().apply {
  setDefaultRequeueRejected(false)
  setConnectionFactory(connectionFactory)
  setAdviceChain(retryInterceptor()) 
}

评论区 Discussions · 也可前往 GitHub 评论区 互动