Splitter

The splitter is a component whose role is to partition a message into several parts and send the resulting messages to be processed independently. Very often, they are upstream producers in a pipeline that includes an aggregator.

Programming Model

The API for performing splitting consists of one base class, AbstractMessageSplitter. It is a MessageHandler implementation that encapsulates features common to splitters, such as filling in the appropriate message headers (CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER) on the messages that are produced. This filling enables tracking down the messages and the results of their processing (in a typical scenario, these headers get copied to the messages that are produced by the various transforming endpoints). The values can then be used, for example, by a composed message processor.

The following example shows an excerpt from AbstractMessageSplitter:

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

To implement a specific splitter in an application, you can extend AbstractMessageSplitter and implement the splitMessage method, which contains logic for splitting the messages. The return value can be one of the following:

  • A Collection or an array of messages or an Iterable (or Iterator) that iterates over messages. In this case, the messages are sent as messages (after the CORRELATION_ID, SEQUENCE_SIZE and SEQUENCE_NUMBER are populated). Using this approach gives you more control — for example, to populate custom message headers as part of the splitting process.

  • A Collection or an array of non-message objects or an Iterable (or Iterator) that iterates over non-message objects. It works like the prior case, except that each collection element is used as a message payload. Using this approach lets you focus on the domain objects without having to consider the messaging system and produces code that is easier to test.

  • a Message or non-message object (but not a collection or an array). It works like the previous cases, except that a single message is sent out.

In Spring Integration, any POJO can implement the splitting algorithm, provided that it defines a method that accepts a single argument and has a return value. In this case, the return value of the method is interpreted as described earlier. The input argument might either be a Message or a simple POJO. In the latter case, the splitter receives the payload of the incoming message. We recommend this approach, because it decouples the code from the Spring Integration API and is typically easier to test.

Iterators

Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split. Note, in the case of an Iterator (or Iterable), we don’t have access to the number of underlying items and the SEQUENCE_SIZE header is set to 0. This means that the default SequenceSizeReleaseStrategy of an <aggregator> won’t work and the group for the CORRELATION_ID from the splitter won’t be released; it will remain as incomplete. In this case you should use an appropriate custom ReleaseStrategy or rely on send-partial-result-on-expiry together with group-timeout or a MessageGroupStoreReaper.

Starting with version 5.0, the AbstractMessageSplitter provides protected obtainSizeIfPossible() methods to allow the determination of the size of the Iterable and Iterator objects if that is possible. For example XPathMessageSplitter can determine the size of the underlying NodeList object. And starting with version 5.0.9, this method also properly returns a size of the com.fasterxml.jackson.core.TreeNode.

An Iterator object is useful to avoid the need for building an entire collection in the memory before splitting. For example, when underlying items are populated from some external system (e.g. DataBase or FTP MGET) using iterations or streams.

Stream and Flux

Starting with version 5.0, the AbstractMessageSplitter supports the Java Stream and Reactive Streams Publisher types for the value to split. In this case, the target Iterator is built on their iteration functionality.

In addition, if the splitter’s output channel is an instance of a ReactiveStreamsSubscribableChannel, the AbstractMessageSplitter produces a Flux result instead of an Iterator, and the output channel is subscribed to this Flux for back-pressure-based splitting on downstream flow demand.

Starting with version 5.2, the splitter supports a discardChannel option for sending those request messages for which a split function has returned an empty container (collection, array, stream, Flux etc.). In this case there is just no item to iterate for sending to the outputChannel. The null splitting result remains as an end of flow indicator.

Configuring a Splitter with Java, Groovy and Kotlin DSLs

An example of simple splitter based on a Message and its iterable payload with DSL configuration:

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
@Bean
someFlow() {
    integrationFlow {
        splitWith {
		    expectedType Message<?>
		    function { it.payload }
        }
    }
}

See more information about the DSLs in the respective chapters:

Configuring a Splitter with XML

A splitter can be configured through XML as follows:

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 The ID of the splitter is optional.
2 A reference to a bean defined in the application context. The bean must implement the splitting logic, as described in the earlier section. Optional. If a reference to a bean is not provided, it is assumed that the payload of the message that arrived on the input-channel is an implementation of java.util.Collection and the default splitting logic is applied to the collection, incorporating each individual element into a message and sending it to the output-channel.
3 The method (defined on the bean) that implements the splitting logic. Optional.
4 The input channel of the splitter. Required.
5 The channel to which the splitter sends the results of splitting the incoming message. Optional (because incoming messages can specify a reply channel themselves).
6 The channel to which the request message is sent in case of empty splitting result. Optional (they will stop as in case of null result).

We recommend using a ref attribute if the custom splitter implementation can be referenced in other <splitter> definitions. However, if the custom splitter handler implementation should be scoped to a single definition of the <splitter>, you can configure an inner bean definition, as the following example follows:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
Using both a ref attribute and an inner handler definition in the same <int:splitter> configuration is not allowed, as it creates an ambiguous condition and results in an exception being thrown.
If the ref attribute references a bean that extends AbstractMessageProducingHandler (such as splitters provided by the framework itself), the configuration is optimized by injecting the output channel into the handler directly. In this case, each ref must be a separate bean instance (or a prototype-scoped bean) or use the inner <bean/> configuration type. However, this optimization applies only if you do not provide any splitter-specific attributes in the splitter XML definition. If you inadvertently reference the same message handler from multiple beans, you get a configuration exception.

Configuring a Splitter with Annotations

The @Splitter annotation is applicable to methods that expect either the Message type or the message payload type, and the return values of the method should be a Collection of any type. If the returned values are not actual Message objects, each item is wrapped in a Message as the payload of the Message. Each resulting Message is sent to the designated output channel for the endpoint on which the @Splitter is defined.

The following example shows how to configure a splitter by using the @Splitter annotation:

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}