|
The Spring Framework | |||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Object org.springframework.jms.support.JmsAccessor org.springframework.jms.support.destination.JmsDestinationAccessor org.springframework.jms.listener.AbstractMessageListenerContainer org.springframework.jms.listener.DefaultMessageListenerContainer
public class DefaultMessageListenerContainer
Message listener container that uses plain JMS client API, specifically
a loop of MessageConsumer.receive()
calls that also allow for
transactional reception of messages (registering them with XA transactions).
Designed to work in a native JMS environment as well as in a J2EE environment,
with only minimal differences in configuration.
This is a simple but nevertheless powerful form of a message listener container. It creates a fixed number of JMS Sessions to invoke the listener, not allowing for dynamic adaptation to runtime demands. Like SimpleMessageListenerContainer, its main advantage is its low level of complexity and the minimum requirements on the JMS provider: Not even the ServerSessionPool facility is required.
Actual MessageListener execution happens in separate threads that are created through Spring's TaskExecutor abstraction. By default, the appropriate number of threads are created on startup, according to the "concurrentConsumers" setting. Specify an alternative TaskExecutor to integrate with an existing thread pool facility, for example. With a native JMS setup, each of those listener threads is gonna use a cached JMS Session and MessageConsumer (only refreshed in case of failure), using the JMS provider's resources as efficiently as possible.
Message reception and listener execution can automatically be wrapped in transactions through passing a Spring PlatformTransactionManager into the "transactionManager" property. This will usually be a JtaTransactionManager in a J2EE enviroment, in combination with a JTA-aware JMS ConnectionFactory that this message listener container fetches its Connections from (check your J2EE server's documentation). Note that this listener container will automatically reobtain all JMS handles for each transaction in case of an external transaction manager specified, for compatibility with all J2EE servers (in particular JBoss). This non-caching behavior can be overridden through the "cacheLevel"/"cacheLevelName" property, enforcing caching of the Connection (or also Session and MessageConsumer) even in case of an external transaction manager.
See the AbstractMessageListenerContainer
javadoc for details on acknowledge modes and transaction options.
This class requires a JMS 1.1+ provider, because it builds on the
domain-independent API. Use the DefaultMessageListenerContainer102
subclass for JMS 1.0.2 providers.
For dynamic adaptation of the active number of Sessions, consider using ServerSessionMessageListenerContainer.
setTransactionManager(org.springframework.transaction.PlatformTransactionManager)
,
setCacheLevel(int)
,
setCacheLevelName(java.lang.String)
,
JtaTransactionManager
,
MessageConsumer.receive(long)
,
SimpleMessageListenerContainer
,
ServerSessionMessageListenerContainer
,
DefaultMessageListenerContainer102
Nested Class Summary |
---|
Nested classes/interfaces inherited from class org.springframework.jms.listener.AbstractMessageListenerContainer |
---|
AbstractMessageListenerContainer.SharedConnectionNotInitializedException |
Field Summary | |
---|---|
static int |
CACHE_CONNECTION
Constant that indicates to cache a shared JMS Connection. |
static int |
CACHE_CONSUMER
Constant that indicates to cache a shared JMS Connection and a JMS Session for each listener thread, as well as a JMS MessageConsumer for each listener thread. |
static int |
CACHE_NONE
Constant that indicates to cache no JMS resources at all. |
static int |
CACHE_SESSION
Constant that indicates to cache a shared JMS Connection and a JMS Session for each listener thread. |
static long |
DEFAULT_RECEIVE_TIMEOUT
The default receive timeout: 1000 ms = 1 second. |
static long |
DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds. |
static String |
DEFAULT_THREAD_NAME_PREFIX
Default thread name prefix: "SimpleAsyncTaskExecutor-". |
Fields inherited from class org.springframework.jms.support.JmsAccessor |
---|
logger |
Constructor Summary | |
---|---|
DefaultMessageListenerContainer()
|
Method Summary | |
---|---|
void |
afterPropertiesSet()
Validates this instance's configuration. |
protected MessageConsumer |
createConsumer(Session session,
Destination destination)
Create a JMS MessageConsumer for the given Session and Destination. |
protected TaskExecutor |
createDefaultTaskExecutor()
Create a default TaskExecutor. |
protected MessageConsumer |
createListenerConsumer(Session session)
Create a MessageConsumer for the given JMS Session, registering a MessageListener for the specified listener. |
protected void |
destroyListener()
Destroy the registered JMS Sessions and associated MessageConsumers. |
protected void |
doReceiveAndExecute(Session session,
MessageConsumer consumer,
TransactionStatus status)
Actually execute the listener for a message received from the given consumer, fetching all requires resources and invoking the listener. |
protected void |
doRescheduleTask(Object task)
Executes the given task via this listener container's TaskExecutor. |
protected void |
establishSharedConnection()
Overridden to accept a failure in the initial setup - leaving it up to the asynchronous invokers to establish the shared Connection on first access. |
int |
getCacheLevel()
Return the level of caching that this listener container is allowed to apply. |
protected Connection |
getConnection(JmsResourceHolder holder)
Fetch an appropriate Connection from the given JmsResourceHolder. |
protected Session |
getSession(JmsResourceHolder holder)
Fetch an appropriate Session from the given JmsResourceHolder. |
protected void |
handleListenerSetupFailure(Throwable ex,
boolean alreadyRecovered)
Handle the given exception that arose during setup of a listener. |
void |
initialize()
Initialize this message listener container. |
protected boolean |
isPubSubNoLocal()
Return whether to inhibit the delivery of messages published by its own connection. |
protected void |
receiveAndExecute(Session session,
MessageConsumer consumer)
Execute the listener for a message received from the given consumer, wrapping the entire operation in an external transaction if demanded. |
protected Message |
receiveMessage(MessageConsumer consumer)
Receive a message from the given consumer. |
protected void |
recoverAfterListenerSetupFailure()
Recover this listener container after a listener failed to set itself up, for example reestablishing the underlying Connection. |
protected void |
refreshConnectionUntilSuccessful()
Refresh the underlying Connection, not returning before an attempt has been successful. |
protected void |
refreshDestination()
Refresh the JMS destination that this listener container operates on. |
protected void |
registerListener()
Creates the specified number of concurrent consumers, in the form of a JMS Session plus associated MessageConsumer running in a separate thread. |
void |
setBeanName(String beanName)
Set the name of the bean in the bean factory that created this bean. |
void |
setCacheLevel(int cacheLevel)
Specify the level of caching that this listener container is allowed to apply. |
void |
setCacheLevelName(String constantName)
Specify the level of caching that this listener container is allowed to apply, in the form of the name of the corresponding constant: e.g. |
void |
setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create. |
void |
setMaxMessagesPerTask(int maxMessagesPerTask)
Set the maximum number of messages to process in one task. |
void |
setPubSubNoLocal(boolean pubSubNoLocal)
Set whether to inhibit the delivery of messages published by its own connection. |
void |
setReceiveTimeout(long receiveTimeout)
Set the timeout to use for receive calls, in milliseconds. |
void |
setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds. |
void |
setTaskExecutor(TaskExecutor taskExecutor)
Set the Spring TaskExecutor to use for running the listener threads. |
void |
setTransactionManager(PlatformTransactionManager transactionManager)
Specify the Spring PlatformTransactionManager to use for transactional wrapping of message reception plus listener execution. |
void |
setTransactionTimeout(int transactionTimeout)
Specify the transaction timeout to use for transactional wrapping, in seconds. |
protected boolean |
sharedConnectionEnabled()
Use a shared JMS Connection depending on the "cacheLevel" setting. |
protected void |
sleepInbetweenRecoveryAttempts()
|
protected void |
startSharedConnection()
This implementations proceeds even after an exception thrown from Connection.start() , relying on listeners to perform
appropriate recovery. |
protected void |
stopSharedConnection()
This implementations proceeds even after an exception thrown from Connection.stop() , relying on listeners to perform
appropriate recovery after a restart. |
Methods inherited from class org.springframework.jms.support.destination.JmsDestinationAccessor |
---|
getDestinationResolver, isPubSubDomain, resolveDestinationName, setDestinationResolver, setPubSubDomain |
Methods inherited from class org.springframework.jms.support.JmsAccessor |
---|
convertJmsAccessException, getConnectionFactory, getSessionAcknowledgeMode, isSessionTransacted, setConnectionFactory, setSessionAcknowledgeMode, setSessionAcknowledgeModeName, setSessionTransacted |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
public static final String DEFAULT_THREAD_NAME_PREFIX
public static final long DEFAULT_RECEIVE_TIMEOUT
public static final long DEFAULT_RECOVERY_INTERVAL
public static final int CACHE_NONE
setCacheLevel(int)
,
Constant Field Valuespublic static final int CACHE_CONNECTION
setCacheLevel(int)
,
Constant Field Valuespublic static final int CACHE_SESSION
setCacheLevel(int)
,
Constant Field Valuespublic static final int CACHE_CONSUMER
setCacheLevel(int)
,
Constant Field ValuesConstructor Detail |
---|
public DefaultMessageListenerContainer()
Method Detail |
---|
public void setPubSubNoLocal(boolean pubSubNoLocal)
TopicSession.createSubscriber(javax.jms.Topic, String, boolean)
protected boolean isPubSubNoLocal()
public void setTaskExecutor(TaskExecutor taskExecutor)
Specify an alternative TaskExecutor for integration with an existing thread pool. Note that this really only adds value if the threads are managed in a specific fashion, for example within a J2EE environment. A plain thread pool does not add much value, as this listener container will occupy a number of threads for its entire lifetime.
setConcurrentConsumers(int)
,
SimpleAsyncTaskExecutor
,
WorkManagerTaskExecutor
public void setConcurrentConsumers(int concurrentConsumers)
public void setMaxMessagesPerTask(int maxMessagesPerTask)
Default is unlimited (-1) in case of a standard TaskExecutor, and 1 in case of a SchedulingTaskExecutor that indicates a preference for short-lived tasks. Specify a number of 10 to 100 messages to balance between extremely long-lived and extremely short-lived tasks here.
Long-lived tasks avoid frequent thread context switches through sticking with the same thread all the way through, while short-lived tasks allow thread pools to control the scheduling. Hence, thread pools will usually prefer short-lived tasks.
setTaskExecutor(org.springframework.core.task.TaskExecutor)
,
setReceiveTimeout(long)
,
SchedulingTaskExecutor.prefersShortLivedTasks()
public void setTransactionManager(PlatformTransactionManager transactionManager)
If specified, this will usually be a Spring JtaTransactionManager, in combination with a JTA-aware ConnectionFactory that this message listener container fetches its Connections from.
Alternatively, pass in a fully configured Spring TransactionTemplate into the "transactionTemplate" property.
public void setTransactionTimeout(int transactionTimeout)
TransactionDefinition.getTimeout()
,
setReceiveTimeout(long)
public void setReceiveTimeout(long receiveTimeout)
NOTE: This value needs to be smaller than the transaction timeout used by the transaction manager (in the appropriate unit, of course). -1 indicates no timeout at all; however, this is only feasible if not running within a transaction manager.
MessageConsumer.receive(long)
,
MessageConsumer.receive()
,
setTransactionTimeout(int)
public void setRecoveryInterval(long recoveryInterval)
handleListenerSetupFailure(java.lang.Throwable, boolean)
public void setCacheLevelName(String constantName) throws IllegalArgumentException
IllegalArgumentException
setCacheLevel(int)
public void setCacheLevel(int cacheLevel)
Default is CACHE_NONE if an external transaction manager has been specified (to reobtain all resources freshly within the scope of the external transaction), and CACHE_CONSUMER else (operating with local JMS resources).
Some J2EE servers only register their JMS resources with an ongoing XA transaction in case of a freshly obtained JMS Connection and Session, which is why this listener container does by default not cache any of those. However, if you want to optimize for a specific server, consider switching this setting to at least CACHE_CONNECTION or CACHE_SESSION even in conjunction with an external transaction manager.
Currently known servers that absolutely require CACHE_NONE for XA transaction processing: JBoss 4. For any others, consider raising the cache level.
CACHE_NONE
,
CACHE_CONNECTION
,
CACHE_SESSION
,
CACHE_CONSUMER
,
setTransactionManager(org.springframework.transaction.PlatformTransactionManager)
public int getCacheLevel()
public void setBeanName(String beanName)
BeanNameAware
Invoked after population of normal bean properties but before an init callback like InitializingBean's afterPropertiesSet or a custom init-method.
setBeanName
in interface BeanNameAware
beanName
- the name of the bean in the factorypublic void afterPropertiesSet()
afterPropertiesSet
in interface InitializingBean
afterPropertiesSet
in class AbstractMessageListenerContainer
AbstractMessageListenerContainer.initialize()
public void initialize()
AbstractMessageListenerContainer
Creates a JMS Connection, registers the
given listener object
,
and starts the Connection
(if "autoStartup"
hasn't been turned off).
initialize
in class AbstractMessageListenerContainer
protected TaskExecutor createDefaultTaskExecutor()
The default implementation builds a SimpleAsyncTaskExecutor
with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
SimpleAsyncTaskExecutor.SimpleAsyncTaskExecutor(String)
protected final boolean sharedConnectionEnabled()
sharedConnectionEnabled
in class AbstractMessageListenerContainer
setCacheLevel(int)
,
CACHE_CONNECTION
protected void registerListener() throws JMSException
registerListener
in class AbstractMessageListenerContainer
JMSException
- if registration failedsetTaskExecutor(org.springframework.core.task.TaskExecutor)
protected void doRescheduleTask(Object task)
doRescheduleTask
in class AbstractMessageListenerContainer
task
- the task object to reschedulesetTaskExecutor(org.springframework.core.task.TaskExecutor)
protected MessageConsumer createListenerConsumer(Session session) throws JMSException
session
- the JMS Session to work on
JMSException
- if thrown by JMS methodsreceiveAndExecute(javax.jms.Session, javax.jms.MessageConsumer)
protected void receiveAndExecute(Session session, MessageConsumer consumer) throws JMSException
session
- the JMS Session to work onconsumer
- the MessageConsumer to work on
JMSException
- if thrown by JMS methodsdoReceiveAndExecute(javax.jms.Session, javax.jms.MessageConsumer, org.springframework.transaction.TransactionStatus)
protected void doReceiveAndExecute(Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException
session
- the JMS Session to work onconsumer
- the MessageConsumer to work onstatus
- the TransactionStatus (may be null
)
JMSException
- if thrown by JMS methodsAbstractMessageListenerContainer.doExecuteListener(javax.jms.Session, javax.jms.Message)
protected Message receiveMessage(MessageConsumer consumer) throws JMSException
consumer
- the MessageConsumer to use
null
if none
JMSException
- if thrown by JMS methodsprotected void establishSharedConnection()
establishSharedConnection
in class AbstractMessageListenerContainer
refreshConnectionUntilSuccessful()
protected void startSharedConnection()
Connection.start()
, relying on listeners to perform
appropriate recovery.
startSharedConnection
in class AbstractMessageListenerContainer
Connection.start()
protected void stopSharedConnection()
Connection.stop()
, relying on listeners to perform
appropriate recovery after a restart.
stopSharedConnection
in class AbstractMessageListenerContainer
Connection.start()
protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered)
The default implementation logs the exception at error level if not recovered yet, and at debug level if already recovered. Can be overridden in subclasses.
ex
- the exception to handlealreadyRecovered
- whether a previously executing listener
already recovered from the present listener setup failure
(this usually indicates a follow-up failure than be ignored
other than for debug log purposes)recoverAfterListenerSetupFailure()
protected void recoverAfterListenerSetupFailure()
The default implementation delegates to DefaultMessageListenerContainer's
recovery-capable refreshConnectionUntilSuccessful
method, which will try
to reestablish a Connection to the JMS provider both for the shared
and the non-shared Connection case.
refreshConnectionUntilSuccessful()
,
refreshDestination()
protected void refreshConnectionUntilSuccessful()
The default implementation retries until it successfully established a Connection, for as long as this message listener container is active. Applies the specified recovery interval between retries.
setRecoveryInterval(long)
protected void refreshDestination()
Called after listener setup failure, assuming that a cached Destination object might have become invalid (a typical case on WebLogic JMS).
The default implementation removes the destination from a DestinationResolver's cache, in case of a CachingDestinationResolver.
AbstractMessageListenerContainer.setDestinationName(java.lang.String)
,
CachingDestinationResolver
protected void sleepInbetweenRecoveryAttempts()
protected void destroyListener() throws JMSException
destroyListener
in class AbstractMessageListenerContainer
JMSException
- if destruction failedprotected Connection getConnection(JmsResourceHolder holder)
This implementation accepts any JMS 1.1 Connection.
holder
- the JmsResourceHolder
null
if none foundprotected Session getSession(JmsResourceHolder holder)
This implementation accepts any JMS 1.1 Session.
holder
- the JmsResourceHolder
null
if none foundprotected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException
This implementation uses JMS 1.1 API.
session
- the JMS Session to create a MessageConsumer fordestination
- the JMS Destination to create a MessageConsumer for
JMSException
- if thrown by JMS API methods
|
The Spring Framework | |||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |