Spark

Apache Spark: Framework for Distributed Computations

Apache Spark is a framework for distributed computations. It natively supports Python, Scala, and Java.

Apart from embarrassingly parallel computations, the Spark framework is also well suited for in-memory and/or iterative computations, which makes it usable even for machine learning and complex data processing. (The Spark framework shares some underlying implementation with Hadoop, but it is quite different – the Hadoop framework does not offer in-memory computations and has only limited support for iterative computations.)
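
For illustration, the in-memory and iterative aspects correspond to caching an RDD in the workers' memory and reusing it in every iteration instead of re-reading the input. The following is a minimal sketch, assuming an already created SparkContext sc (as in the interactive shell described below):

# Keep the RDD in the workers' memory so that every iteration reuses it.
numbers = sc.parallelize(range(1000000)).cache()

# Each iteration runs on the cached data instead of recomputing it from scratch.
threshold = 0
for _ in range(5):
    threshold = numbers.filter(lambda x: x > threshold).mean()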

The Spark framework can run either locally using one thread, locally using multiple threads or in a distributed fashion.

The current version as of November 2023 is Spark 3.5.0.

Initial Configuration

To use Spark on AIC, you need to add the following to your .profile:

export PATH="/lnet/aic/data/spark/bin:/lnet/aic/data/spark/slurm:/lnet/aic/data/spark/sbt/bin:$PATH"
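
After starting a new login shell (so that the updated .profile is read), you can check that the Spark binaries are found, for example using

which pyspark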

Running

An interactive ipython shell can be started using the following command (run pip3 install --user ipython first if you do not have ipython3):

PYSPARK_DRIVER_PYTHON=ipython3 pyspark

This command connects to the current Spark cluster (detected through the MASTER environment variable), or starts a local cluster with as many threads as there are cores if no cluster exists. Using MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark starts a local cluster with just a single thread.
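
Inside the shell, the SparkContext is available as sc, so you can check which cluster you are connected to and what the default parallelism is:

sc.master               # the cluster URL, e.g., "local[*]" for a local cluster using all cores
sc.defaultParallelism   # default number of partitions, roughly the number of available cores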

To create a distributed cluster using Slurm, you can run one of the following commands:

  • spark-srun [salloc args] workers memory_per_workerG[:python_memoryG]
    
    start a Spark cluster and run srun inside it
  • spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...]
    
    start a Spark cluster and execute the given command inside it

A good default for the memory per worker is 2G; the default value of python_memoryG is also 2G. If you want to save memory, use the memory specification 1G:1G.
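
For example, the first command below starts a cluster with 10 workers and 2G of memory per worker and runs srun inside it, while the second submits a batch job (with an illustrative 2-hour limit passed to sbatch) that runs the word-count script described below inside the cluster:

spark-srun 10 2G
spark-sbatch --time=2:00:00 10 2G spark-submit word_count.py input_path output_path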

Example

Start by running spark-srun 50 2G (or spark-srun 7 2G to stay within the limit of 8 CPUs if you are in the group students; in that case, the example below will take a few minutes). When the cluster starts, it prints a URL where it can be monitored. Then execute PYSPARK_DRIVER_PYTHON=ipython3 pyspark.

Then, try running the following pipeline, which returns the ten most frequent words of the input text together with their counts:

(sc.textFile("/lnet/aic/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1, c2: c1 + c2)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .take(10))

Running a Script

To execute a script instead of running from an interactive shell, you need to create the SparkContext manually:

  • File word_count.py:
    #!/usr/bin/env python3
    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("input", type=str, help="Input file/path")
    parser.add_argument("output", type=str, help="Output directory")
    args = parser.parse_args()
    
    import pyspark
    
    # Create the SparkContext; inside a spark-srun/spark-sbatch session it
    # connects to the running cluster, otherwise it starts a local one.
    sc = pyspark.SparkContext()
    lines = sc.textFile(args.input, 3 * sc.defaultParallelism)
    words = lines.flatMap(lambda line: line.split())
    counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
    sorted_counts = counts.sortBy(lambda word_count: word_count[1], ascending=False)
    sorted_counts.saveAsTextFile(args.output)
    

You can run such a script using

spark-submit word_count.py input_path output_path

When executed inside a spark-srun/spark-sbatch session, it connects to the running cluster; otherwise, it starts a local cluster with as many threads as cores (or with just a single thread if MASTER=local is used).
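
For a quick local test of the script (a single-threaded local cluster, with no Slurm allocation needed), you can therefore run for example:

MASTER=local spark-submit word_count.py input_path output_path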

If you want to use a specific virtual environment, you can use

PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...
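
For example, assuming a virtual environment in ~/venvs/spark (a hypothetical path), the word-count script can be submitted as:

PYSPARK_PYTHON=~/venvs/spark/bin/python spark-submit word_count.py input_path output_path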

Basic Methods

Further Pointers