To speed up processing, it is often desirable to distribute the work over a number of machines. The easiest way to do this is to use the Apache Spark processing framework. The fastest way to get started is to read the Spark documentation: https://spark.apache.org/docs/1.6.2/quick-start.html.

The Spark version installed on our cluster is 1.6.2, so it is recommended that you stick with this version, although using a newer version is possible if really needed. Spark is also installed on your virtual machine, so you can run 'spark-submit' from the command line. In addition to 1.6.2, Spark 2.x is now also available on the cluster. To use Spark 2, set the following environment variables:

export SPARK_MAJOR_VERSION=2
export SPARK_HOME=/usr/hdp/current/spark2-client

To run jobs on the Hadoop cluster, the 'yarn-cluster' mode has to be used, and you need to authenticate with Kerberos. For the authentication, just run 'kinit' on the command line. You will be asked to provide your password. Two other useful commands are 'klist' to show whether you have been authenticated, and 'kdestroy' to clear all authentication information. After some time, your login will expire, so you'll need to run 'kinit' again.
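
For example, a complete submission could look like this ('yarn-cluster' mode corresponds to '--master yarn --deploy-mode cluster'; myscript.py is a placeholder for your own script):

kinit
spark-submit --master yarn --deploy-mode cluster myscript.py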

A Python Spark example is available which should help you to get started.
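
To give you an idea of what such a script looks like, here is a minimal sketch (purely illustrative; the application name and input path are placeholders):

from pyspark import SparkConf, SparkContext

# Count word occurrences in a text file on HDFS.
conf = SparkConf().setAppName("word-count-example")
sc = SparkContext(conf=conf)

counts = (sc.textFile("/user/yourname/input.txt")
            .flatMap(lambda line: line.split())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b))

# Print a small sample of the results on the driver.
for word, count in counts.take(10):
    print(word, count)

sc.stop()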

Resource management

Spark jobs run on a shared processing cluster, which divides the available resources among all running jobs based on a number of parameters.

Memory

To allocate memory to your executors, there are two relevant settings:

The amount of memory available for the Spark 'Java' process (the executor JVM heap): --executor-memory 1G

The amount of memory for your Python or R script, allocated outside the JVM (the value is in MB): --conf spark.yarn.executor.memoryOverhead=2048

If you need more fine-grained tuning of the memory management inside the Java process, you can use: --conf spark.memory.fraction=0.05
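
These settings are simply added to the submit command, for example (the values and script name are only placeholders, tune them to your job):

spark-submit --master yarn --deploy-mode cluster \
  --executor-memory 1G \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  myscript.py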

Number of parallel jobs

The number of tasks that are processed in parallel can be determined dynamically by Spark. To enable this, use these parameters:

--conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.enabled=true

Optionally, you can set upper or lower bounds:
--conf spark.dynamicAllocation.maxExecutors=30 --conf spark.dynamicAllocation.minExecutors=10

If you want a fixed number of executors, use:
--num-executors 10

We don't recommend this, as it reduces the cluster manager's ability to allocate resources optimally.
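
For reference, a submit command with dynamic allocation enabled could look like this (the bounds are optional, and the script name is a placeholder):

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  myscript.py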

Dependencies

A lot of commonly used Python dependencies are preinstalled on the cluster, but in some cases you will want to provide your own.

The first thing you need is a package containing your dependency. PySpark supports zip, egg, or wheel (whl) packages. The easiest way to get such a package is with pip:

pip download Flask==1.0.2

This will download the package and all of its dependencies. Pip prefers to download a wheel if one is available, but may also return a ".tar.gz" file, which you will then need to repackage as a zip or wheel.
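
If you want to keep the downloaded files separate from your working directory, you can point pip at a download directory (the directory name is arbitrary):

pip download Flask==1.0.2 -d ./deps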

To repackage a tar.gz as wheel:

tar xzvf package.tar.gz

cd package

python setup.py bdist_wheel

Note that a wheel may contain files that depend on the Python version you are using, so make sure you run this command with the same Python version (2.7 or 3.5) as your job. The resulting wheel is written to the dist/ subdirectory.

Once the wheel is available, you can include it in your spark-submit command:

--py-files mypackage.whl
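
For example, using the Flask wheel downloaded earlier (the exact wheel filename depends on the version you downloaded, and myscript.py is a placeholder):

spark-submit --master yarn --deploy-mode cluster --py-files Flask-1.0.2-py2.py3-none-any.whl myscript.py

Multiple packages can be passed as a comma-separated list, so the dependencies that pip downloaded along with Flask can be added to the same --py-files argument.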

Notifications

If you want to receive a notification (e.g. an email) when the job reaches a final state (succeeded or failed), you can add a SparkListener to the SparkContext for Java or Scala jobs:

SparkContext sc = ...

sc.addSparkListener(
  new SparkListener() {
    ...
    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
      // send email
    }
    ...
  });

You can also implement a SparkListener in a separate class and specify the class name when submitting the Spark job (the class needs a zero-argument constructor, or one that takes a SparkConf):

spark-submit --conf spark.extraListeners=path.to.MySparkListener ...

In PySpark, this is a bit more complicated as you will need to use Py4J:

class PythonSparkListener(object):

    def onApplicationEnd(self, applicationEnd):
        # send email
        pass

    # also implement the other onXXX methods

    class Java:
        implements = ["org.apache.spark.scheduler.SparkListener"]

sc = SparkContext()
sc._gateway.start_callback_server()
listener = PythonSparkListener()
sc._jsc.sc().addSparkListener(listener)

try:
    # your Spark logic goes here ...
    pass
finally:
    sc._gateway.shutdown_callback_server()
    sc.stop()

In a future release of the JobControl dashboard, we will add the possibility to send an email automatically when the job reaches a final state.