Class DefaultMessageListenerContainer

java.lang.Object
org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer
All Implemented Interfaces:
org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, MessageListenerContainer

public class DefaultMessageListenerContainer extends Object implements MessageListenerContainer
Simple Executor-based MessageListenerContainer implementation for running tasks such as listening to MongoDB Change Streams and tailable cursors.
This message container creates long-running tasks that are executed on an Executor.
Since:
2.1
Author:
Christoph Strobl, Mark Paluch
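The idea of a long-running task executed on an Executor can be illustrated with a minimal JDK-only sketch. It is not the container's actual implementation: PollingTask, the BlockingQueue standing in for a change stream or tailable cursor, and the demo helper are all hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Illustrative only: a JDK-only model, not Spring Data MongoDB code.
class LongRunningTaskSketch {

    // Hypothetical task: polls a queue (standing in for a cursor) until cancelled.
    static final class PollingTask implements Runnable {
        private final BlockingQueue<String> source;
        private final Consumer<String> listener;
        private final AtomicBoolean active = new AtomicBoolean(true);

        PollingTask(BlockingQueue<String> source, Consumer<String> listener) {
            this.source = source;
            this.listener = listener;
        }

        void cancel() {
            active.set(false);
        }

        @Override
        public void run() {
            while (active.get()) {
                try {
                    // Wait briefly for the next message so cancellation is noticed promptly.
                    String next = source.poll(10, TimeUnit.MILLISECONDS);
                    if (next != null) {
                        listener.accept(next);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Runs one task on an executor, feeds it messages, and returns what the listener received.
    static List<String> demo(String... messages) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch delivered = new CountDownLatch(messages.length);
        PollingTask task = new PollingTask(source, message -> {
            received.add(message);
            delivered.countDown();
        });
        executor.execute(task);
        try {
            for (String message : messages) {
                source.put(message);
            }
            delivered.await(5, TimeUnit.SECONDS);
            task.cancel();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return received;
    }
}
```

Calling LongRunningTaskSketch.demo("a", "b") delivers both messages to the listener on the executor thread and then cancels the task. The task occupies its thread until cancelled, which is why the choice of Executor matters for tasks of this kind.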
  • Constructor Details

    • DefaultMessageListenerContainer

      public DefaultMessageListenerContainer(MongoTemplate template)
      Parameters:
      template - must not be null.
    • DefaultMessageListenerContainer

      public DefaultMessageListenerContainer(MongoTemplate template, Executor taskExecutor)
      Create a new DefaultMessageListenerContainer running tasks via the given taskExecutor.
      Parameters:
      template - must not be null.
      taskExecutor - must not be null.
    • DefaultMessageListenerContainer

      public DefaultMessageListenerContainer(MongoTemplate template, Executor taskExecutor, @Nullable org.springframework.util.ErrorHandler errorHandler)
      Create a new DefaultMessageListenerContainer running tasks via the given taskExecutor delegating errors to the given ErrorHandler.
      Parameters:
      template - must not be null. Used by the TaskFactory.
      taskExecutor - must not be null.
      errorHandler - the default ErrorHandler to be used by tasks inside the container. Can be null.
  • Method Details

    • isAutoStartup

      public boolean isAutoStartup()
      Specified by:
      isAutoStartup in interface org.springframework.context.SmartLifecycle
    • stop

      public void stop(Runnable callback)
      Specified by:
      stop in interface org.springframework.context.SmartLifecycle
    • start

      public void start()
      Specified by:
      start in interface org.springframework.context.Lifecycle
    • stop

      public void stop()
      Specified by:
      stop in interface org.springframework.context.Lifecycle
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface org.springframework.context.Lifecycle
    • getPhase

      public int getPhase()
      Specified by:
      getPhase in interface org.springframework.context.Phased
      Specified by:
      getPhase in interface org.springframework.context.SmartLifecycle
    • register

      public <S, T> Subscription register(SubscriptionRequest<S,? super T,? extends SubscriptionRequest.RequestOptions> request, Class<T> bodyType)
      Description copied from interface: MessageListenerContainer
      Register a new SubscriptionRequest in the container. If the container is already running, the Subscription is added and run immediately; otherwise it is scheduled and started once the container is actually started.
       
           MessageListenerContainer container = ...
      
           MessageListener<ChangeStreamDocument<Document>, Document> messageListener = (message) -> message.getBody().toJson();
           ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener, () -> "collection-name");
      
           Subscription subscription = container.register(request, Document.class);
       
       
      On Lifecycle.stop() all subscriptions are cancelled prior to shutting down the container itself.
      Registering the very same SubscriptionRequest more than once simply returns the already existing Subscription.
      Unless a Subscription is removed from the container, the Subscription is restarted once the container itself is restarted.
      Errors during Message retrieval lead to cancellation of the underlying task.
      Specified by:
      register in interface MessageListenerContainer
      Parameters:
      request - must not be null.
      bodyType - the exact target or a more concrete type of the Message.getBody().
      Returns:
      never null.
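The registration rules described for this method (deferred start, returning the existing Subscription for a repeated request, restart after a container restart, and removal) can be modelled with a small JDK-only sketch. SketchContainer and SketchSubscription are hypothetical stand-ins that only track state; they are not the library's classes and say nothing about how the real container stores subscriptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a simplified model of the documented semantics, not Spring Data MongoDB code.
class RegistrationSketch {

    // Hypothetical subscription that only records whether its task is active.
    static final class SketchSubscription {
        private boolean active;

        boolean isActive() {
            return active;
        }

        void activate() {
            active = true;
        }

        void cancel() {
            active = false;
        }
    }

    static final class SketchContainer {
        private final Map<Object, SketchSubscription> subscriptions = new LinkedHashMap<>();
        private boolean running;

        // Registering the very same request again returns the existing subscription.
        synchronized SketchSubscription register(Object request) {
            SketchSubscription existing = subscriptions.get(request);
            if (existing != null) {
                return existing;
            }
            SketchSubscription subscription = new SketchSubscription();
            subscriptions.put(request, subscription);
            if (running) {
                subscription.activate(); // container already running: start right away
            }
            return subscription;
        }

        // Starting (or restarting) activates every subscription still registered.
        synchronized void start() {
            running = true;
            subscriptions.values().forEach(SketchSubscription::activate);
        }

        // Stopping cancels all subscriptions before the container itself stops.
        synchronized void stop() {
            subscriptions.values().forEach(SketchSubscription::cancel);
            running = false;
        }

        // A removed subscription is cancelled and no longer restarted.
        synchronized void remove(SketchSubscription subscription) {
            subscriptions.values().remove(subscription);
            subscription.cancel();
        }
    }
}
```

In this model, a subscription registered before start() stays inactive until the container starts, becomes inactive on stop(), and becomes active again on the next start() unless it was removed in between.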
    • register

      public <S, T> Subscription register(SubscriptionRequest<S,? super T,? extends SubscriptionRequest.RequestOptions> request, Class<T> bodyType, org.springframework.util.ErrorHandler errorHandler)
      Description copied from interface: MessageListenerContainer
      Register a new SubscriptionRequest in the container. If the container is already running, the Subscription is added and run immediately; otherwise it is scheduled and started once the container is actually started.
       
           MessageListenerContainer container = ...
      
           MessageListener<ChangeStreamDocument<Document>, Document> messageListener = (message) -> message.getBody().toJson();
           ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener, () -> "collection-name");
      
           Subscription subscription = container.register(request, Document.class);
       
       
      On Lifecycle.stop() all subscriptions are cancelled prior to shutting down the container itself.
      Registering the very same SubscriptionRequest more than once simply returns the already existing Subscription.
      Unless a Subscription is removed from the container, the Subscription is restarted once the container itself is restarted.
      Errors during Message retrieval are delegated to the given ErrorHandler.
      Specified by:
      register in interface MessageListenerContainer
      Parameters:
      request - must not be null.
      bodyType - the exact target or a more concrete type of the Message.getBody(). Must not be null.
      errorHandler - the callback to invoke when retrieving the Message from the data source fails for some reason.
      Returns:
      never null.
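Delegating retrieval errors to a handler can be sketched without any MongoDB dependency. The model below is an assumption-laden illustration: SketchErrorHandler mirrors the single-method shape of org.springframework.util.ErrorHandler, SketchTask and its Supplier source are hypothetical, and the choice to end the task after a failed retrieval is an assumption of this sketch rather than documented behaviour of this overload.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative only: a simplified model, not Spring Data MongoDB code.
class ErrorDelegationSketch {

    // Hypothetical stand-in with the same shape as org.springframework.util.ErrorHandler.
    interface SketchErrorHandler {
        void handleError(Throwable t);
    }

    // Hypothetical task performing one retrieval attempt per pollOnce() call.
    static final class SketchTask {
        private final Supplier<String> source;
        private final Consumer<String> listener;
        private final SketchErrorHandler errorHandler;
        private boolean active = true;

        SketchTask(Supplier<String> source, Consumer<String> listener, SketchErrorHandler errorHandler) {
            this.source = source;
            this.listener = listener;
            this.errorHandler = errorHandler;
        }

        boolean isActive() {
            return active;
        }

        void pollOnce() {
            if (!active) {
                return;
            }
            String next;
            try {
                next = source.get();
            } catch (RuntimeException e) {
                active = false; // assumption of this sketch: a failed retrieval ends the task
                errorHandler.handleError(e); // the failure goes to the handler, not to the caller
                return;
            }
            listener.accept(next);
        }
    }
}
```

With a source that throws, pollOnce() returns normally, the handler receives the exception, and the listener is never invoked for that attempt.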
    • lookup

      public Optional<Subscription> lookup(SubscriptionRequest<?,?,?> request)
      Description copied from interface: MessageListenerContainer
      Lookup the given SubscriptionRequest within the container and return the associated Subscription if present.
      Specified by:
      lookup in interface MessageListenerContainer
      Parameters:
      request - must not be null.
      Returns:
      Optional.empty() if no Subscription is associated with the given request.
    • register

      public Subscription register(SubscriptionRequest request, Task task)
    • remove

      public void remove(Subscription subscription)
      Description copied from interface: MessageListenerContainer
      Unregister a given Subscription from the container. This prevents the Subscription from being restarted in a potential stop/start scenario.
      An active subscription is cancelled prior to removal.
      Specified by:
      remove in interface MessageListenerContainer
      Parameters:
      subscription - must not be null.