org.springframework.integration.aggregator
Class AggregatingMessageHandler

java.lang.Object
  extended by org.springframework.integration.aggregator.AbstractMessageBarrierHandler
      extended by org.springframework.integration.aggregator.AggregatingMessageHandler
All Implemented Interfaces:
org.springframework.beans.factory.InitializingBean, MessageHandler

public class AggregatingMessageHandler
extends AbstractMessageBarrierHandler

An AbstractMessageBarrierHandler that waits for a complete group of Messages to arrive and then delegates to an Aggregator to combine them into a single Message.

The default strategy for determining whether a group is complete is based on the 'sequenceSize' property of the header. Alternatively, a custom implementation of the CompletionStrategy may be provided.

All considerations regarding timeout and grouping by 'correlationId' from AbstractMessageBarrierHandler apply here as well.

Author:
Mark Fisher, Marius Bogoevici

Field Summary
private  Aggregator aggregator
           
private  CompletionStrategy completionStrategy
           
 
Fields inherited from class org.springframework.integration.aggregator.AbstractMessageBarrierHandler
barriers, DEFAULT_REAPER_INTERVAL, DEFAULT_SEND_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY, executor, logger, outputChannel, sendTimeout, trackedCorrelationIds
 
Constructor Summary
AggregatingMessageHandler(Aggregator aggregator)
           
AggregatingMessageHandler(Aggregator aggregator, java.util.concurrent.ScheduledExecutorService executor)
          Create a handler that delegates to the provided aggregator to combine a group of messages into a single message.
 
Method Summary
protected  MessageBarrier createMessageBarrier()
          Factory method for creating a suitable MessageBarrier implementation.
protected  boolean isBarrierRemovable(java.lang.Object correlationId, java.util.List<Message<?>> releasedMessages)
          Implements the logic for deciding whether, based on what the MessageBarrier has released so far, work for the correlationId can be considered done and the barrier can be released.
protected  Message<?>[] processReleasedMessages(java.lang.Object correlationId, java.util.List<Message<?>> messages)
          Implements the logic for transforming the released Messages.
 void setCompletionStrategy(CompletionStrategy completionStrategy)
          Strategy to determine whether the group of messages is complete.
 
Methods inherited from class org.springframework.integration.aggregator.AbstractMessageBarrierHandler
afterPropertiesSet, handle, resolveReplyTargetFromMessage, setDiscardChannel, setOutputChannel, setReaperInterval, setSendPartialResultOnTimeout, setSendTimeout, setTimeout, setTrackedCorrelationIdCapacity
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

aggregator

private final Aggregator aggregator

completionStrategy

private volatile CompletionStrategy completionStrategy
Constructor Detail

AggregatingMessageHandler

public AggregatingMessageHandler(Aggregator aggregator,
                                 java.util.concurrent.ScheduledExecutorService executor)
Create a handler that delegates to the provided aggregator to combine a group of messages into a single message. The executor will be used for scheduling a background maintenance thread. If null, a new single-threaded executor will be created.


AggregatingMessageHandler

public AggregatingMessageHandler(Aggregator aggregator)
Method Detail

setCompletionStrategy

public void setCompletionStrategy(CompletionStrategy completionStrategy)
Strategy to determine whether the group of messages is complete.


createMessageBarrier

protected MessageBarrier createMessageBarrier()
Description copied from class: AbstractMessageBarrierHandler
Factory method for creating a suitable MessageBarrier implementation.

Specified by:
createMessageBarrier in class AbstractMessageBarrierHandler

isBarrierRemovable

protected boolean isBarrierRemovable(java.lang.Object correlationId,
                                     java.util.List<Message<?>> releasedMessages)
Description copied from class: AbstractMessageBarrierHandler
Implements the logic for deciding whether, based on what the MessageBarrier has released so far, work for the correlationId can be considered done and the barrier can be released.

Specified by:
isBarrierRemovable in class AbstractMessageBarrierHandler

processReleasedMessages

protected Message<?>[] processReleasedMessages(java.lang.Object correlationId,
                                               java.util.List<Message<?>> messages)
Description copied from class: AbstractMessageBarrierHandler
Implements the logic for transforming the released Messages.

Specified by:
processReleasedMessages in class AbstractMessageBarrierHandler