8. Cascading integration

SHDP provides basic support for the Cascading library through the org.springframework.data.hadoop.cascading package: one can create Flows or Cascades through XML and/or Java and execute them, either standalone or as part of a Spring Batch job. In addition, dedicated Taps for Spring environments are available.

As Cascading is aimed at code configuration, typically one would configure the library programmatically. Such code can easily be integrated into Spring in various ways - through factory methods or @Configuration and @Bean (see this chapter for more information). In short, one uses Java code (or any JVM language for that matter) to create beans.

For example, looking at the official Cascading sample (Cascading for the Impatient, Part 2), one can simply call the Cascading setup method from within the Spring container (original vs updated):

public class Impatient {
    public static FlowDef createFlowDef(String docPath, String wcPath) {
        // create source and sink taps
        Tap docTap = new Hfs(new TextDelimited(true, "\t"), docPath);
        Tap wcTap = new Hfs(new TextDelimited(true, "\t"), wcPath);

        // specify a regex operation to split the "document" text lines into a token stream
        Fields token = new Fields("token");
        Fields text = new Fields("text");
        RegexSplitGenerator splitter = new RegexSplitGenerator(token, "[ \\[\\]\\(\\),.]");
        // only returns "token"
        Pipe docPipe = new Each("token", text, splitter, Fields.RESULTS);

        // determine the word counts
        Pipe wcPipe = new Pipe("wc", docPipe);
        wcPipe = new GroupBy(wcPipe, token);
        wcPipe = new Every(wcPipe, Fields.ALL, new Count(), Fields.ALL);

        // connect the taps, pipes, etc., into a flow
        FlowDef flowDef = FlowDef.flowDef().setName("wc").addSource(docPipe, docTap).addTailSink(wcPipe, wcTap);
        return flowDef;
    }
}
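
To make the splitting and counting steps in the listing above more concrete, the following plain-JDK sketch applies the same delimiter expression with java.util.regex and counts the resulting tokens with a stream collector. It only approximates what the flow computes: no Cascading classes are involved, and the class name WordCountSketch and the sample sentence are made up for illustration.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: a plain-JDK approximation of the word-count flow, not Cascading code.
final class WordCountSketch {
    // Same delimiter character class as the one passed to RegexSplitGenerator above.
    static final Pattern DELIMITERS = Pattern.compile("[ \\[\\]\\(\\),.]");

    // Split one line of text into tokens; adjacent delimiters produce empty tokens.
    static String[] tokens(String text) {
        return DELIMITERS.split(text);
    }

    // Group identical tokens and count them, roughly what GroupBy followed by Count computes.
    static Map<String, Long> count(String text) {
        return Arrays.stream(tokens(text))
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Sample input invented for this sketch.
        System.out.println(count("rain shadow (dry area), rain side."));
    }
}
```

In this sketch, adjacent delimiters (for example a space followed by a parenthesis) yield empty tokens, which are counted like any other token; whether that matters depends on the data being processed.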

The entire Cascading configuration (defining the Flow) is encapsulated within one method, which can be called by the container:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:c="http://www.springframework.org/schema/c"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- factory-method approach called with two parameters available as property placeholders -->
    <bean id="flowDef" class="impatient.Main" factory-method="createFlowDef" c:_0="${in}" c:_1="${out}"/>

    <hdp:cascading-flow id="wc" definition-ref="flowDef" write-dot="dot/wc.dot"/>
    <hdp:cascading-cascade id="cascade" flow-ref="wc"/>
    <hdp:cascading-runner unit-of-work-ref="cascade" run-at-startup="true"/>
</beans>

Note that no jar needs to be set up - the Cascading namespace (in particular cascading-flow, backed by HadoopFlowFactoryBean) tries to set up the resulting job classpath automatically. By default, it adds the Cascading library and its dependencies to the Hadoop DistributedCache so that when the job runs inside the Hadoop cluster, the jars are properly found. When using custom jars (for example to add custom Cascading functions) that already include Cascading, or when running against a cluster that is already provisioned, one can customize this behaviour through the add-cascading-jars, jar and jar-by-class attributes. For Cascading users, these settings are the equivalent of AppProps.setApplicationJarClass().

Furthermore, one can break down the configuration method into multiple pieces, which is useful for reusing the components between multiple flows/cascades. This goes hand in hand with the Spring @Configuration feature - see the example below that configures a Cascade's pipes and taps as individual beans (see the original example):

@Configuration
public class CascadingAnalysisConfig {
    // fields that act as placeholders for externalized values
    @Value("${cascade.sec}") private String sec;
    @Value("${cascade.min}") private String min;
    
    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
        return new Every(tsCountPipe, Fields.GROUP, new Count());
    }

    @Bean public Pipe tmCountPipe() {
        Pipe tmPipe = new Each(tsPipe(),
                new ExpressionFunction(new Fields("tm"), "ts - (ts % (60 * 1000))", long.class));
        Pipe tmCountPipe = new Pipe("tmCount", tmPipe);
        tmCountPipe = new GroupBy(tmCountPipe, new Fields("tm"));
        return new Every(tmCountPipe, Fields.GROUP, new Count());
    }

    @Bean public Map<String, Tap> sinks(){
        Tap tsSinkTap = new Hfs(new TextLine(), sec);
        Tap tmSinkTap = new Hfs(new TextLine(), min);
        return Cascades.tapsMap(Pipe.pipes(tsCountPipe(), tmCountPipe()), Tap.taps(tsSinkTap, tmSinkTap));
    }

    @Bean public String regex() {
        return "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    }
    
    @Bean public Fields fields() {
        return new Fields("ip", "time", "method", "event", "status", "size");
    }
}

The class above creates several objects, all part of the Cascading package and each named after the method that creates it, which can be injected or wired just like any other bean (notice how the wiring between the beans is done by pointing to their methods). One can mix and match (if needed) code and XML configurations inside the same application:

<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingAnalysisConfig"/>

<!-- Tap created through XML rather than code (using Spring 3.1's c: namespace) -->
<bean id="tap" class="cascading.tap.hadoop.Hfs" c:fields-ref="fields" c:string-path-value="${cascade.input}"/>

<!-- standard bean declaration used to showcase the container flexibility -->
<!-- note the tap and sinks are imported from the CascadingAnalysisConfig bean -->
<bean id="analysisFlow" class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean" p:configuration-ref="hadoopConfiguration" p:source-ref="tap" p:sinks-ref="sinks">
     <property name="tails"><list>
         <ref bean="tsCountPipe"/>
         <ref bean="tmCountPipe"/>
     </list></property>
</bean>

<hdp:cascading-cascade id="cascade" flow-ref="analysisFlow"/>
<hdp:cascading-runner unit-of-work-ref="cascade" run-at-startup="true"/>

The XML above, whose main purpose is to illustrate the possible ways of configuring, uses SHDP classes to create a Cascade with one nested Flow using the taps and sinks configured by the code class. It also shows how the cascade is run (through cascading-runner). Here the runner triggers the execution during application start-up because run-at-startup is set to true; the flag is false by default, in which case the runner does not run unless triggered manually. Additionally the runner (as in fact do all runners in SHDP) allows one or multiple pre and post actions to be specified, which are executed before and after each run. Typically other runners (such as other jobs or scripts) are specified, but any JDK Callable can be passed in. For more information on runners, see the dedicated chapter.
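
The ordering of pre actions, unit of work and post actions described above can be pictured with a small plain-JDK sketch. It is only an illustration of the idea that any Callable can act as an action: RunnerSketch, its pre/post/run methods and the step helper are hypothetical names invented here, not SHDP's runner implementation or API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for a runner; this is NOT the SHDP CascadeRunner.
final class RunnerSketch {
    private final Callable<?> unitOfWork;
    private final List<Callable<?>> preActions = new ArrayList<>();
    private final List<Callable<?>> postActions = new ArrayList<>();

    RunnerSketch(Callable<?> unitOfWork) {
        this.unitOfWork = unitOfWork;
    }

    RunnerSketch pre(Callable<?> action) {
        preActions.add(action);
        return this;
    }

    RunnerSketch post(Callable<?> action) {
        postActions.add(action);
        return this;
    }

    // Run the pre actions, then the unit of work, then the post actions, in declaration order.
    Object run() {
        try {
            for (Callable<?> action : preActions) {
                action.call();
            }
            Object result = unitOfWork.call();
            for (Callable<?> action : postActions) {
                action.call();
            }
            return result;
        } catch (Exception e) {
            throw new IllegalStateException("run failed", e);
        }
    }

    // Helper that builds a Callable which records its name in a journal when invoked.
    static Callable<String> step(List<String> journal, String name) {
        return () -> {
            journal.add(name);
            return name;
        };
    }

    public static void main(String[] args) {
        List<String> journal = new ArrayList<>();
        new RunnerSketch(step(journal, "cascade"))
                .pre(step(journal, "prepare input"))
                .post(step(journal, "publish output"))
                .run();
        System.out.println(journal); // [prepare input, cascade, publish output]
    }
}
```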

Whether XML or Java config is better is up to the user and is usually based on the type of configuration required. Java config suits Cascading better, but note that the FactoryBeans above handle the life-cycle and some default configuration for both the Flow and Cascade objects. Whichever option is used, SHDP fully supports it.
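
Returning to the CascadingAnalysisConfig example, it can help to see what the values held by its beans do to a single record. The sketch below uses only the JDK: it applies the expression from the regex bean, the date pattern given to DateParser and the arithmetic from the ExpressionFunction. The class LogLineSketch and the sample log line are assumptions made for illustration, and the sketch does not reproduce how Cascading itself evaluates these operations.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: a plain-JDK walk-through of the values used by CascadingAnalysisConfig.
final class LogLineSketch {
    // Same expression as the regex() bean; its six groups line up with the fields() bean:
    // ip, time, method, event, status, size.
    static final Pattern LINE = Pattern.compile(
            "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$");

    // Extract the six captured values from one log line.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            throw new IllegalArgumentException("unparseable line: " + line);
        }
        String[] values = new String[m.groupCount()];
        for (int i = 0; i < values.length; i++) {
            values[i] = m.group(i + 1);
        }
        return values;
    }

    // Same pattern as in tsPipe(): turn the "time" value into epoch milliseconds ("ts").
    static long toMillis(String time) {
        try {
            return new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US).parse(time).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable time: " + time, e);
        }
    }

    // Same arithmetic as in tmCountPipe(): round "ts" down to the start of its minute ("tm").
    static long floorToMinute(long ts) {
        return ts - (ts % (60 * 1000));
    }

    public static void main(String[] args) {
        // Sample line invented for this sketch.
        String line = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326";
        String[] values = parse(line);
        long ts = toMillis(values[1]);
        System.out.println(Arrays.toString(values));
        System.out.println("ts=" + ts + " tm=" + floorToMinute(ts));
    }
}
```

Grouping on ts counts arrivals per second of the log's timestamps, while grouping on tm counts them per minute, which is what the two sinks (sec and min) receive.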

8.1 Using the Cascading tasklet

For Spring Batch environments, SHDP provides a dedicated tasklet (similar to CascadeRunner above) for executing Cascade or Flow instances, on demand, as part of a batch or workflow. The declaration is straightforward:

<hdp:tasklet p:unit-of-work-ref="cascade" />

8.2 Using Scalding

There are quite a number of DSLs built on top of Cascading, most notably Cascalog (written in Clojure) and Scalding (written in Scala). This documentation covers Scalding; however, the same concepts can be applied across the board to the other DSLs.

As with the rest of the DSLs, Scalding offers a simplified, fluent syntax for creating units of code that build on top of Cascading. These in turn translate to Map Reduce jobs that get executed on Hadoop. Once compiled, the DSL gets translated into actual JVM classes that get executed by Scalding through its own Tool instance (namely com.twitter.scalding.Tool). One has the option to either deploy the Scalding jobs directly (by invoking the aforementioned Tool) or use Scalding's scald.rb script, which does the same thing based on the various attributes passed to it. Both approaches can be used in SHDP, the former through the Tool support (described below) and the latter by invoking the scald.rb script directly through the scripting feature.

For example, to run the tutorial examples (say Tutorial1), one can issue the following command:

scripts/scald.rb --local tutorial/Tutorial1.scala

which compiles Tutorial1, creates a bundled jar and runs it on a local Hadoop instance. When using the Tool support, the compilation and the library provisioning are external tasks (just as in the case of typical Hadoop jobs). The SHDP configuration to run the tutorial looks as follows:

<!-- the tool is automatically injected with 'hadoopConfiguration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
   <hdp:arg value="tutorial/Tutorial1"/>
   <hdp:arg value="--local"/>
</hdp:tool-runner>

8.3 Spring-specific local Taps

Besides dedicated configuration support, SHDP also provides read-only Tap implementations useful inside Spring environments. Currently they are meant for local use only, such as testing or single-node Hadoop setups.

The Taps in org.springframework.data.hadoop.cascading.tap.local tap (pun intended) into the rich resource support from Spring Framework and Spring Integration allowing data to flow easily in and out of a Cascading flow.

Below is a list of the type of Taps available and their backing support.

Table 8.1. Local Taps

Tap Name | Tap Type | Backing Resource | Resource Description
ResourceTap | Source | Spring Resource | classpath, file-system, URL-based or even in-memory content
MessageSourceTap | Source | Spring Integration MessageSource | Inbound adapter for anything from arbitrary streams, FTP or JDBC to RSS/Atom and Twitter
MessageHandlerTap | Sink | Spring Integration MessageHandler | The opposite of MessageSourceTap: Outbound adapter for Files, JMS, TCP, etc...

Note the Taps do not require any special configuration and are fully compatible with the existing Cascading local Schemes. To wit:

<bean id="cp-txt-files" class="org.springframework.data.hadoop.cascading.tap.local.ResourceTap">
	<constructor-arg><bean class="cascading.scheme.local.TextLine"/></constructor-arg>
	<constructor-arg><value>classpath:/data/*.txt</value></constructor-arg>
</bean>

The Tap above reads all the text files in the classpath, under the data folder, through Cascading's TextLine scheme. Simply wire that to a Cascading flow (as described in the previous section) and you are good to go.