Asynchronous Outbound Gateway

The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a reply is received (or a timeout occurs). Spring Integration version 4.3 added an asynchronous gateway, which uses the AsyncRabbitTemplate from Spring AMQP. When a message is sent, the thread returns immediately after the send operation completes, and, when the message is received, the reply is sent on the template’s listener container thread. This can be useful when the gateway is invoked on a poller thread. The thread is released and is available for other tasks in the framework.

The following listing shows the possible configuration options for an AMQP asynchronous outbound gateway:

  • Java DSL

  • Java

  • XML

@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 The unique ID for this adapter. Optional.
2 Message channel to which messages should be sent in order to have them converted and published to an AMQP exchange. Required.
3 Bean reference to the configured AsyncRabbitTemplate. Optional (it defaults to asyncRabbitTemplate).
4 The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name-expression'. Optional.
5 A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional.
6 The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover. Optional (it defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
7 Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional.
8 The time the gateway waits when sending the reply message to the reply-channel. This only applies if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full. The default is infinity.
9 When no reply message is received within the AsyncRabbitTemplate’s `receiveTimeout property and this setting is true, the gateway sends an error message to the inbound message’s errorChannel header. When no reply message is received within the AsyncRabbitTemplate’s `receiveTimeout property and this setting is false, the gateway sends an error message to the default errorChannel (if available). It defaults to true.
10 The routing-key to use when sending Messages. By default, this is an empty String. Mutually exclusive with 'routing-key-expression'. Optional.
11 A SpEL expression that is evaluated to determine the routing-key to use when sending messages, with the message as the root object (for example, 'payload.key'). By default, this is an empty String. Mutually exclusive with 'routing-key'. Optional.
12 The default delivery mode for messages: PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. If the Spring Integration message header (amqp_deliveryMode) is present, the DefaultHeaderMapper sets the value. If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate. If that is not customized, the default is PERSISTENT. Optional.
13 An expression that defines correlation data. When provided, this configures the underlying AMQP template to receive publisher confirmations. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with its publisherConfirms property set to true. When a publisher confirmation is received and correlation data is supplied, the confirmation is written to either the confirm-ack-channel or the confirm-nack-channel, depending on the confirmation type. The payload of the confirmation is the correlation data as defined by this expression, and the message has its 'amqp_publishConfirm' header set to true (ack) or false (nack). For nack instances, an additional header (amqp_publishConfirmNackCause) is provided. Examples: headers['myCorrelationData'], payload. If the expression resolves to a Message<?> instance (such as “#this”), the message emitted on the ack/nack channel is based on that message, with the additional headers added. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
14 The channel to which positive (ack) publisher confirmations are sent. The payload is the correlation data defined by the confirm-correlation-expression. Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
15 Since version 4.2. The channel to which negative (nack) publisher confirmations are sent. The payload is the correlation data defined by the confirm-correlation-expression. Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
16 When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Also see Alternative Mechanism for Publisher Confirms and Returns. Default none (nacks will not be generated).
17 The channel to which returned messages are sent. When provided, the underlying AMQP template is configured to return undeliverable messages to the gateway. The message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, and amqp_returnRoutingKey. Requires the underlying AsyncRabbitTemplate to have its mandatory property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
18 When set to false, the endpoint tries to connect to the broker during application context initialization. Doing so allows “fail fast” detection of bad configuration, by logging an error message if the broker is down. When true (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.

See also Asynchronous Service Activator for more information.

RabbitTemplate

When you use confirmations and returns, we recommend that the RabbitTemplate wired into the AsyncRabbitTemplate be dedicated. Otherwise, unexpected side effects may be encountered.