SHDP provides basic support for the Cascading library through the
  spring-cascading sub-project. All support
  is provided in the
  org.springframework.data.hadoop.cascading package: one
  can create Flows or Cascades
  through XML and/or Java and execute them, either directly or
  as part of a Spring Batch job. In addition, dedicated
  Taps for Spring environments are available.
As Cascading is aimed at code configuration, one would typically
  configure the library programmatically. Such code can easily be integrated
  into Spring in various ways: through factory methods or through
  @Configuration and @Bean (see this
  chapter for more information). In short, one uses Java code (or any
  JVM language for that matter) to create beans.
The Cascading support provides a dedicated namespace in addition to the regular namespace for Spring for Apache Hadoop. You can include this namespace using the following declaration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:casc="http://www.springframework.org/schema/cascading"
    xsi:schemaLocation="
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
      http://www.springframework.org/schema/cascading http://www.springframework.org/schema/cascading/spring-cascading.xsd">

    <bean id ... >

    <casc:cascading-runner id="runner" ...>
</beans>
| Element | Description |
|---|---|
| xmlns:casc | Spring for Cascading namespace prefix. Any name can do, but casc is used throughout the reference documentation. |
| http://www.springframework.org/schema/cascading | The namespace URI. |
| http://www.springframework.org/schema/cascading/spring-cascading.xsd | The namespace URI location. Note that even though the location points to an external address (which exists and is valid), Spring will resolve the schema locally as it is included in the Spring Cascading library. |
| casc:cascading-runner | Declaration example for the Cascading namespace. Notice the prefix usage. |
For example, looking at the official Cascading sample (Cascading for the Impatient, Part 2), one can simply call the Cascading setup method from within the Spring container (original vs updated):
import cascading.flow.FlowDef;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Impatient {
    public static FlowDef createFlowDef(String docPath, String wcPath) {
        // create source and sink taps
        Tap docTap = new Hfs(new TextDelimited(true, "\t"), docPath);
        Tap wcTap = new Hfs(new TextDelimited(true, "\t"), wcPath);

        // specify a regex operation to split the "document" text lines into a token stream
        Fields token = new Fields("token");
        Fields text = new Fields("text");
        RegexSplitGenerator splitter = new RegexSplitGenerator(token, "[ \\[\\]\\(\\),.]");
        // only returns "token"
        Pipe docPipe = new Each("token", text, splitter, Fields.RESULTS);

        // determine the word counts
        Pipe wcPipe = new Pipe("wc", docPipe);
        wcPipe = new GroupBy(wcPipe, token);
        wcPipe = new Every(wcPipe, Fields.ALL, new Count(), Fields.ALL);

        // connect the taps, pipes, etc., into a flow
        FlowDef flowDef = FlowDef.flowDef().setName("wc").addSource(docPipe, docTap).addTailSink(wcPipe, wcTap);
        return flowDef;
    }
}
The entire Cascading configuration (defining the
  Flow) is encapsulated within one method, which can be
  called by the container:
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:casc="http://www.springframework.org/schema/cascading"
    xmlns:c="http://www.springframework.org/schema/c"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
      http://www.springframework.org/schema/cascading http://www.springframework.org/schema/cascading/spring-cascading.xsd
      http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- factory-method approach called with two parameters available as property placeholders -->
    <bean id="flowDef" class="impatient.Main" factory-method="createFlowDef" c:_0="${in}" c:_1="${out}"/>

    <casc:cascading-flow id="wc" definition-ref="flowDef" write-dot="dot/wc.dot"/>
    <casc:cascading-cascade id="cascade" flow-ref="wc"/>
    <casc:cascading-runner unit-of-work-ref="cascade" run-at-startup="true"/>
</beans>
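For readers new to Cascading, the following plain-JDK sketch approximates what the wc flow computes: each line is split on the same delimiter pattern that createFlowDef passes to RegexSplitGenerator, and the occurrences of each token are counted, which is the role GroupBy and Count play in the flow. The class WordCountSketch and its count method are illustrative names, not part of Cascading or SHDP, and the sketch runs in memory rather than on Hadoop. Note that Pattern.split yields empty tokens between adjacent delimiters, and the sketch counts those as well.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Plain-JDK approximation of the "wc" flow; illustrative only, not Cascading code.
final class WordCountSketch {

    // Same delimiter pattern that createFlowDef hands to RegexSplitGenerator.
    private static final Pattern DELIMITERS = Pattern.compile("[ \\[\\]\\(\\),.]");

    // Splits every line into tokens and counts how often each token occurs.
    static Map<String, Long> count(Iterable<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : DELIMITERS.split(line)) {
                counts.merge(token, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints a map of token to count, sorted by token.
        System.out.println(count(Arrays.asList("rain shadow, rain", "dry (rain)")));
    }
}
```

In the real flow the grouping and counting are distributed across the cluster; the sketch only shows the shape of the result (one count per distinct token).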
Note that no jar needs to be set up: the Cascading namespace (in
  particular cascading-flow, backed by
  HadoopFlowFactoryBean) tries to set up the
  resulting job classpath automatically. By default, it adds the Cascading
  library and its dependencies to the Hadoop DistributedCache so
  that the jars are found when the job runs inside the Hadoop cluster.
  When using custom jars (for example, to add custom Cascading
  functions) or when running against a cluster that is already provisioned,
  one can customize this behaviour through the jar-setup,
  jar and jar-by-class attributes. For Cascading
  users, these settings are the equivalent of
  AppProps.setApplicationJarClass().
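The idea behind identifying a jar "by class" can be illustrated with the JDK alone: given a class, ask the JVM where that class was loaded from. The sketch below is only one common way to do this and is an assumption for illustration; it is not how SHDP, Cascading or Hadoop necessarily resolve the jar, and JarByClassSketch and locate are hypothetical names.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

// Illustrative helper: finds the jar (or classes directory) a class was loaded from.
final class JarByClassSketch {

    // Returns the code source location of the given class, or empty if the JVM
    // does not expose one (as is the case for core JDK classes such as String).
    static Optional<URL> locate(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? Optional.empty() : Optional.ofNullable(source.getLocation());
    }

    public static void main(String[] args) {
        // For an application class this typically prints the jar or directory on the classpath.
        System.out.println(locate(JarByClassSketch.class));
    }
}
```

Pointing such a lookup at a class that lives in the custom jar is what makes a by-class setting convenient: the jar path does not have to be hard-coded in the configuration.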
Furthermore, one can break the configuration method down into multiple
  pieces, which is useful for reusing the components across multiple
  flows/cascades. This goes hand in hand with Spring's
  @Configuration feature: the example below
  configures the Cascade pipes and taps as individual beans (see the original
  example):
import java.util.Map;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cascading.cascade.Cascades;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.text.DateParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

@Configuration
public class CascadingAnalysisConfig {

    // fields that act as placeholders for externalized values
    @Value("${cascade.sec}") private String sec;
    @Value("${cascade.min}") private String min;

    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
        return new Every(tsCountPipe, Fields.GROUP, new Count());
    }

    @Bean public Pipe tmCountPipe() {
        Pipe tmPipe = new Each(tsPipe(),
                new ExpressionFunction(new Fields("tm"), "ts - (ts % (60 * 1000))", long.class));
        Pipe tmCountPipe = new Pipe("tmCount", tmPipe);
        tmCountPipe = new GroupBy(tmCountPipe, new Fields("tm"));
        return new Every(tmCountPipe, Fields.GROUP, new Count());
    }

    @Bean public Map<String, Tap> sinks() {
        Tap tsSinkTap = new Hfs(new TextLine(), sec);
        Tap tmSinkTap = new Hfs(new TextLine(), min);
        return Cascades.tapsMap(Pipe.pipes(tsCountPipe(), tmCountPipe()), Tap.taps(tsSinkTap, tmSinkTap));
    }

    @Bean public String regex() {
        return "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    }

    @Bean public Fields fields() {
        return new Fields("ip", "time", "method", "event", "status", "size");
    }
}
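To see what the regex() bean, the date pattern in tsPipe() and the expression in tmCountPipe() do to a single record, the following plain-JDK sketch applies the same three steps to one access-log line. It is an illustration under assumptions: the sample log line is made up, LogLineSketch and its methods are hypothetical names, and SimpleDateFormat stands in for Cascading's DateParser, which accepts the same pattern syntax.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-JDK walk-through of the per-record logic; not Cascading code.
final class LogLineSketch {

    // Same pattern as the regex() bean; its six groups map to the fields() bean.
    static final Pattern APACHE = Pattern.compile(
            "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$");

    // Returns the "time" field, which is the second capture group.
    static String timeField(String logLine) {
        Matcher matcher = APACHE.matcher(logLine);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("not an access-log line: " + logLine);
        }
        return matcher.group(2);
    }

    // Parses "time" into epoch milliseconds using the date pattern from tsPipe().
    static long toTimestamp(String time) {
        try {
            return new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US).parse(time).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable time: " + time, e);
        }
    }

    // Same arithmetic as the ExpressionFunction in tmCountPipe(): truncate to the minute.
    static long toMinute(long ts) {
        return ts - (ts % (60 * 1000));
    }

    public static void main(String[] args) {
        String line = "127.0.0.1 - - [01/Sep/2007:00:01:03 -0700] \"GET /index.html HTTP/1.1\" 200 2326";
        long ts = toTimestamp(timeField(line));
        System.out.println("ts=" + ts + " tm=" + toMinute(ts));
    }
}
```

Grouping on ts therefore counts arrivals per second, while grouping on tm counts arrivals per minute, which matches the two sinks named sec and min.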
The CascadingAnalysisConfig class creates several objects, most of them Cascading types and each named after the method that creates it, which can be injected or wired just like any other bean (notice how the wiring between the beans is done by calling their methods). One can mix and match code and XML configurations inside the same application if needed:
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingAnalysisConfig"/>

<!-- Tap created through XML rather than code (using Spring 3.1's c: namespace) -->
<bean id="tap" class="cascading.tap.hadoop.Hfs" c:fields-ref="fields" c:string-path-value="${cascade.input}"/>

<!-- standard bean declaration used to showcase the container flexibility -->
<!-- note the tap and sinks are imported from the CascadingAnalysisConfig bean -->
<bean id="analysisFlow" class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean"
      p:configuration-ref="hadoopConfiguration" p:source-ref="tap" p:sinks-ref="sinks">
    <property name="tails"><list>
        <ref bean="tsCountPipe"/>
        <ref bean="tmCountPipe"/>
    </list></property>
</bean>

<casc:cascading-cascade id="cascade" flow-ref="analysisFlow" />
<casc:cascading-runner unit-of-work-ref="cascade" run-at-startup="true"/>
The XML above, whose main purpose is to illustrate possible ways of
  configuring, uses SHDP classes to create a Cascade with
  one nested Flow using the taps and sinks configured by
  the code class. It also shows how the cascade is run (through
  cascading-runner). The runner triggers the execution
  during application start-up (notice the
  run-at-startup flag, which is
  false by default). Note that the runner will not run unless
  it is triggered manually or run-at-startup is set to
  true. Additionally, the runner (as do all runners in SHDP) allows one or more
  pre and post actions to be specified,
  to be executed before and after each run. Typically other runners (such as
  other jobs or scripts) are specified, but any JDK
  Callable can be passed in. For more information on
  runners, see the dedicated chapter.
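The ordering of pre actions, the unit of work and post actions can be pictured with a small plain-JDK sketch. This is not SHDP's runner implementation: RunnerSketch and its methods are hypothetical names, and the sketch assumes that post actions are skipped when the unit of work fails, which may differ from what SHDP actually does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Illustrative runner: pre actions, then the unit of work, then post actions.
final class RunnerSketch {

    private final List<Callable<?>> preActions = new ArrayList<>();
    private final List<Callable<?>> postActions = new ArrayList<>();
    private final Callable<?> unitOfWork;

    RunnerSketch(Callable<?> unitOfWork) {
        this.unitOfWork = unitOfWork;
    }

    RunnerSketch pre(Callable<?> action) {
        preActions.add(action);
        return this;
    }

    RunnerSketch post(Callable<?> action) {
        postActions.add(action);
        return this;
    }

    // Runs everything in order and returns the result of the unit of work.
    Object run() {
        try {
            for (Callable<?> action : preActions) {
                action.call();
            }
            Object result = unitOfWork.call();
            for (Callable<?> action : postActions) {
                action.call();
            }
            return result;
        } catch (Exception e) {
            // Assumption of this sketch: a failure aborts the remaining steps.
            throw new IllegalStateException("run failed", e);
        }
    }
}
```

Because every step is a Callable, another runner, a script invocation or any ad-hoc piece of code can be plugged in as a pre or post action in the same way.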
Whether XML or Java config is better is up to the user and usually
  depends on the type of configuration required. Java config suits Cascading
  better, but note that the FactoryBeans above handle the
  lifecycle and some default configuration for both the
  Flow and Cascade objects.
  Whichever option is used, SHDP fully supports it.
For Spring Batch environments, SHDP provides a dedicated tasklet
    (similar to CascadeRunner above) for executing
    Cascade or Flow instances, on
    demand, as part of a batch or workflow. The declaration is pretty
    straightforward:
<casc:tasklet p:unit-of-work-ref="cascade" />
There are quite a number of DSLs built on top of Cascading, most notably Cascalog (written in Clojure) and Scalding (written in Scala). This documentation covers Scalding; however, the same concepts can be applied across the board to all DSLs.
As with the rest of the DSLs, Scalding offers a simplified, fluent
    syntax for creating units of code that build on top of Cascading. This in
    turn translates to MapReduce jobs that get executed on Hadoop. Once
    compiled, the DSL gets translated into actual JVM classes that get
    executed by Scalding through its own Tool instance
    (namely com.twitter.scalding.Tool). One has the
    option of either deploying the Scalding jobs directly (by invoking the
    aforementioned Tool) or using Scalding's
    scald.rb script, which does the same thing based on the
    various attributes passed to it. Both approaches can be used in SHDP: the
    former through the Tool support
    (described below) and the latter by invoking the
    scald.rb script directly through the scripting feature.
For example, to run the tutorial examples (say Tutorial1), one can issue the following command:
scripts/scald.rb --local tutorial/Tutorial1.scala
which compiles Tutorial1, creates a bundled jar and runs it on a
    local Hadoop instance. When using the Tool support, the
    compilation and the library provisioning are external tasks (just as in
    the case of typical Hadoop jobs). The SHDP configuration to run the
    tutorial looks as follows:
<!-- the tool automatically is injected with 'hadoopConfiguration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
    <hdp:arg value="tutorial/Tutorial1"/>
    <hdp:arg value="--local"/>
</hdp:tool-runner>
Besides dedicated configuration support, SHDP also provides
    read-only Tap implementations
    useful inside Spring environments. Currently they are meant for
    local use only such as testing or single-node Hadoop
    setups.
The Taps in
    org.springframework.data.hadoop.cascading.tap.local tap
    (pun intended) into the rich resource support from Spring Framework and
    Spring Integration allowing data to flow easily in and out of a Cascading
    flow.
Below is a list of the types of Taps available and
    their backing support.
Table 8.1. Local Taps
| Tap Name | Tap Type | Backing Resource | Resource Description | 
|---|---|---|---|
| ResourceTap | Source | Spring Resource | classpath, file-system, URL-based or even in-memory content |
| MessageSourceTap | Source | Spring Integration MessageSource | Inbound adapter for anything from arbitrary streams, FTP or JDBC to RSS/Atom and Twitter |
| MessageHandlerTap | Sink | Spring Integration MessageHandler | The opposite of MessageSourceTap: outbound adapter for files, JMS, TCP, etc. |
Note the Taps do not require any special
    configuration and are fully compatible with the existing Cascading local
    Schemes. To wit:
<bean id="cp-txt-files" class="org.springframework.data.hadoop.cascading.tap.local.ResourceTap">
    <constructor-arg><bean class="cascading.scheme.local.TextLine"/></constructor-arg>
    <constructor-arg><value>classpath:/data/*.txt</value></constructor-arg>
</bean>
The Tap above reads all the text files in the
    classpath, under the data folder, through the Cascading
    TextLine scheme. Simply wire that to a Cascading flow (as
    described in the previous section) and you are good to go.
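As a rough mental model of what such a source tap delivers to a flow, the plain-JDK sketch below reads every *.txt file in a directory line by line. It is an analogy only: it works on a file-system directory rather than on Spring's classpath: resource patterns, it does not use ResourceTap or TextLine, and TextFilesSketch, readLines and demo are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative stand-in for "read all text files under data, one line at a time".
final class TextFilesSketch {

    // Reads every line of every *.txt file directly under dir, in file-name order.
    static List<String> readLines(Path dir) {
        try {
            List<Path> files = new ArrayList<>();
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.txt")) {
                for (Path file : stream) {
                    files.add(file);
                }
            }
            Collections.sort(files);
            List<String> lines = new ArrayList<>();
            for (Path file : files) {
                lines.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a few sample files into a temporary directory and reads them back.
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("data");
            Files.write(dir.resolve("a.txt"), Arrays.asList("first", "second"), StandardCharsets.UTF_8);
            Files.write(dir.resolve("b.txt"), Arrays.asList("third"), StandardCharsets.UTF_8);
            Files.write(dir.resolve("ignored.csv"), Arrays.asList("skipped"), StandardCharsets.UTF_8);
            return readLines(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The glob plays the same role as the *.txt wildcard in the bean definition: files that do not match (here ignored.csv) never reach the consumer.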