The Aggregation API consists of a number of classes:
The base class AbstractMessageAggregator
and its
subclass MethodInvokingMessageAggregator
The CompletionStrategy
interface and its default
implementation SequenceSizeCompletionStrategy
The AbstractMessageAggregator
is a
MessageConsumer
implementation, encapsulating the common
functionalities of an Aggregator, which are: storing messages until the
message sequence to aggregate is complete (and grouping them according to
their CORRELATION_ID), and implementing the timeout functionality. The
responsibility of deciding whether the message sequence is complete is
delegated to a CompletionStrategy
instance.
A brief highlight of the base AbstractMessageAggregator
(the responsibility of implementing the aggregateMessages method is left
to the developer):
public abstract class AbstractMessageAggregator extends AbstractMessageBarrierConsumer { private volatile CompletionStrategy completionStrategy = new SequenceSizeCompletionStrategy(); .... protected abstract Message<?> aggregateMessages(List<Message<?>> messages); }
For implementing a specific aggregator object for an application, a
developer can extend AbstractMessageAggregator
and implement
the aggregateMessages
method. However, there are better
suited (which reads, less coupled to the API) solutions for implementing
the aggregation logic, which can be configured easily either through XML
or through annotations.
In general, any ordinary Java class (i.e. POJO) can implement the aggregation algorithm. For doing so, it must provide a method that accepts as an argument a single java.util.List (parametrized lists are supported as well). This method will be invoked for aggregating messages, as follows:
if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator
if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.
Note | |
---|---|
In the interest of code simplicity, and promoting best practices such as low coupling, testability, etc., the preferred way of implementing the aggregation logic is through a POJO, and using the XML or annotation support for setting it up in the application. |
The CompletionStrategy
interface is defined as
follows:
public interface CompletionStrategy { boolean isComplete(List<Message<?>> messages); }
In general, any ordinary Java class (i.e. POJO) can implement the completion decision mechanism. For doing so, it must provide a method that accepts as an argument a single java.util.List (parametrized lists are supported as well), and returns a boolean value. This method will be invoked after the arrival of a new message, to decide whether the group is complete or not, as follows:
if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated in the group will be sent to the method
if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
the method must return true if the message group is complete and ready for aggregation, and false otherwise.
Spring Integration provides an out-of-the box implementation for
CompletionStrategy
, the
SequenceSizeCompletionStrategy
This implementation uses the
SEQUENCE_NUMBER and SEQUENCE_SIZE of the arriving messages for deciding
when a message group is complete and ready to be
aggregated.