Class RedisMessageListenerContainer

java.lang.Object
org.springframework.data.redis.listener.RedisMessageListenerContainer
All Implemented Interfaces:
Aware, BeanNameAware, DisposableBean, InitializingBean, Lifecycle, Phased, SmartLifecycle

public class RedisMessageListenerContainer extends Object implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle
Container providing asynchronous behaviour for Redis message listeners. Handles the low-level details of listening, converting and dispatching messages.

Unlike low-level Redis usage (one connection per subscription), the container uses a single connection that is multiplexed across all registered listeners, with message dispatch performed through the task executor. It is recommended to configure the task executor (and the subscription executor when using a blocking Redis connector) so that thread pools are reused, rather than relying on the default SimpleAsyncTaskExecutor.
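The dispatch model described above (one message source, many listeners, hand-off through an executor) can be sketched with plain JDK types. This is a minimal illustration and not the container's implementation: `Dispatcher`, `register` and `dispatch` are hypothetical names, and channels and payloads are simplified to strings.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical illustration only: a single message source fans out to
// per-channel listeners, and every callback runs on the supplied Executor.
class Dispatcher {

    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();
    private final Executor taskExecutor;

    Dispatcher(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    // Registers a listener for one channel; listeners keep their registration order.
    void register(String channel, Consumer<String> listener) {
        listeners.computeIfAbsent(channel, key -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Called by the single reading thread for every incoming message.
    // Returns the number of listeners the message was handed to.
    int dispatch(String channel, String payload) {
        List<Consumer<String>> targets = listeners.getOrDefault(channel, List.of());
        for (Consumer<String> target : targets) {
            taskExecutor.execute(() -> target.accept(payload));
        }
        return targets.size();
    }
}
```

Passing a pooled executor (for example one created with `Executors.newFixedThreadPool`) instead of a thread-per-task executor is what allows threads to be reused between messages.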

The container uses a single Redis connection in a lazy fashion (the connection is used only if at least one listener is configured). Listeners can be registered eagerly before starting the container to subscribe to all registered topics upon startup. Listeners are guaranteed to be subscribed after the start() method returns.

Subscriptions are retried gracefully using a BackOff, configurable through setRecoveryInterval(long) or setRecoveryBackoff(BackOff), until the maximum number of attempts is reached. Listener errors are handled through an ErrorHandler, if configured.
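As a rough picture of interval-based recovery with a bounded number of attempts, the following JDK-only sketch retries a task at a fixed interval. It is an assumption-laden illustration, not the container's recovery code: `FixedIntervalRetry`, `nextBackOff` and `run` are hypothetical names chosen for this example.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration only: fixed-interval retry with a bounded number of retries.
class FixedIntervalRetry {

    static final long STOP = -1; // signals that no further attempt should be made

    private final long intervalMillis;
    private final long maxAttempts;
    private long attempts;

    FixedIntervalRetry(long intervalMillis, long maxAttempts) {
        this.intervalMillis = intervalMillis;
        this.maxAttempts = maxAttempts;
    }

    // Returns the delay before the next retry, or STOP once the budget is used up.
    long nextBackOff() {
        if (attempts < maxAttempts) {
            attempts++;
            return intervalMillis;
        }
        return STOP;
    }

    // Calls the task until it succeeds or the retry budget is exhausted.
    static <T> T run(Callable<T> task, long intervalMillis, long maxAttempts) throws Exception {
        FixedIntervalRetry backOff = new FixedIntervalRetry(intervalMillis, maxAttempts);
        while (true) {
            try {
                return task.call();
            } catch (Exception ex) {
                long delay = backOff.nextBackOff();
                if (delay == STOP) {
                    throw ex; // give up and surface the last failure
                }
                Thread.sleep(delay);
            }
        }
    }
}
```

With `maxAttempts` set to 3, the task is called at most four times: the initial call plus three retries.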

Once the container has been initialized with afterPropertiesSet() and start(), this class can be used concurrently: addMessageListener(MessageListener, Collection<? extends Topic>) and removeMessageListener(MessageListener, Collection<? extends Topic>) may be called from multiple threads without external synchronization.

Listeners that wish to receive subscription/unsubscription callbacks in response to subscribe/unsubscribe commands can implement SubscriptionListener.

Author:
Costin Leau, Jennifer Hickey, Way Joke, Thomas Darimont, Mark Paluch, John Blum
  • Field Details

    • DEFAULT_RECOVERY_INTERVAL

      public static final long DEFAULT_RECOVERY_INTERVAL
      The default recovery interval: 5000 ms = 5 seconds.
    • DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME

      public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
      The default subscription wait time: 2000 ms = 2 seconds.
    • DEFAULT_THREAD_NAME_PREFIX

      public static final String DEFAULT_THREAD_NAME_PREFIX
      Default thread name prefix: "RedisListeningContainer-".
    • logger

      protected final Log logger
      Logger available to subclasses.
  • Constructor Details

    • RedisMessageListenerContainer

      public RedisMessageListenerContainer()
  • Method Details

    • setErrorHandler

      public void setErrorHandler(ErrorHandler errorHandler)
      Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default, there will be no ErrorHandler so that error-level logging is the only result.
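      The fallback described above (delegate to a handler when one is set, otherwise only log at error level) can be sketched with JDK types. This is an illustration under assumptions, not the container's code: `ListenerInvoker` and `invoke` are hypothetical names, and a `Consumer<Throwable>` stands in for the ErrorHandler.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration only; not the container's code.
class ListenerInvoker {

    private static final Logger LOG = Logger.getLogger(ListenerInvoker.class.getName());

    // Stand-in for an ErrorHandler; null means "none configured".
    private Consumer<Throwable> errorHandler;

    void setErrorHandler(Consumer<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
    }

    // Runs the listener and reports whether it completed without throwing.
    boolean invoke(Runnable listener) {
        try {
            listener.run();
            return true;
        } catch (RuntimeException ex) {
            if (errorHandler != null) {
                errorHandler.accept(ex); // delegate to the configured handler
            } else {
                LOG.log(Level.SEVERE, "Listener execution failed", ex); // fallback: log only
            }
            return false;
        }
    }
}
```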
    • setSubscriptionExecutor

      public void setSubscriptionExecutor(Executor subscriptionExecutor)
      Sets the task executor used for subscribing to Redis channels. By default, if no executor is set, the executor configured through setTaskExecutor(Executor) is used. In some cases this might be undesirable, because listening on the connection is a long-running task.

      Note: This implementation uses at most one long-running thread (depending on whether there are any listeners registered or not) and up to two threads during the initial registration.

      Parameters:
      subscriptionExecutor - the subscriptionExecutor to set.
    • setTaskExecutor

      public void setTaskExecutor(Executor taskExecutor)
      Sets the task executor used for running the message listeners when messages are received. If no task executor is set, an instance of SimpleAsyncTaskExecutor will be used by default. The task executor can be adjusted depending on the work done by the listeners and the number of messages coming in.
      Parameters:
      taskExecutor - the taskExecutor to set.
    • getConnectionFactory

      @Nullable public RedisConnectionFactory getConnectionFactory()
      Returns the connectionFactory.
      Returns:
      the connectionFactory
    • setConnectionFactory

      public void setConnectionFactory(RedisConnectionFactory connectionFactory)
      Parameters:
      connectionFactory - The connectionFactory to set.
    • setTopicSerializer

      public void setTopicSerializer(RedisSerializer<String> serializer)
      Sets the serializer for converting the Topics into low-level channels and patterns. By default, StringRedisSerializer is used.
      Parameters:
      serializer - The serializer to set.
    • getMaxSubscriptionRegistrationWaitingTime

      public long getMaxSubscriptionRegistrationWaitingTime()
    • setMaxSubscriptionRegistrationWaitingTime

      public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime)
      Specify the maximum time to wait for subscription registrations, in milliseconds. The default is 2000 ms, that is, 2 seconds. The timeout applies to awaiting the subscription registration. Note that subscriptions can be created asynchronously and an expired timeout does not cancel the pending subscription.
      Parameters:
      maxSubscriptionRegistrationWaitingTime - the maximum subscription registration wait time
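      The difference between giving up on a wait and cancelling the work being waited for can be shown with a `CompletableFuture`. The sketch below is an assumption about the general pattern only: `RegistrationWait` and `await` are hypothetical names, and how the container itself reacts to an expired wait is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only: a bounded wait that leaves the awaited work untouched.
class RegistrationWait {

    // Returns true if the registration completed within the wait time.
    static boolean await(CompletableFuture<Void> registration, long maxWaitMillis) {
        try {
            registration.get(maxWaitMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException ex) {
            // The wait expired; the future is not cancelled and may still complete later.
            return false;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (ExecutionException ex) {
            throw new IllegalStateException("Subscription registration failed", ex.getCause());
        }
    }
}
```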
    • setRecoveryInterval

      public void setRecoveryInterval(long recoveryInterval)
      Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.
    • setRecoveryBackoff

      public void setRecoveryBackoff(BackOff recoveryInterval)
      Specify the BackOff that determines the interval between recovery attempts.
      Since:
      2.7
    • setMessageListeners

      public void setMessageListeners(Map<? extends MessageListener,Collection<? extends Topic>> listeners)
      Attaches the given listeners (and their topics) to the container.

      Note: it is possible to call this method while the container is running, which forces a reinitialization of the container. However, some messages might be lost while the container reinitializes, so calling this method at runtime is considered advanced usage.

      Parameters:
      listeners - map of message listeners and their associated topics
    • setBeanName

      public void setBeanName(String name)
      Specified by:
      setBeanName in interface BeanNameAware
    • afterPropertiesSet

      public void afterPropertiesSet()
      Specified by:
      afterPropertiesSet in interface InitializingBean
    • createDefaultTaskExecutor

      protected TaskExecutor createDefaultTaskExecutor()
      Creates a default TaskExecutor. Called if no explicit TaskExecutor has been specified.

      The default implementation builds a SimpleAsyncTaskExecutor with the specified bean name (or the class name, if no bean name specified) as thread name prefix.

    • destroy

      public void destroy() throws Exception
      Destroy the container and stop it.
      Specified by:
      destroy in interface DisposableBean
      Throws:
      Exception
    • start

      public void start()
      Start up the container and subscribe to topics if listeners were registered prior to starting the container.

      This method is a potentially blocking method that blocks until a previous stop() is finished and until all previously registered listeners are successfully subscribed.

      Multiple calls to this method are ignored if the container is already running. Concurrent calls to this method are synchronized until the container is started up.

      Specified by:
      start in interface Lifecycle
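      The two guarantees stated above (repeated calls are ignored, concurrent calls are serialized) follow a common guarded-lifecycle pattern. The sketch below shows that pattern with JDK types only; `LifecycleSketch` and `startups` are hypothetical names, and the real startup work (subscribing, awaiting registration) is reduced to a counter.

```java
// Hypothetical illustration only: idempotent, synchronized start/stop.
class LifecycleSketch {

    private final Object monitor = new Object();
    private boolean running;
    private int startups; // counts how often startup work actually ran

    void start() {
        synchronized (monitor) {
            if (running) {
                return; // already running: the call is ignored
            }
            startups++; // startup work would happen here
            running = true;
        }
    }

    void stop() {
        synchronized (monitor) {
            if (!running) {
                return; // already stopped: the call is ignored
            }
            running = false; // shutdown work would happen here
        }
    }

    boolean isRunning() {
        synchronized (monitor) {
            return running;
        }
    }

    int startups() {
        synchronized (monitor) {
            return startups;
        }
    }
}
```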
    • stop

      public void stop()
      Stop the message listener container and cancel any subscriptions if the container is listening. Stopping releases any allocated connections.

      This method is a potentially blocking method that blocks until a previous start() is finished and until the connection is closed if the container was listening.

      Multiple calls to this method are ignored if the container was already stopped. Concurrent calls to this method are synchronized until the container is stopped.

      Specified by:
      stop in interface Lifecycle
    • stop

      public void stop(Runnable callback)
      Stop the message listener container and cancel any subscriptions if the container is listening. Stopping releases any allocated connections.

      This method is a potentially blocking method that blocks until a previous start() is finished and until the connection is closed if the container was listening.

      Multiple calls to this method are ignored if the container was already stopped. Concurrent calls to this method are synchronized until the container is stopped.

      Specified by:
      stop in interface SmartLifecycle
      Parameters:
      callback - callback to notify when the container actually stops.
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
    • isListening

      public boolean isListening()
    • isActive

      public final boolean isActive()
      Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
    • addMessageListener

      public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics)
      Adds a message listener to the (potentially running) container. If the container is running, the listener starts receiving (matching) messages as soon as possible.
      Parameters:
      listener - message listener
      topics - message listener topics
    • addMessageListener

      public void addMessageListener(MessageListener listener, Topic topic)
      Adds a message listener to the (potentially running) container. If the container is running, the listener starts receiving (matching) messages as soon as possible.
      Parameters:
      listener - message listener
      topic - message topic
    • removeMessageListener

      public void removeMessageListener(@Nullable MessageListener listener, Collection<? extends Topic> topics)
      Removes a message listener from the given topics. If the container is running, the listener stops receiving (matching) messages as soon as possible.

      Note that this method obeys the Redis (p)unsubscribe semantics: an empty or null collection removes the listener from all channels.

      Parameters:
      listener - message listener
      topics - message listener topics
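      The "empty or null means everything" rule can be illustrated with a small in-memory registry. This is a sketch under assumptions, not the container's bookkeeping: `ListenerRegistry`, `add`, `remove` and `channelsOf` are hypothetical names, and topics are simplified to channel name strings.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only: removal that mirrors (p)unsubscribe semantics.
class ListenerRegistry {

    private final Map<String, Set<Object>> byChannel = new HashMap<>();

    synchronized void add(Object listener, Collection<String> channels) {
        for (String channel : channels) {
            byChannel.computeIfAbsent(channel, key -> new HashSet<>()).add(listener);
        }
    }

    // A null or empty collection removes the listener from every channel.
    synchronized void remove(Object listener, Collection<String> channels) {
        Collection<String> targets = (channels == null || channels.isEmpty())
                ? new HashSet<>(byChannel.keySet()) // snapshot, since entries are removed below
                : channels;
        for (String channel : targets) {
            Set<Object> registered = byChannel.get(channel);
            if (registered != null) {
                registered.remove(listener);
                if (registered.isEmpty()) {
                    byChannel.remove(channel); // no listeners left for this channel
                }
            }
        }
    }

    // Returns the channels the listener is currently registered for, sorted by name.
    synchronized Set<String> channelsOf(Object listener) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Set<Object>> entry : byChannel.entrySet()) {
            if (entry.getValue().contains(listener)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```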
    • removeMessageListener

      public void removeMessageListener(@Nullable MessageListener listener, Topic topic)
      Removes a message listener from the given topic. If the container is running, the listener stops receiving (matching) messages as soon as possible.

      Note that this method obeys the Redis (p)unsubscribe semantics: an empty or null collection removes the listener from all channels.

      Parameters:
      listener - message listener
      topic - message topic
    • removeMessageListener

      public void removeMessageListener(MessageListener listener)
      Removes the given message listener completely (from all topics). If the container is running, the listener stops receiving (matching) messages as soon as possible.
      Parameters:
      listener - message listener
    • processMessage

      protected void processMessage(MessageListener listener, Message message, byte[] source)
      Process a message received from the provider.
      Parameters:
      listener - the message listener to notify.
      message - the received message.
      source - the source, either the channel or pattern.
    • handleListenerException

      protected void handleListenerException(Throwable cause)
      Handle the given exception that arose during listener execution.

      The default implementation logs the exception at error level. This can be overridden in subclasses.

      Parameters:
      cause - the exception to handle
    • invokeErrorHandler

      protected void invokeErrorHandler(Throwable cause)
      Invoke the registered ErrorHandler, if any. Log at error level otherwise.
      Parameters:
      cause - the uncaught error that arose during message processing.
    • handleSubscriptionException

      protected void handleSubscriptionException(CompletableFuture<Void> future, BackOffExecution backOffExecution, Throwable cause)
      Handle a subscription task exception. Attempts to restart the subscription if the exception is a connection failure (for example, Redis was restarted).
      Parameters:
      cause - Throwable exception