Appendix A. Spring Integration Samples

[Note]Note
Starting with the current release of Spring Integration, the samples are distributed as independent Maven-based projects (http://maven.apache.org/) to minimize setup time. Since each project is also an Eclipse project, it can be imported as is using the Eclipse Import wizard. If you prefer another IDE, configuration should be straightforward, since a special Maven profile was set up to download all of the required dependencies for all samples. Detailed instructions on how to build and run the samples are provided in the README.txt file located in the samples directory of the main distribution.

A.1 The Cafe Sample

In this section, we will review a sample application that is included in the Spring Integration distribution. This sample is inspired by one of the samples featured in Gregor Hohpe's Ramblings.

The domain is that of a Cafe, and the basic flow is depicted in the following diagram:

The Order object may contain multiple OrderItems. Once the order is placed, a Splitter will break the composite order message into a single message per drink. Each of these is then processed by a Router that determines whether the drink is hot or cold (checking the OrderItem object's 'isIced' property). The Barista prepares each drink, but hot and cold drink preparation are handled by two distinct methods: 'prepareHotDrink' and 'prepareColdDrink'. The prepared drinks are then sent to the Waiter where they are aggregated into a Delivery object.

Here is the XML configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
            http://www.springframework.org/schema/integration/stream
            http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd">

    <gateway id="cafe" service-interface="org.springframework.integration.samples.cafe.Cafe"/>

    <channel id="orders"/>
    <splitter input-channel="orders" ref="orderSplitter" method="split" output-channel="drinks"/>

    <channel id="drinks"/>
    <router input-channel="drinks" ref="drinkRouter" method="resolveOrderItemChannel"/>

    <channel id="coldDrinks">
        <queue capacity="10"/>
    </channel>
    <service-activator input-channel="coldDrinks" ref="barista"
                       method="prepareColdDrink" output-channel="preparedDrinks"/>

    <channel id="hotDrinks">
        <queue capacity="10"/>
    </channel>
    <service-activator input-channel="hotDrinks" ref="barista"
                       method="prepareHotDrink" output-channel="preparedDrinks"/>

    <channel id="preparedDrinks"/>
    <aggregator input-channel="preparedDrinks" ref="waiter"
                method="prepareDelivery" output-channel="deliveries"/>

    <stream:stdout-channel-adapter id="deliveries"/>

    <beans:bean id="orderSplitter"
                class="org.springframework.integration.samples.cafe.xml.OrderSplitter"/>

    <beans:bean id="drinkRouter"
                class="org.springframework.integration.samples.cafe.xml.DrinkRouter"/>

    <beans:bean id="barista" class="org.springframework.integration.samples.cafe.xml.Barista"/>

    <beans:bean id="waiter" class="org.springframework.integration.samples.cafe.xml.Waiter"/>

    <poller id="poller" default="true">
        <interval-trigger interval="1000"/>
    </poller>

</beans:beans>

As you can see, each Message Endpoint is connected to input and/or output channels. Each endpoint manages its own lifecycle. By default, endpoints start automatically upon initialization; to prevent that, add the "auto-startup" attribute with a value of "false". Most importantly, notice that the objects are simple POJOs with strongly typed method arguments. For example, here is the Splitter:

public class OrderSplitter {

    public List<OrderItem> split(Order order) {
        return order.getItems();
    }
}

In the case of the Router, the return value does not have to be a MessageChannel instance (although it can be). As you see in this example, a String value representing the channel name is returned instead.

public class DrinkRouter {

    public String resolveOrderItemChannel(OrderItem orderItem) {
        return (orderItem.isIced()) ? "coldDrinks" : "hotDrinks";
    }
}
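How a returned channel name could be turned into an actual destination can be pictured with a small plain-Java sketch. This is not the framework's implementation: the registry below is a hypothetical Map from channel name to an in-memory queue, and OrderItem is pared down to the single 'iced' flag that the router inspects.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Illustrative only: a hand-rolled stand-in for name-based channel resolution.
class RouterSketch {

    // Hypothetical, pared-down order item; the sample's OrderItem has more fields.
    static class OrderItem {
        private final boolean iced;

        OrderItem(boolean iced) {
            this.iced = iced;
        }

        boolean isIced() {
            return iced;
        }
    }

    // Hypothetical registry mapping channel names to simple in-memory queues.
    static final Map<String, Queue<OrderItem>> CHANNELS = new HashMap<>();

    static {
        CHANNELS.put("coldDrinks", new ArrayDeque<>());
        CHANNELS.put("hotDrinks", new ArrayDeque<>());
    }

    // Same decision as DrinkRouter.resolveOrderItemChannel in the sample.
    static String resolveOrderItemChannel(OrderItem item) {
        return item.isIced() ? "coldDrinks" : "hotDrinks";
    }

    // Looks up the queue by the name the router returned and enqueues the item.
    static void route(OrderItem item) {
        String name = resolveOrderItemChannel(item);
        Queue<OrderItem> channel = CHANNELS.get(name);
        if (channel == null) {
            throw new IllegalStateException("No channel named " + name);
        }
        channel.add(item);
    }

    public static void main(String[] args) {
        route(new OrderItem(true));
        route(new OrderItem(false));
        route(new OrderItem(true));
        System.out.println("coldDrinks=" + CHANNELS.get("coldDrinks").size()
                + ", hotDrinks=" + CHANNELS.get("hotDrinks").size());
    }
}
```

The point of the sketch is only that the router class itself never needs a reference to a channel object; the lookup by name happens outside of it.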

Now turning back to the XML, you see that there are two <service-activator> elements. Each of these is delegating to the same Barista instance but different methods: 'prepareHotDrink' or 'prepareColdDrink' corresponding to the two channels where order items have been routed.

public class Barista {

    private long hotDrinkDelay = 5000;
    private long coldDrinkDelay = 1000;

    private AtomicInteger hotDrinkCounter = new AtomicInteger();
    private AtomicInteger coldDrinkCounter = new AtomicInteger();

    public void setHotDrinkDelay(long hotDrinkDelay) {
        this.hotDrinkDelay = hotDrinkDelay;
    }

    public void setColdDrinkDelay(long coldDrinkDelay) {
        this.coldDrinkDelay = coldDrinkDelay;
    }

    public Drink prepareHotDrink(OrderItem orderItem) {
        try {
            Thread.sleep(this.hotDrinkDelay);
            System.out.println(Thread.currentThread().getName()
                    + " prepared hot drink #" + hotDrinkCounter.incrementAndGet()
                    + " for order #" + orderItem.getOrder().getNumber() + ": " + orderItem);
            return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(),
                    orderItem.isIced(), orderItem.getShots());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public Drink prepareColdDrink(OrderItem orderItem) {
        try {
            Thread.sleep(this.coldDrinkDelay);
            System.out.println(Thread.currentThread().getName()
                    + " prepared cold drink #" + coldDrinkCounter.incrementAndGet()
                    + " for order #" + orderItem.getOrder().getNumber() + ": " + orderItem);
            return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(),
                    orderItem.isIced(), orderItem.getShots());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

}

As you can see from the code excerpt above, the barista methods have different delays (the hot drinks take 5 times as long to prepare). This simulates work being completed at different rates. When the CafeDemo 'main' method runs, it will loop 100 times sending a single hot drink and a single cold drink each time. It actually sends the messages by invoking the 'placeOrder' method on the Cafe interface. Above, you will see that the <gateway> element is specified in the configuration file. This triggers the creation of a proxy that implements the given 'service-interface' and connects it to a channel. The channel name is provided on the @Gateway annotation of the Cafe interface.

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}
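To make the idea of a generated proxy more concrete, here is a minimal sketch built on the JDK's java.lang.reflect.Proxy. It is not how Spring Integration implements the <gateway> element; SimpleCafe, createGateway, and the queue that stands in for the 'orders' channel are all hypothetical names chosen for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: an interface-backed proxy that forwards calls to a queue.
class GatewayProxySketch {

    // Hypothetical stand-in for the sample's Cafe interface, using a String order.
    interface SimpleCafe {
        void placeOrder(String order);
    }

    // Creates a proxy whose placeOrder call puts its argument on the given queue.
    static SimpleCafe createGateway(BlockingQueue<Object> requestChannel) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!"placeOrder".equals(method.getName())) {
                throw new UnsupportedOperationException(method.getName());
            }
            requestChannel.add(args[0]); // the queue plays the role of the channel
            return null;                 // placeOrder is a void method
        };
        return (SimpleCafe) Proxy.newProxyInstance(
                SimpleCafe.class.getClassLoader(),
                new Class<?>[] { SimpleCafe.class },
                handler);
    }

    public static void main(String[] args) {
        BlockingQueue<Object> orders = new LinkedBlockingQueue<>();
        SimpleCafe cafe = createGateway(orders);
        cafe.placeOrder("order #1");
        System.out.println("queued: " + orders);
    }
}
```

The calling code depends only on the interface, which is the same property the sample relies on when CafeDemo calls 'placeOrder'.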

Finally, have a look at the main() method of the CafeDemo itself.

public static void main(String[] args) {
    AbstractApplicationContext context = null;
    if (args.length > 0) {
        context = new FileSystemXmlApplicationContext(args);
    }
    else {
        context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
    }
    Cafe cafe = (Cafe) context.getBean("cafe");
    for (int i = 1; i <= 100; i++) {
        Order order = new Order(i);
        order.addItem(DrinkType.LATTE, 2, false);
        order.addItem(DrinkType.MOCHA, 3, true);
        cafe.placeOrder(order);
    }
}
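One class that this walkthrough does not show is the Waiter referenced by the <aggregator> element. The following sketch is an assumption about its general shape rather than the sample's source: Drink and Delivery are pared-down hypothetical types, and the 'accept' method hand-rolls the grouping by order number that the framework's aggregator would otherwise perform before calling 'prepareDelivery'.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: collects drinks per order and releases a delivery when complete.
class WaiterSketch {

    // Hypothetical, pared-down drink: just the order number and a label.
    static class Drink {
        final int orderNumber;
        final String name;

        Drink(int orderNumber, String name) {
            this.orderNumber = orderNumber;
            this.name = name;
        }
    }

    // Hypothetical delivery: the complete list of drinks for one order.
    static class Delivery {
        final List<Drink> drinks;

        Delivery(List<Drink> drinks) {
            this.drinks = new ArrayList<>(drinks);
        }
    }

    private final Map<Integer, List<Drink>> pending = new HashMap<>();
    private final int drinksPerOrder;

    WaiterSketch(int drinksPerOrder) {
        this.drinksPerOrder = drinksPerOrder;
    }

    // The kind of method an aggregator could delegate to once a group is complete.
    static Delivery prepareDelivery(List<Drink> drinks) {
        return new Delivery(drinks);
    }

    // Returns a Delivery once every drink of the order has arrived, otherwise null.
    Delivery accept(Drink drink) {
        List<Drink> group = pending.computeIfAbsent(drink.orderNumber, k -> new ArrayList<>());
        group.add(drink);
        if (group.size() < drinksPerOrder) {
            return null; // still waiting, for example for the slower hot drink
        }
        pending.remove(drink.orderNumber);
        return prepareDelivery(group);
    }

    public static void main(String[] args) {
        WaiterSketch waiter = new WaiterSketch(2);
        System.out.println("after cold drink: " + waiter.accept(new Drink(1, "iced mocha")));
        Delivery delivery = waiter.accept(new Drink(1, "latte"));
        System.out.println("after hot drink: " + delivery.drinks.size() + " drinks delivered");
    }
}
```

Because nothing is released until the group is complete, a quickly prepared cold drink has to wait for the hot drink from the same order.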

[Tip]Tip
To run this sample as well as 8 others, refer to the README.txt within the "samples" directory of the main distribution as described at the beginning of this chapter.

When you run cafeDemo, you will see that the cold drinks are initially prepared more quickly than the hot drinks. Because there is an aggregator, the cold drinks are effectively limited by the rate of the hot drink preparation. This is to be expected based on their respective delays of 1000 and 5000 milliseconds. However, by configuring a poller with a concurrent task executor, you can dramatically change the results. For example, you could use a thread pool executor with 5 workers for the hot drink barista while keeping the cold drink barista as it is:

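<!-- Before: the hot drink service activator relies on the default poller -->
<service-activator input-channel="hotDrinks"
                   ref="barista"
                   method="prepareHotDrink"
                   output-channel="preparedDrinks"/>

<!-- After: the same endpoint with its own poller, backed by a pool of 5 threads -->
<service-activator input-channel="hotDrinks"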
                   ref="barista"
                   method="prepareHotDrink"
                   output-channel="preparedDrinks">
    <poller task-executor="pool">
        <interval-trigger interval="1000"/>
    </poller>
</service-activator>

<task:executor id="pool" pool-size="5"/>

Also, notice that the worker thread name is displayed with each invocation. You will see that the hot drinks are prepared by the task-executor threads. If you provide a much shorter poller interval (such as 100 milliseconds), you will notice that the pool occasionally throttles the input by forcing the task-scheduler thread (the caller) to invoke the operation itself.
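The throttling behavior can be reproduced with the JDK alone. The sketch below assumes a pool with a caller-runs rejection policy, which is what the description above suggests; the queue capacity of 2 and the 100 millisecond task duration are arbitrary values chosen for illustration, not settings taken from the sample.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a saturated pool pushes work back onto the submitting thread.
class CallerRunsSketch {

    // Submits taskCount slow tasks and returns how many ran on the submitting thread.
    static int countCallerRuns(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(2),        // small queue, fills up quickly
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejected tasks run in the caller
        final Thread caller = Thread.currentThread();
        final AtomicInteger callerRuns = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                if (Thread.currentThread() == caller) {
                    callerRuns.incrementAndGet();
                }
                try {
                    Thread.sleep(100); // simulated drink preparation
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return callerRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run by the caller: " + countCallerRuns(20));
    }
}
```

While the submitting thread is busy running a task itself, it cannot submit more work, which is what slows the input down.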

[Note]Note
In addition to experimenting with the poller's concurrency settings, you can also add the 'transactional' sub-element and then refer to any PlatformTransactionManager instance within the context.

A.2 The XML Messaging Sample

The XML messaging sample in the org.springframework.integration.samples.xml package illustrates how to use some of the provided components that deal with XML payloads. The sample is built around processing an order for books represented as XML.

First, the XPath splitter component splits the order into a number of messages, each one representing a single order item.

<si-xml:xpath-splitter id="orderItemSplitter" input-channel="ordersChannel" 
            output-channel="stockCheckerChannel" create-documents="true">
    <si-xml:xpath-expression expression="/orderNs:order/orderNs:orderItem" namespace-map="orderNamespaceMap" />
</si-xml:xpath-splitter>
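What the splitter's XPath expression selects can be tried out with the JDK's own XPath support. The sketch below is independent of Spring Integration; the namespace URI and the 'isbn' attribute are hypothetical, since the sample's order document is not reproduced in this appendix. Copying each node into its own document loosely mirrors the create-documents="true" setting.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Illustrative only: splits an order document into one document per order item.
class XPathSplitSketch {

    // Hypothetical namespace URI bound to the 'orderNs' prefix.
    static final String ORDER_NS = "http://example.org/orders";

    static List<Document> split(String orderXml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document order = builder.parse(new InputSource(new StringReader(orderXml)));

            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                public String getNamespaceURI(String prefix) {
                    return "orderNs".equals(prefix) ? ORDER_NS : XMLConstants.NULL_NS_URI;
                }
                public String getPrefix(String uri) {
                    return null; // not needed for evaluation
                }
                public Iterator<String> getPrefixes(String uri) {
                    return null; // not needed for evaluation
                }
            });

            // Same expression as in the xpath-splitter configuration.
            NodeList nodes = (NodeList) xpath.evaluate(
                    "/orderNs:order/orderNs:orderItem", order, XPathConstants.NODESET);

            List<Document> items = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                Document item = builder.newDocument();
                item.appendChild(item.importNode(nodes.item(i), true)); // deep copy
                items.add(item);
            }
            return items;
        } catch (Exception e) {
            throw new IllegalStateException("Could not split order", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<order xmlns='" + ORDER_NS + "'>"
                + "<orderItem isbn='111'/><orderItem isbn='222'/></order>";
        System.out.println("order items: " + split(xml).size());
    }
}
```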

A service activator is then used to pass the message into a stock checker POJO. The order item document is enriched with information from the stock checker about the order item's stock level. The enriched order item message is then routed according to that information: when the order item is in stock, the message is routed to the warehouse. The XPath router makes use of a MapBasedChannelResolver, which maps the XPath evaluation result to a channel reference.

<si-xml:xpath-router id="instockRouter" channel-resolver="mapChannelResolver" 
        input-channel="orderRoutingChannel" resolution-required="true">
    <si-xml:xpath-expression expression="/orderNs:orderItem/@in-stock" namespace-map="orderNamespaceMap"  />
</si-xml:xpath-router>
	
<bean id="mapChannelResolver"
    class="org.springframework.integration.channel.MapBasedChannelResolver">
    <property name="channelMap">
        <map>
            <entry key="true" value-ref="warehouseDispatchChannel" />
            <entry key="false" value-ref="outOfStockChannel" />
        </map>
    </property>
</bean>
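The routing decision itself amounts to evaluating the expression and looking the result up in a map. The following plain-Java sketch shows that idea under stated assumptions: the namespace URI is hypothetical, the map holds channel names instead of channel references, and the exception thrown for an unmapped value only approximates what resolution-required="true" implies.

```java
import java.io.StringReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Illustrative only: maps the value of the in-stock attribute to a channel name.
class XPathRouteSketch {

    // Hypothetical namespace URI bound to the 'orderNs' prefix.
    static final String ORDER_NS = "http://example.org/orders";

    // Mirrors the entries of the channelMap, with names in place of references.
    static final Map<String, String> CHANNEL_MAP = new HashMap<>();

    static {
        CHANNEL_MAP.put("true", "warehouseDispatchChannel");
        CHANNEL_MAP.put("false", "outOfStockChannel");
    }

    static String resolveChannelName(String orderItemXml) {
        String inStock;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document item = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(orderItemXml)));

            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                public String getNamespaceURI(String prefix) {
                    return "orderNs".equals(prefix) ? ORDER_NS : XMLConstants.NULL_NS_URI;
                }
                public String getPrefix(String uri) {
                    return null; // not needed for evaluation
                }
                public Iterator<String> getPrefixes(String uri) {
                    return null; // not needed for evaluation
                }
            });
            // Same expression as in the xpath-router configuration.
            inStock = xpath.evaluate("/orderNs:orderItem/@in-stock", item);
        } catch (Exception e) {
            throw new IllegalStateException("Could not evaluate expression", e);
        }
        String channelName = CHANNEL_MAP.get(inStock);
        if (channelName == null) {
            throw new IllegalStateException("No channel mapped for value '" + inStock + "'");
        }
        return channelName;
    }

    public static void main(String[] args) {
        String xml = "<orderItem xmlns='" + ORDER_NS + "' in-stock='false'/>";
        System.out.println("route to: " + resolveChannelName(xml));
    }
}
```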

Where the order item is not in stock, the message is transformed using XSLT into a format suitable for sending to the supplier.

<si-xml:xslt-transformer input-channel="outOfStockChannel" output-channel="resupplyOrderChannel" 
     xsl-resource="classpath:org/springframework/integration/samples/xml/bigBooksSupplierTransformer.xsl"/>
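The transformation step can be tried in isolation with the JDK's javax.xml.transform API. The stylesheet in this sketch is a hypothetical stand-in, since bigBooksSupplierTransformer.xsl is not reproduced in this appendix; the 'resupply' element and the 'isbn' attribute are likewise invented for illustration.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Illustrative only: applies a small stylesheet to an out-of-stock order item.
class XsltSketch {

    // Hypothetical stylesheet that turns an order item into a resupply request.
    static final String STYLESHEET =
            "<xsl:stylesheet version='1.0' xmlns:xsl='http://www.w3.org/1999/XSL/Transform'>"
            + "<xsl:template match='/orderItem'>"
            + "<resupply isbn='{@isbn}'/>"
            + "</xsl:template>"
            + "</xsl:stylesheet>";

    static String transform(String orderItemXml) {
        try {
            Transformer transformer = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(STYLESHEET)));
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter result = new StringWriter();
            transformer.transform(
                    new StreamSource(new StringReader(orderItemXml)),
                    new StreamResult(result));
            return result.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Transformation failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(transform("<orderItem isbn='123' in-stock='false'/>"));
    }
}
```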

A.3 The OSGi Samples

This release of Spring Integration includes several samples that are OSGi enabled as well as samples that were specifically designed to show some of the other benefits of OSGi and Spring Integration when used together. First, let's look at the two familiar examples that are also configured to be valid OSGi bundles. These are Hello World and Cafe. All you need to do to see these samples work in an OSGi environment is deploy the generated JAR into such an environment.

Use Maven to generate the JAR by executing the 'mvn install' command on either of these projects. This will generate the JAR file in the target directory. Now you can simply drop that JAR file into the deployment directory of your OSGi platform. For example, if you are using SpringSource dm Server, drop the files into the 'pickup' directory within the dm Server home directory.

[Note]Note
Prior to deploying and testing Spring Integration samples in the dm Server or any other OSGi server platform, you must have the Spring Integration and Spring bundles installed on that platform. For example, to install Spring Integration into SpringSource dm Server, copy all JAR files that are located in the 'dist' directory of your Spring Integration distribution into the 'repository/bundles/usr' directory of your dm Server instance (see the dm Server User Guide for more detail on how to install bundles).

The Spring Integration samples require a few other bundles to be installed. For the 1.0.3 release, the full list including transitive dependencies is:

  • org.apache.commons.codec-1.3.0.jar
  • org.apache.commons.collections-3.2.0.jar
  • org.apache.commons.httpclient-3.1.0.jar
  • org.apache.ws.commons.schema-1.3.2.jar
  • org.springframework.oxm-1.5.5.A.jar
  • org.springframework.security-2.0.4.A.jar
  • org.springframework.ws-1.5.5.A.jar
  • org.springframework.xml-1.5.5.A.jar

These are all located within the 'lib' directory of the Spring Integration distribution. So, you can simply copy those JARs into the dm Server 'repository/bundles/usr' directory as well.

[Note]Note
The Spring Framework bundles (aop, beans, context, etc.) are also included in the 'lib' directory of the Spring Integration distribution, but they do not need to be installed since they are already part of the dm Server infrastructure. Also, note that the versions listed above are those included with the Spring Integration 1.0.3 release. Newer versions of individual JARs may be used as long as they are within the range specified in the MANIFEST.MF files of those bundles that depend upon them.

[Tip]Tip
The bundles listed above are appropriate for a SpringSource dm Server 1.0.x deployment environment with a Spring Framework 2.5.x foundation. That is the version against which Spring Integration 1.0.3 has been developed and tested. However, as of the time of the Spring Integration 1.0.3 release, the Spring Framework 3.0 release candidates are about to be available, and the dm Server 2.0.x milestones are available. If you want to try running these samples in that environment, then you will need to replace the Spring Security and Spring Web Services bundles with versions that support Spring 3.0. The OXM functionality is moving into the Spring Framework itself for the 3.0 release. Otherwise, Spring Integration 1.0.3 has been superficially tested against the Spring 3.0 snapshots available at the time of its release. In fact, some internal changes were made in the 1.0.3 release specifically to support Spring 3.0 (whereas 1.0.2 does not). Spring Integration 2.0 will be built upon a Spring 3.0 foundation.

To demonstrate some of the benefits of running Spring Integration projects in an OSGi environment (e.g. modularity, OSGi service dynamics, etc.), we have included a couple of new samples that are dedicated to highlighting those benefits. In the 'samples' directory, you will find the following two projects:

  • osgi-inbound (producer bundle)
  • osgi-outbound (consumer bundle)

Unlike the other samples in the distribution, these are not Maven enabled. Instead, we have simply configured them as valid dm Server Bundle projects. That means you can import these projects directly into an STS workspace using the "Existing Projects into Workspace" option from the Eclipse Import wizard. Then, you can take advantage of the STS dm Server tools to deploy them into a SpringSource dm Server instance.

[Note]Note
A simple Ant 'build.xml' file has been included within each of these projects as well. The build files contain a single 'jar' target. Therefore, after these projects have been built within Eclipse/STS, you can generate the bundle (JAR) directly and deploy it manually.

The structure of these projects is very simple, yet the concepts they showcase are quite powerful. The 'osgi-inbound' module enables sending a Message to a Publish-Subscribe Channel using a Spring Integration Gateway proxy. The interesting part, however, is that the Publish-Subscribe Channel is exported as an OSGi service via the <osgi:service/> element. As a result, any other bundles can be developed, deployed, and maintained independently yet still subscribe to that channel.

The 'osgi-outbound' module is an example of such a subscribing consumer bundle. It uses the corresponding <osgi:reference/> element to locate the channel exported by the 'osgi-inbound' bundle. It also contains configuration for a <file:outbound-gateway/> which is a subscriber to that channel and will write the Message content to a file once it arrives. It then sends a response Message with the name of the file and its location.

To make it easy to run, we've exposed a command-line interface where you can type in the command, the message, and the file name to execute the demo. This is exposed through the OSGi console. Likewise, the response that provides the name and location of the resulting file will also be visible within the OSGi console.

To run these samples, make sure your OSGi environment is properly configured to host Spring Integration bundles (as described in the note above). Deploy the producer bundle (osgi-inbound) first, and then deploy the consumer bundle (osgi-outbound). After you have deployed these bundles, open the OSGi console and type the following command:

 osgi> help 

You will see the following amidst the output:

 ---Spring Integration CLI-based OSGi Demo---
         siSend <message> <filename> - send text to be written to a file

As you can see, that describes the command that you can use to send messages. If you are interested in how it is implemented, or want to customize the message sending logic or even create a new command, look at InboundDemoBundleActivator.java in the producer bundle (osgi-inbound).

[Tip]Tip
When using the SpringSource Tool Suite, you can open the OSGi console by first opening the dm Server view and then choosing the 'Server Console' tab at the bottom (to open the dm Server view, navigate to the dm Server instance listed in the 'Servers' view and either double-click or hit F3). Alternatively, you can open the OSGi console by connecting to port 2401 via telnet (as long as that is enabled, and for dm Server, it is enabled by default):
 telnet localhost 2401

Now send a message by typing:

 osgi> siSend "Hello World" hello.txt

You will see something similar to the following in the OSGi console:

 Sending message: 'Hello World'
 Message sent and its contents were written to:   
 /usr/local/dm-server/work/tmp/spring-integration-samples/output/hello.txt

[Note]Note
It is not necessary to wrap the message in quotes if it does not contain spaces. Go ahead and open up the file and verify that the message content was written to it.

Let's assume you wanted to change the directory to which the files are written or make any other change to the consumer bundle (osgi-outbound). You only need to update the consumer bundle and not the producer bundle. So, go ahead and change the directory in the 'osgi-outbound.xml' file located within 'src/META-INF/spring' and refresh the consumer bundle.

[Tip]Tip
If using STS to deploy to dm Server, the refresh will happen automatically. If replacing bundles manually, you can issue the command 'refresh n' in the OSGi console (where n would be the ID of the bundle as displayed at any point after issuing the 'ss' command to see the short status output).

You will see that the change takes effect immediately. Not only that, you could even start developing and deploying new bundles that subscribe to the messages produced by the producer bundle the same way as the existing consumer bundle (osgi-outbound) does. With a publish-subscribe-channel, any newly deployed bundles would start receiving each Message as well.
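The effect of adding subscribers at runtime can be illustrated without OSGi. The sketch below is a hand-rolled publish-subscribe dispatcher, not Spring Integration's channel implementation; PubSubSketch and its methods are hypothetical names, and each Consumer stands in for a subscribing bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustrative only: every current subscriber receives each message that is sent.
class PubSubSketch {

    // Copy-on-write list so subscribers can come and go while messages are sent.
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void unsubscribe(Consumer<String> subscriber) {
        subscribers.remove(subscriber);
    }

    // Delivers the message to all current subscribers and returns how many received it.
    int send(String message) {
        int delivered = 0;
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        PubSubSketch channel = new PubSubSketch();
        List<String> firstConsumer = new ArrayList<>();
        List<String> secondConsumer = new ArrayList<>();

        channel.subscribe(firstConsumer::add);
        channel.send("one");

        channel.subscribe(secondConsumer::add); // a consumer "deployed" later
        channel.send("two");

        System.out.println("first consumer received: " + firstConsumer);
        System.out.println("second consumer received: " + secondConsumer);
    }
}
```

The sender never changes when a subscriber is added or removed, which is the property that lets the consumer bundle be updated without touching the producer bundle.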

[Note]Note
If you also want to modify and refresh the producer bundle, be sure to refresh the consumer bundle afterwards as well. This is necessary because the consumer's subscription must be explicitly re-enabled after the producer's channel disappears. You could alternatively deploy a relatively static bundle that defines channels so that producers and consumers can be completely dynamic without affecting each other at all. In Spring Integration 2.0, we plan to support automatic re-subscription and more through the use of a Control Bus.

That pretty much wraps up this very simple example. Hopefully it has successfully demonstrated the benefits of modularity and OSGi service dynamics while working with Spring Integration. Feel free to experiment by following some of the suggestions mentioned above. For deeper coverage of the applicability of OSGi when used with Spring Integration, read this blog by Spring Integration team member Iwein Fuld.