To speed up processing, it is often desirable to distribute the work over a number of machines. The easiest way to do this is to use the Apache Spark processing framework. The fastest way to get started is to read the Spark documentation.

The Spark version installed on our cluster is 2.3.2, so it is recommended that you stick with this version. Using a newer version is possible if really needed, but not recommended. Spark is also installed on your virtual machine, so you can run 'spark-submit' from the command line. To use Spark 2, do the following:

export SPARK_HOME=/usr/hdp/current/spark2-client

To run jobs on the Hadoop cluster, the 'yarn-cluster' mode has to be used, and you need to authenticate with Kerberos. For the authentication, just run 'kinit' on the command line. You will be asked to provide your password. Two other useful commands are 'klist' to show whether you have been authenticated, and 'kdestroy' to clear all authentication information. After some time, your login will expire, so you'll need to run 'kinit' again.
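A typical Kerberos session on the command line then looks like this:

```shell
# Obtain a Kerberos ticket (you will be prompted for your password)
kinit

# Check whether you currently hold a valid ticket, and when it expires
klist

# ... submit and monitor your Spark jobs here ...

# Clear all cached authentication information when you are done
kdestroy
```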

A Python Spark example is available which should help you to get started.

In Spark, you can use one of the Python interpreters available on the MEP platform. By default, Python 2.7 is used. To select a different Python interpreter, set the PYSPARK_PYTHON environment variable before submitting the Spark job, e.g. to use Python 3.5:

export PYSPARK_PYTHON="/usr/bin/python3.5"
spark-submit ... # your spark-submit command goes here

Resource management

Spark jobs run on a shared processing cluster. The cluster divides the available resources among all running jobs, based on a number of parameters.


To allocate memory to your executors, you can specify it like this: --executor-memory 1G

When running PySpark jobs, most memory is typically consumed off-heap, so you should also specify the memory overhead in megabytes (e.g. --conf spark.executor.memoryOverhead=4000).
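Combining the two memory settings, a submit command might look like this (the application file name is a placeholder):

```shell
# Sketch: 1 GB of executor heap plus 4000 MB of off-heap overhead
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 1G \
  --conf spark.executor.memoryOverhead=4000 \
  myjob.py
```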

Number of parallel jobs

The number of tasks that are processed in parallel can be determined dynamically by Spark. To enable this, use these parameters:

--conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.enabled=true

Optionally, you can set upper or lower bounds:
--conf spark.dynamicAllocation.maxExecutors=30 --conf spark.dynamicAllocation.minExecutors=10

If you want a fixed number of executors, use:
--num-executors 10

We don't recommend this, as it reduces the ability of the cluster manager to optimally allocate resources.
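As a complete sketch, a submit command with dynamic allocation enabled and bounded could look like this (the application file name is a placeholder):

```shell
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  myjob.py
```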


Logging
In your application, you can write logging to the standard output and error streams. Afterwards these log files are collected and can be retrieved in a number of ways:

From the command line, on your VM:

yarn logs -log_files_pattern "std.*" -applicationId application_1484394506558_0055

From the PROBA-V MEP job dashboard, accessible from anywhere:

From the internal YARN job overview page, only accessible from your VM:

This page also gives you access to the 'Spark UI', by clicking on 'Application Master'. This UI gives you detailed information on the tasks that are being executed, and can be helpful in case of errors.

By default, Spark will also write a lot of output to the standard error stream. This can make it hard to find your own messages, and is not very useful for most users. Have a look at the Python Spark example: it contains a file that reduces Spark logging to error messages only. To use it, append it to the list of files distributed with spark-submit using the '--files' option.
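Assuming the logging configuration file from the example is named log4j.properties (the actual name in the example may differ), shipping it looks like this:

```shell
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --files log4j.properties \
  myjob.py
```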

Also note how using "-log_files_pattern std.*" in the yarn logs command avoids returning logs that are rarely needed.


Notifications
If you want to receive a notification (e.g. an email) when the job reaches a final state (succeeded or failed), you can add a SparkListener on the SparkContext for Java or Scala jobs:

SparkContext sc = ...
sc.addSparkListener(new SparkListener() {
    ...
    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
        // send email
    }
    ...
});

You can also implement a SparkListener and register it by specifying the class name in the spark.extraListeners configuration when submitting the Spark job:

spark-submit --conf ...
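For example, assuming your listener is compiled into the application jar under the hypothetical class name com.example.MyListener (the class must have a zero-argument constructor):

```shell
spark-submit \
  --conf spark.extraListeners=com.example.MyListener \
  myjob.jar
```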

In PySpark, this is a bit more complicated as you will need to use Py4J:

class PythonSparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        # send email
        pass

    # also implement the other onXXX methods

    class Java:
        implements = ["org.apache.spark.scheduler.SparkListener"]

sc = SparkContext()
sc._gateway.start_callback_server()
listener = PythonSparkListener()
# register the listener with the underlying JVM SparkContext
sc._jsc.sc().addSparkListener(listener)
try:
    # your Spark logic goes here
    ...
finally:
    sc._gateway.shutdown_callback_server()
    sc.stop()