The Spring Framework

org.springframework.jms.listener
Class DefaultMessageListenerContainer

java.lang.Object
  extended by org.springframework.jms.support.JmsAccessor
      extended by org.springframework.jms.support.destination.JmsDestinationAccessor
          extended by org.springframework.jms.listener.AbstractMessageListenerContainer
              extended by org.springframework.jms.listener.DefaultMessageListenerContainer
All Implemented Interfaces:
BeanNameAware, DisposableBean, InitializingBean, Lifecycle
Direct Known Subclasses:
DefaultMessageListenerContainer102

public class DefaultMessageListenerContainer
extends AbstractMessageListenerContainer
implements BeanNameAware

Message listener container that uses plain JMS client API, specifically a loop of MessageConsumer.receive() calls that also allow for transactional reception of messages (registering them with XA transactions). Designed to work in a native JMS environment as well as in a J2EE environment, with only minimal differences in configuration.

This is a simple but nevertheless powerful form of a message listener container. It creates a fixed number of JMS Sessions to invoke the listener, not allowing for dynamic adaptation to runtime demands. Like SimpleMessageListenerContainer, its main advantage is its low level of complexity and the minimum requirements on the JMS provider: Not even the ServerSessionPool facility is required.

Actual MessageListener execution happens in separate threads that are created through Spring's TaskExecutor abstraction. By default, the appropriate number of threads are created on startup, according to the "concurrentConsumers" setting. Specify an alternative TaskExecutor to integrate with an existing thread pool facility, for example. With a native JMS setup, each of those listener threads is gonna use a cached JMS Session and MessageConsumer (only refreshed in case of failure), using the JMS provider's resources as efficiently as possible.

Message reception and listener execution can automatically be wrapped in transactions through passing a Spring PlatformTransactionManager into the "transactionManager" property. This will usually be a JtaTransactionManager in a J2EE enviroment, in combination with a JTA-aware JMS ConnectionFactory that this message listener container fetches its Connections from (check your J2EE server's documentation). Note that this listener container will automatically reobtain all JMS handles for each transaction in case of an external transaction manager specified, for compatibility with all J2EE servers (in particular JBoss). This non-caching behavior can be overridden through the "cacheLevel"/"cacheLevelName" property, enforcing caching of the Connection (or also Session and MessageConsumer) even in case of an external transaction manager.

See the AbstractMessageListenerContainer javadoc for details on acknowledge modes and transaction options.

This class requires a JMS 1.1+ provider, because it builds on the domain-independent API. Use the DefaultMessageListenerContainer102 subclass for JMS 1.0.2 providers.

For dynamic adaptation of the active number of Sessions, consider using ServerSessionMessageListenerContainer.

Since:
2.0
Author:
Juergen Hoeller
See Also:
setTransactionManager(org.springframework.transaction.PlatformTransactionManager), setCacheLevel(int), setCacheLevelName(java.lang.String), JtaTransactionManager, MessageConsumer.receive(long), SimpleMessageListenerContainer, ServerSessionMessageListenerContainer, DefaultMessageListenerContainer102

Nested Class Summary
 
Nested classes/interfaces inherited from class org.springframework.jms.listener.AbstractMessageListenerContainer
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
 
Field Summary
static int CACHE_CONNECTION
          Constant that indicates to cache a shared JMS Connection.
static int CACHE_CONSUMER
          Constant that indicates to cache a shared JMS Connection and a JMS Session for each listener thread, as well as a JMS MessageConsumer for each listener thread.
static int CACHE_NONE
          Constant that indicates to cache no JMS resources at all.
static int CACHE_SESSION
          Constant that indicates to cache a shared JMS Connection and a JMS Session for each listener thread.
static long DEFAULT_RECEIVE_TIMEOUT
          The default receive timeout: 1000 ms = 1 second.
static long DEFAULT_RECOVERY_INTERVAL
          The default recovery interval: 5000 ms = 5 seconds.
static String DEFAULT_THREAD_NAME_PREFIX
          Default thread name prefix: "SimpleAsyncTaskExecutor-".
 
Fields inherited from class org.springframework.jms.support.JmsAccessor
logger
 
Constructor Summary
DefaultMessageListenerContainer()
           
 
Method Summary
 void afterPropertiesSet()
          Validates this instance's configuration.
protected  MessageConsumer createConsumer(Session session, Destination destination)
          Create a JMS MessageConsumer for the given Session and Destination.
protected  TaskExecutor createDefaultTaskExecutor()
          Create a default TaskExecutor.
protected  MessageConsumer createListenerConsumer(Session session)
          Create a MessageConsumer for the given JMS Session, registering a MessageListener for the specified listener.
protected  void destroyListener()
          Destroy the registered JMS Sessions and associated MessageConsumers.
protected  void doReceiveAndExecute(Session session, MessageConsumer consumer, TransactionStatus status)
          Actually execute the listener for a message received from the given consumer, fetching all requires resources and invoking the listener.
protected  void doRescheduleTask(Object task)
          Executes the given task via this listener container's TaskExecutor.
protected  void establishSharedConnection()
          Overridden to accept a failure in the initial setup - leaving it up to the asynchronous invokers to establish the shared Connection on first access.
 int getCacheLevel()
          Return the level of caching that this listener container is allowed to apply.
protected  Connection getConnection(JmsResourceHolder holder)
          Fetch an appropriate Connection from the given JmsResourceHolder.
protected  Session getSession(JmsResourceHolder holder)
          Fetch an appropriate Session from the given JmsResourceHolder.
protected  void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered)
          Handle the given exception that arose during setup of a listener.
 void initialize()
          Initialize this message listener container.
protected  boolean isPubSubNoLocal()
          Return whether to inhibit the delivery of messages published by its own connection.
protected  void receiveAndExecute(Session session, MessageConsumer consumer)
          Execute the listener for a message received from the given consumer, wrapping the entire operation in an external transaction if demanded.
protected  Message receiveMessage(MessageConsumer consumer)
          Receive a message from the given consumer.
protected  void recoverAfterListenerSetupFailure()
          Recover this listener container after a listener failed to set itself up, for example reestablishing the underlying Connection.
protected  void refreshConnectionUntilSuccessful()
          Refresh the underlying Connection, not returning before an attempt has been successful.
protected  void refreshDestination()
          Refresh the JMS destination that this listener container operates on.
protected  void registerListener()
          Creates the specified number of concurrent consumers, in the form of a JMS Session plus associated MessageConsumer running in a separate thread.
 void setBeanName(String beanName)
          Set the name of the bean in the bean factory that created this bean.
 void setCacheLevel(int cacheLevel)
          Specify the level of caching that this listener container is allowed to apply.
 void setCacheLevelName(String constantName)
          Specify the level of caching that this listener container is allowed to apply, in the form of the name of the corresponding constant: e.g.
 void setConcurrentConsumers(int concurrentConsumers)
          Specify the number of concurrent consumers to create.
 void setMaxMessagesPerTask(int maxMessagesPerTask)
          Set the maximum number of messages to process in one task.
 void setPubSubNoLocal(boolean pubSubNoLocal)
          Set whether to inhibit the delivery of messages published by its own connection.
 void setReceiveTimeout(long receiveTimeout)
          Set the timeout to use for receive calls, in milliseconds.
 void setRecoveryInterval(long recoveryInterval)
          Specify the interval between recovery attempts, in milliseconds.
 void setTaskExecutor(TaskExecutor taskExecutor)
          Set the Spring TaskExecutor to use for running the listener threads.
 void setTransactionManager(PlatformTransactionManager transactionManager)
          Specify the Spring PlatformTransactionManager to use for transactional wrapping of message reception plus listener execution.
 void setTransactionTimeout(int transactionTimeout)
          Specify the transaction timeout to use for transactional wrapping, in seconds.
protected  boolean sharedConnectionEnabled()
          Use a shared JMS Connection depending on the "cacheLevel" setting.
protected  void sleepInbetweenRecoveryAttempts()
           
protected  void startSharedConnection()
          This implementations proceeds even after an exception thrown from Connection.start(), relying on listeners to perform appropriate recovery.
protected  void stopSharedConnection()
          This implementations proceeds even after an exception thrown from Connection.stop(), relying on listeners to perform appropriate recovery after a restart.
 
Methods inherited from class org.springframework.jms.listener.AbstractMessageListenerContainer
checkMessageListener, commitIfNecessary, createConnection, createSession, destroy, doExecuteListener, doInvokeListener, doInvokeListener, doStart, doStop, executeListener, getClientId, getDestination, getDestinationName, getDurableSubscriptionName, getExceptionListener, getMessageListener, getMessageSelector, getSharedConnection, handleListenerException, invokeExceptionListener, invokeListener, isActive, isClientAcknowledge, isExposeListenerSession, isRunning, isSubscriptionDurable, prepareSharedConnection, refreshSharedConnection, rescheduleTaskIfNecessary, rollbackOnExceptionIfNecessary, setAutoStartup, setClientId, setDestination, setDestinationName, setDurableSubscriptionName, setExceptionListener, setExposeListenerSession, setMessageListener, setMessageSelector, setSubscriptionDurable, shutdown, start, stop, waitWhileNotRunning
 
Methods inherited from class org.springframework.jms.support.destination.JmsDestinationAccessor
getDestinationResolver, isPubSubDomain, resolveDestinationName, setDestinationResolver, setPubSubDomain
 
Methods inherited from class org.springframework.jms.support.JmsAccessor
convertJmsAccessException, getConnectionFactory, getSessionAcknowledgeMode, isSessionTransacted, setConnectionFactory, setSessionAcknowledgeMode, setSessionAcknowledgeModeName, setSessionTransacted
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_THREAD_NAME_PREFIX

public static final String DEFAULT_THREAD_NAME_PREFIX
Default thread name prefix: "SimpleAsyncTaskExecutor-".


DEFAULT_RECEIVE_TIMEOUT

public static final long DEFAULT_RECEIVE_TIMEOUT
The default receive timeout: 1000 ms = 1 second.

See Also:
Constant Field Values

DEFAULT_RECOVERY_INTERVAL

public static final long DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.

See Also:
Constant Field Values

CACHE_NONE

public static final int CACHE_NONE
Constant that indicates to cache no JMS resources at all.

See Also:
setCacheLevel(int), Constant Field Values

CACHE_CONNECTION

public static final int CACHE_CONNECTION
Constant that indicates to cache a shared JMS Connection.

See Also:
setCacheLevel(int), Constant Field Values

CACHE_SESSION

public static final int CACHE_SESSION
Constant that indicates to cache a shared JMS Connection and a JMS Session for each listener thread.

See Also:
setCacheLevel(int), Constant Field Values

CACHE_CONSUMER

public static final int CACHE_CONSUMER
Constant that indicates to cache a shared JMS Connection and a JMS Session for each listener thread, as well as a JMS MessageConsumer for each listener thread.

See Also:
setCacheLevel(int), Constant Field Values
Constructor Detail

DefaultMessageListenerContainer

public DefaultMessageListenerContainer()
Method Detail

setPubSubNoLocal

public void setPubSubNoLocal(boolean pubSubNoLocal)
Set whether to inhibit the delivery of messages published by its own connection. Default is "false".

See Also:
TopicSession.createSubscriber(javax.jms.Topic, String, boolean)

isPubSubNoLocal

protected boolean isPubSubNoLocal()
Return whether to inhibit the delivery of messages published by its own connection.


setTaskExecutor

public void setTaskExecutor(TaskExecutor taskExecutor)
Set the Spring TaskExecutor to use for running the listener threads. Default is SimpleAsyncTaskExecutor, starting up a number of new threads, according to the specified number of concurrent consumers.

Specify an alternative TaskExecutor for integration with an existing thread pool. Note that this really only adds value if the threads are managed in a specific fashion, for example within a J2EE environment. A plain thread pool does not add much value, as this listener container will occupy a number of threads for its entire lifetime.

See Also:
setConcurrentConsumers(int), SimpleAsyncTaskExecutor, WorkManagerTaskExecutor

setConcurrentConsumers

public void setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create. Default is 1.


setMaxMessagesPerTask

public void setMaxMessagesPerTask(int maxMessagesPerTask)
Set the maximum number of messages to process in one task. More concretely, this limits the number of message reception attempts, which includes receive iterations that did not actually pick up a message until they hit their timeout (see "receiveTimeout" property).

Default is unlimited (-1) in case of a standard TaskExecutor, and 1 in case of a SchedulingTaskExecutor that indicates a preference for short-lived tasks. Specify a number of 10 to 100 messages to balance between extremely long-lived and extremely short-lived tasks here.

Long-lived tasks avoid frequent thread context switches through sticking with the same thread all the way through, while short-lived tasks allow thread pools to control the scheduling. Hence, thread pools will usually prefer short-lived tasks.

See Also:
setTaskExecutor(org.springframework.core.task.TaskExecutor), setReceiveTimeout(long), SchedulingTaskExecutor.prefersShortLivedTasks()

setTransactionManager

public void setTransactionManager(PlatformTransactionManager transactionManager)
Specify the Spring PlatformTransactionManager to use for transactional wrapping of message reception plus listener execution. Default is none, not performing any transactional wrapping.

If specified, this will usually be a Spring JtaTransactionManager, in combination with a JTA-aware ConnectionFactory that this message listener container fetches its Connections from.

Alternatively, pass in a fully configured Spring TransactionTemplate into the "transactionTemplate" property.


setTransactionTimeout

public void setTransactionTimeout(int transactionTimeout)
Specify the transaction timeout to use for transactional wrapping, in seconds. Default is none, using the transaction manager's default timeout.

See Also:
TransactionDefinition.getTimeout(), setReceiveTimeout(long)

setReceiveTimeout

public void setReceiveTimeout(long receiveTimeout)
Set the timeout to use for receive calls, in milliseconds. The default is 1000 ms, that is, 1 second.

NOTE: This value needs to be smaller than the transaction timeout used by the transaction manager (in the appropriate unit, of course). -1 indicates no timeout at all; however, this is only feasible if not running within a transaction manager.

See Also:
MessageConsumer.receive(long), MessageConsumer.receive(), setTransactionTimeout(int)

setRecoveryInterval

public void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.

See Also:
handleListenerSetupFailure(java.lang.Throwable, boolean)

setCacheLevelName

public void setCacheLevelName(String constantName)
                       throws IllegalArgumentException
Specify the level of caching that this listener container is allowed to apply, in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".

Throws:
IllegalArgumentException
See Also:
setCacheLevel(int)

setCacheLevel

public void setCacheLevel(int cacheLevel)
Specify the level of caching that this listener container is allowed to apply.

Default is CACHE_NONE if an external transaction manager has been specified (to reobtain all resources freshly within the scope of the external transaction), and CACHE_CONSUMER else (operating with local JMS resources).

Some J2EE servers only register their JMS resources with an ongoing XA transaction in case of a freshly obtained JMS Connection and Session, which is why this listener container does by default not cache any of those. However, if you want to optimize for a specific server, consider switching this setting to at least CACHE_CONNECTION or CACHE_SESSION even in conjunction with an external transaction manager.

Currently known servers that absolutely require CACHE_NONE for XA transaction processing: JBoss 4. For any others, consider raising the cache level.

See Also:
CACHE_NONE, CACHE_CONNECTION, CACHE_SESSION, CACHE_CONSUMER, setTransactionManager(org.springframework.transaction.PlatformTransactionManager)

getCacheLevel

public int getCacheLevel()
Return the level of caching that this listener container is allowed to apply.


setBeanName

public void setBeanName(String beanName)
Description copied from interface: BeanNameAware
Set the name of the bean in the bean factory that created this bean.

Invoked after population of normal bean properties but before an init callback like InitializingBean's afterPropertiesSet or a custom init-method.

Specified by:
setBeanName in interface BeanNameAware
Parameters:
beanName - the name of the bean in the factory

afterPropertiesSet

public void afterPropertiesSet()
Validates this instance's configuration.

Specified by:
afterPropertiesSet in interface InitializingBean
Overrides:
afterPropertiesSet in class AbstractMessageListenerContainer
See Also:
AbstractMessageListenerContainer.initialize()

initialize

public void initialize()
Description copied from class: AbstractMessageListenerContainer
Initialize this message listener container.

Creates a JMS Connection, registers the given listener object, and starts the Connection (if "autoStartup" hasn't been turned off).

Overrides:
initialize in class AbstractMessageListenerContainer

createDefaultTaskExecutor

protected TaskExecutor createDefaultTaskExecutor()
Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.

The default implementation builds a SimpleAsyncTaskExecutor with the specified bean name (or the class name, if no bean name specified) as thread name prefix.

See Also:
SimpleAsyncTaskExecutor.SimpleAsyncTaskExecutor(String)

sharedConnectionEnabled

protected final boolean sharedConnectionEnabled()
Use a shared JMS Connection depending on the "cacheLevel" setting.

Specified by:
sharedConnectionEnabled in class AbstractMessageListenerContainer
See Also:
setCacheLevel(int), CACHE_CONNECTION

registerListener

protected void registerListener()
                         throws JMSException
Creates the specified number of concurrent consumers, in the form of a JMS Session plus associated MessageConsumer running in a separate thread.

Specified by:
registerListener in class AbstractMessageListenerContainer
Throws:
JMSException - if registration failed
See Also:
setTaskExecutor(org.springframework.core.task.TaskExecutor)

doRescheduleTask

protected void doRescheduleTask(Object task)
Executes the given task via this listener container's TaskExecutor.

Overrides:
doRescheduleTask in class AbstractMessageListenerContainer
Parameters:
task - the task object to reschedule
See Also:
setTaskExecutor(org.springframework.core.task.TaskExecutor)

createListenerConsumer

protected MessageConsumer createListenerConsumer(Session session)
                                          throws JMSException
Create a MessageConsumer for the given JMS Session, registering a MessageListener for the specified listener.

Parameters:
session - the JMS Session to work on
Returns:
the MessageConsumer
Throws:
JMSException - if thrown by JMS methods
See Also:
receiveAndExecute(javax.jms.Session, javax.jms.MessageConsumer)

receiveAndExecute

protected void receiveAndExecute(Session session,
                                 MessageConsumer consumer)
                          throws JMSException
Execute the listener for a message received from the given consumer, wrapping the entire operation in an external transaction if demanded.

Parameters:
session - the JMS Session to work on
consumer - the MessageConsumer to work on
Throws:
JMSException - if thrown by JMS methods
See Also:
doReceiveAndExecute(javax.jms.Session, javax.jms.MessageConsumer, org.springframework.transaction.TransactionStatus)

doReceiveAndExecute

protected void doReceiveAndExecute(Session session,
                                   MessageConsumer consumer,
                                   TransactionStatus status)
                            throws JMSException
Actually execute the listener for a message received from the given consumer, fetching all requires resources and invoking the listener.

Parameters:
session - the JMS Session to work on
consumer - the MessageConsumer to work on
status - the TransactionStatus (may be null)
Throws:
JMSException - if thrown by JMS methods
See Also:
AbstractMessageListenerContainer.doExecuteListener(javax.jms.Session, javax.jms.Message)

receiveMessage

protected Message receiveMessage(MessageConsumer consumer)
                          throws JMSException
Receive a message from the given consumer.

Parameters:
consumer - the MessageConsumer to use
Returns:
the Message, or null if none
Throws:
JMSException - if thrown by JMS methods

establishSharedConnection

protected void establishSharedConnection()
Overridden to accept a failure in the initial setup - leaving it up to the asynchronous invokers to establish the shared Connection on first access.

Overrides:
establishSharedConnection in class AbstractMessageListenerContainer
See Also:
refreshConnectionUntilSuccessful()

startSharedConnection

protected void startSharedConnection()
This implementations proceeds even after an exception thrown from Connection.start(), relying on listeners to perform appropriate recovery.

Overrides:
startSharedConnection in class AbstractMessageListenerContainer
See Also:
Connection.start()

stopSharedConnection

protected void stopSharedConnection()
This implementations proceeds even after an exception thrown from Connection.stop(), relying on listeners to perform appropriate recovery after a restart.

Overrides:
stopSharedConnection in class AbstractMessageListenerContainer
See Also:
Connection.start()

handleListenerSetupFailure

protected void handleListenerSetupFailure(Throwable ex,
                                          boolean alreadyRecovered)
Handle the given exception that arose during setup of a listener. Called for every such exception in every concurrent listener.

The default implementation logs the exception at error level if not recovered yet, and at debug level if already recovered. Can be overridden in subclasses.

Parameters:
ex - the exception to handle
alreadyRecovered - whether a previously executing listener already recovered from the present listener setup failure (this usually indicates a follow-up failure than be ignored other than for debug log purposes)
See Also:
recoverAfterListenerSetupFailure()

recoverAfterListenerSetupFailure

protected void recoverAfterListenerSetupFailure()
Recover this listener container after a listener failed to set itself up, for example reestablishing the underlying Connection.

The default implementation delegates to DefaultMessageListenerContainer's recovery-capable refreshConnectionUntilSuccessful method, which will try to reestablish a Connection to the JMS provider both for the shared and the non-shared Connection case.

See Also:
refreshConnectionUntilSuccessful(), refreshDestination()

refreshConnectionUntilSuccessful

protected void refreshConnectionUntilSuccessful()
Refresh the underlying Connection, not returning before an attempt has been successful. Called in case of a shared Connection as well as without shared Connection, so either needs to operate on the shared Connection or on a temporary Connection that just gets established for validation purposes.

The default implementation retries until it successfully established a Connection, for as long as this message listener container is active. Applies the specified recovery interval between retries.

See Also:
setRecoveryInterval(long)

refreshDestination

protected void refreshDestination()
Refresh the JMS destination that this listener container operates on.

Called after listener setup failure, assuming that a cached Destination object might have become invalid (a typical case on WebLogic JMS).

The default implementation removes the destination from a DestinationResolver's cache, in case of a CachingDestinationResolver.

See Also:
AbstractMessageListenerContainer.setDestinationName(java.lang.String), CachingDestinationResolver

sleepInbetweenRecoveryAttempts

protected void sleepInbetweenRecoveryAttempts()

destroyListener

protected void destroyListener()
                        throws JMSException
Destroy the registered JMS Sessions and associated MessageConsumers.

Specified by:
destroyListener in class AbstractMessageListenerContainer
Throws:
JMSException - if destruction failed

getConnection

protected Connection getConnection(JmsResourceHolder holder)
Fetch an appropriate Connection from the given JmsResourceHolder.

This implementation accepts any JMS 1.1 Connection.

Parameters:
holder - the JmsResourceHolder
Returns:
an appropriate Connection fetched from the holder, or null if none found

getSession

protected Session getSession(JmsResourceHolder holder)
Fetch an appropriate Session from the given JmsResourceHolder.

This implementation accepts any JMS 1.1 Session.

Parameters:
holder - the JmsResourceHolder
Returns:
an appropriate Session fetched from the holder, or null if none found

createConsumer

protected MessageConsumer createConsumer(Session session,
                                         Destination destination)
                                  throws JMSException
Create a JMS MessageConsumer for the given Session and Destination.

This implementation uses JMS 1.1 API.

Parameters:
session - the JMS Session to create a MessageConsumer for
destination - the JMS Destination to create a MessageConsumer for
Returns:
the new JMS MessageConsumer
Throws:
JMSException - if thrown by JMS API methods

The Spring Framework

Copyright © 2002-2006 The Spring Framework.