To speed up processing, it is often desirable to distribute the work over a number of machines. The easiest way to do this is to use the Apache Spark processing framework. The fastest way to get started is to read the Spark documentation: https://spark.apache.org/docs/1.6.2/quick-start.html.

The Spark version installed on our cluster is 1.6.2, so it is recommended that you stick with this version, although a newer version can be used if really needed. Spark is also installed on your virtual machine, so you can run 'spark-submit' from the command line. Alongside 1.6.2, we now also offer Spark 2.x. To use Spark 2, do the following:

export SPARK_MAJOR_VERSION=2
export SPARK_HOME=/usr/hdp/current/spark2-client
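
To check which Spark version your shell will pick up after setting these variables, you can run:

spark-submit --version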

To run jobs on the Hadoop cluster, you have to use the 'yarn-cluster' mode, and you need to authenticate with Kerberos. To authenticate, run 'kinit' on the command line; you will be asked to provide your password. Two other useful commands are 'klist', which shows whether you are currently authenticated, and 'kdestroy', which clears all authentication information. Your login expires after some time, so you will need to run 'kinit' again.
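
As an illustration, a minimal submission from your VM could look like this ('myjob.py' is just a placeholder for your own application):

kinit
spark-submit --master yarn-cluster myjob.py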

A Python Spark example is available which should help you to get started.


Resource management

Spark jobs run on a shared processing cluster. The cluster divides the available resources among all running jobs, based on a number of parameters.

Memory

To allocate memory to your executors, you can specify it like this: --executor-memory 2G

Spark will also use some of this memory for its own purposes. If you want to allocate most of the memory to your own job, use: --conf spark.memory.fraction=0.05
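
To see what this does: under Spark's unified memory management (the default since 1.6), roughly spark.memory.fraction × (executor heap − 300 MB reserved) is set aside for Spark's execution and storage regions. With --executor-memory 2G and spark.memory.fraction=0.05, that is about 0.05 × (2048 − 300) MB ≈ 87 MB, so almost all of the heap remains available for your own data structures.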

Number of parallel jobs

The number of tasks that are processed in parallel can be determined dynamically by Spark. To enable this, use the following parameters:

--conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.enabled=true

Optionally, you can set upper or lower bounds:
--conf spark.dynamicAllocation.maxExecutors=30 --conf spark.dynamicAllocation.minExecutors=10

If you want a fixed number of executors, use:
--num-executors 10

We don't recommend this, as it reduces the ability of the cluster manager to optimally allocate resources.
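
Putting the options above together, a complete submission could look like this (the script name and the values are only placeholders; adjust them to your own job):

spark-submit --master yarn-cluster --executor-memory 2G \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  myjob.py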


Logging

In your application, you can write log messages to the standard output and error streams. These logs are collected afterwards and can be retrieved in a number of ways:

From the command line, on your VM:

yarn logs -log_files_pattern "std.*" -applicationId application_1484394506558_0055
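
If you do not know your application ID, you can list your applications with the standard YARN CLI (the ID is also printed in the output of spark-submit):

yarn application -list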

From the PROBA-V MEP job dashboard, accessible from anywhere:

https://proba-v-mep.esa.int/applications/jobdashboard/

From the internal YARN job overview page, only accessible from your VM:

http://epod6.vgt.vito.be:8088/cluster/

This page also gives you access to the 'Spark UI' by clicking on 'Application Master'. This UI gives you detailed information on the tasks being executed and can be helpful in case of errors.

By default, Spark itself writes a lot of output to the standard error stream. This can make it hard to find your own messages and is not very useful for most users. Have a look at the Python Spark example: it contains a log4j.properties file that reduces Spark logging to error messages only. To use it, add it to the list of files distributed with spark-submit: '--files log4j.properties'.
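
The exact contents may differ from the file shipped with the example, but a minimal log4j.properties that only reports errors could look like this:

# Minimal sketch: log only errors to stderr
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n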

Also note that using "-log_files_pattern std.*" in the yarn logs command avoids returning logs that are rarely needed.


Notifications

If you want to receive a notification (e.g. an email) when the job reaches a final state (succeeded or failed), you can add a SparkListener on the SparkContext for Java or Scala jobs:

SparkContext sc = ...

sc.addSparkListener(new SparkListener() {
    ...
    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
        // send email
    }
    ...
});

You can also implement a SparkListener and specify the classname when submitting the Spark job:

spark-submit --conf spark.extraListeners=path.to.MySparkListener ...

In PySpark, this is a bit more complicated as you will need to use Py4J:

from pyspark import SparkContext

class PythonSparkListener(object):

    def onApplicationEnd(self, applicationEnd):
        # send email
        pass

    # also implement the other onXXX methods

    class Java:
        implements = ["org.apache.spark.scheduler.SparkListener"]

sc = SparkContext()
sc._gateway.start_callback_server()

listener = PythonSparkListener()
sc._jsc.sc().addSparkListener(listener)

try:
    # your Spark logic goes here
    ...
finally:
    sc._gateway.shutdown_callback_server()
    sc.stop()

In a future release of the JobControl dashboard, we will add the possibility to send an email automatically when the job reaches a final state.