The MEP supports scheduling your jobs at specific intervals. This example shows how to do this for a Spark program. It assumes that you have already developed a Spark program in Python or Java, as described here.

We recommend using the Oozie workflow system, which is part of the Hadoop ecosystem. In addition to automated scheduling of multi-step workflows, it also supports sending e-mails on failure.

A UI is also available to build and manage your workflows from within your browser. In this example we'll explain the setup using only command line tools, which are preconfigured on your MEP virtual machine.

We will use our 'advanced Spark sample' as the basis for our example. You can check out the code with:

git clone https://bitbucket.org/vitotap/python-spark-advanced.git

Defining your workflow

To use Oozie, we need to create an XML file that describes our workflow. We advise keeping these workflow definitions under version control as well. In our sample you can find it in the 'workflows' folder; you can simply copy this folder into your own project to get started.

This is the workflow.xml that can be found in the sample:

<workflow-app name="SampleWorkflow" xmlns="uri:oozie:workflow:0.5">
  <parameters>
    <property>
      <name>jobTracker</name>
      <value>hacluster:8050</value>
    </property>
    <property>
      <name>nameNode</name>
      <value>hdfs://hacluster</value>
    </property>
    <property>
      <name>oozie.use.system.libpath</name>
      <value>True</value>
    </property>
    <property>
      <name>output_dir</name>
      <value>.</value>
    </property>
  </parameters>
  <start to="ComputeAverage" />
  <action name="ComputeAverage">
    <spark xmlns="uri:oozie:spark-action:0.2">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.queue.name</name>
          <value>default</value>
        </property>
        <property>
          <name>fs.permissions.umask-mode</name>
          <value>007</value>
        </property>
      </configuration>
      <master>yarn</master>
      <mode>cluster</mode>
      <name>Sample workflow</name>
      <jar>subtile_average.py</jar>
      <spark-opts>--num-executors 4 --executor-memory 2G --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/rh/python27/pythonspark27 --conf spark.hadoop.fs.permissions.umask-mode=007 --py-files tiles.zip</spark-opts>
      <arg>${output_dir}</arg>
    </spark>
    <ok to="end" />
    <error to="fail" />
  </action>
  <action name="fail">
    <email xmlns="uri:oozie:email-action:0.2">
      <to>test@mail.com</to>
      <subject>Sample workflow failed</subject>
      <body>Sample workflow ${wf:id()} failed: ${wf:errorMessage(wf:lastErrorNode())}</body>
    </email>
    <ok to="kill"/>
    <error to="kill"/>
  </action>
  <kill name="kill">
    <message>Sample workflow failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
  </kill>
  <end name="end" />
</workflow-app>

This simple workflow consists of three important sections:

  1. A list of 'property' elements. Some of them are fixed, others have a default value that can be overridden when the job is started. This is the way to parameterize your workflow.
  2. A Spark action element, defining the Spark job to launch (a minimal sketch of the matching Python entry point is shown below). Other types of jobs, such as running a shell script, are supported by other action types.
  3. An email block that sends a mail upon failure.
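
The Spark action launches subtile_average.py on the cluster and passes the value of ${output_dir} as its first command line argument. The actual script in the sample does more than this; the following is only a minimal, hypothetical sketch of such an entry point:

# Minimal sketch of a PySpark entry point matching the workflow above.
# The real subtile_average.py in the sample differs; this only illustrates
# how the <arg>${output_dir}</arg> value reaches the script.
import sys

from pyspark import SparkContext


def main(output_dir):
    sc = SparkContext(appName="SampleWorkflow")
    try:
        # Placeholder computation: replace with the actual tile averaging logic.
        result = sc.parallelize(range(100)).mean()
        print("Average: %s, results go to %s" % (result, output_dir))
    finally:
        sc.stop()


if __name__ == "__main__":
    # The workflow passes ${output_dir} as the first argument; default to '.'.
    main(sys.argv[1] if len(sys.argv) > 1 else ".")

Note that because the job runs in YARN cluster mode, anything printed to stdout ends up in the YARN container logs rather than in your terminal.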

Deploying your workflow

Once your workflow is ready, you should store all necessary files on the HDFS filesystem, so that they are accessible when the workflow is triggered automatically. The end result can be seen here. All of these steps can be carried out in the Hue web application, but the steps below use the command line inside a MEP virtual machine.

  1. Log in (obtain a Kerberos ticket) so that you can access the Hadoop tools: kinit
  2. Create a directory for your new workflow: hdfs dfs -mkdir /samples/tile_average
  3. Copy your workflow.xml from the python-spark-advanced directory onto HDFS: hdfs dfs -put workflows/workflow.xml /samples/tile_average
  4. Create a directory for auxiliary data and code; we call this the 'lib' dir: hdfs dfs -mkdir /samples/tile_average/lib
  5. Put our main Python script in the lib dir: hdfs dfs -put subtile_average.py /samples/tile_average/lib/
  6. Create a zip containing the code that we need: zip -r tiles.zip tiles/ --exclude "*.pyc"
  7. Copy our code to the lib dir:  hdfs dfs -put tiles.zip /samples/tile_average/lib/
  8. This zip file is referenced inside the workflow.xml file, so make sure the 'spark-opts' entry is correct.
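
To verify that everything is in place, you can list the directory recursively:

hdfs dfs -ls -R /samples/tile_average

The listing should contain workflow.xml, the lib directory, lib/subtile_average.py and lib/tiles.zip.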

Now we are ready to run our workflow.

Running

To run a workflow, Oozie allows you to configure all parameters in a properties file. The only mandatory parameter is the location of your workflow; in addition, you can override the default values of the parameters that are specific to your workflow. This is an example:

oozie.wf.application.path=/samples/tile_average/workflow.xml
oozie.use.system.libpath=True
output_dir=.

This file can also be found in the 'workflows' directory of our sample, as 'job.properties'.

To submit your job, run:

 oozie job -oozie http://epod6.vgt.vito.be:11000/oozie/ -config job.properties -run
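
Parameters declared in the workflow, such as output_dir, can be overridden at submission time by changing them in job.properties. Most Oozie CLI versions also accept -D overrides on the command line; the output directory used below is just an example:

 oozie job -oozie http://epod6.vgt.vito.be:11000/oozie/ -config job.properties -D output_dir=/samples/tile_average/output -run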

Once running, the job should become visible in the Hue Oozie dashboard.
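
The submit command prints the workflow job ID. You can also follow the job from the command line; the job ID below is just a placeholder:

 oozie job -oozie http://epod6.vgt.vito.be:11000/oozie/ -info 0000001-160610120000000-oozie-oozi-W
 oozie job -oozie http://epod6.vgt.vito.be:11000/oozie/ -log 0000001-160610120000000-oozie-oozi-W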

Scheduling

Now that our workflow is ready, we can schedule it to run on a regular basis. In Oozie this requires the use of a coordinator. Just like for the workflow, we need to create an XML file and a properties file. The coordinator below uses the cron-style frequency '0 2 * * *', so the workflow is started every day at 02:00 (UTC); with a concurrency of 1 and the LAST_ONLY execution strategy, older pending runs are skipped and only the most recent one is executed if the coordinator falls behind.

coordinator.xml:

<coordinator-app name="SampleCoordinator" frequency="0 2 * * *" start="2016-06-10T00:00Z" end="2050-01-01T00:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
  <controls>
    <concurrency>1</concurrency>
    <execution>LAST_ONLY</execution>
  </controls>
  <action>
    <workflow>
      <app-path>hdfs://hacluster/samples/tile_average/workflow.xml</app-path>
    </workflow>
  </action>
</coordinator-app>

coordinator.properties:

oozie.coord.application.path=/samples/tile_average/coordinator.xml

The coordinator.xml file itself should also be saved to HDFS:

hdfs dfs -put workflows/coordinator.xml /samples/tile_average

Now we can start the coordinator, which will run continuously to schedule jobs at the requested time intervals:

oozie job -oozie http://epod6.vgt.vito.be:11000/oozie/ -config coordinator.properties -run
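
The coordinator gets its own job ID (ending in '-C'), which you can use later on to inspect, suspend, resume or kill it; the ID below is just a placeholder:

 oozie jobs -oozie http://epod6.vgt.vito.be:11000/oozie/ -jobtype coordinator
 oozie job -oozie http://epod6.vgt.vito.be:11000/oozie/ -info 0000002-160610120000000-oozie-oozi-C
 oozie job -oozie http://epod6.vgt.vito.be:11000/oozie/ -kill 0000002-160610120000000-oozie-oozi-C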