Apache Spark: Framework for Distributed Computations

Apache Spark is a framework for distributed computations. It natively supports Python, Scala, and Java.

Apart from embarrassingly parallel computations, the Spark framework is well suited to in-memory and/or iterative computations, which makes it usable even for machine learning and complex data processing. (Spark shares some underlying implementation with Hadoop, but the two are quite different: the Hadoop framework does not offer in-memory computations and has only limited support for iterative computations.)

The Spark framework can run locally using a single thread, locally using multiple threads, or in a distributed fashion.

The current version as of November 2023 is Spark 3.5.0.

Initial Configuration

To use Spark on AIC, you need to add the following to your .profile:

export PATH="/lnet/aic/data/spark/bin:/lnet/aic/data/spark/slurm:/lnet/aic/data/spark/sbt/bin:$PATH"

Running

An interactive ipython shell can be started using the following command (run pip3 install --user ipython first if you do not have ipython3):

PYSPARK_DRIVER_PYTHON=ipython3 pyspark

This command connects to the current Spark cluster (detected through the MASTER environment variable); if no cluster exists, it starts a local cluster with as many threads as there are cores. Using MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark starts a local cluster with just a single thread.
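
Inside the interactive shell you can check which cluster was actually used, for instance (sc is the SparkContext that the pyspark shell creates automatically):

sc.master               # e.g. "local[*]" if no running cluster was detected
sc.defaultParallelism   # number of cores/threads available to the context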

To create a distributed cluster using Slurm, you can run one of the following commands:

  • spark-srun [salloc args] workers memory_per_workerG[:python_memoryG]
    
    start Spark cluster and perform a srun inside it
  • spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...]
    
    start Spark cluster and execute the given command inside it

A good default for the memory per worker is 2G; the default value of python_memoryG is also 2G. If you want to save memory, use the memory specification 1G:1G.
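
As an illustration (the number of workers here is arbitrary), a non-interactive job running the word-count script from the section below could look like this:

spark-sbatch 10 2G spark-submit word_count.py input_path output_path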

Example

Start by running spark-srun 50 2G (or spark-srun 7 2G if you are in the students group, so that you stay within its maximum of 8 CPUs; in that case the example below will take a few minutes). When the cluster starts, it prints a URL where it can be monitored. Then execute PYSPARK_DRIVER_PYTHON=ipython3 pyspark.

Then, try running the following pipeline, which counts word occurrences in the English Wikipedia dump and returns the ten most frequent words:

(sc.textFile("/lnet/aic/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)  # read the input with at least 3 partitions per core
   .flatMap(lambda line: line.split())                         # split every line into words
   .map(lambda word: (word, 1))                                # emit a (word, 1) pair for every word
   .reduceByKey(lambda c1, c2: c1 + c2)                        # sum the counts of each word
   .sortBy(lambda word_count: word_count[1], ascending=False)  # sort by count in decreasing order
   .take(10))                                                  # retrieve the 10 most frequent words
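
While the pipeline runs, you can watch its progress at the monitoring URL printed when the cluster started.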

Running a Script

To execute a script instead of running from an interactive shell, you need to create the SparkContext manually:

  • File word_count.py:
    #!/usr/bin/env python
    import argparse
    
    # Parse the input file/path and the output directory from the command line.
    parser = argparse.ArgumentParser()
    parser.add_argument("input", type=str, help="Input file/path")
    parser.add_argument("output", type=str, help="Output directory")
    args = parser.parse_args()
    
    import pyspark
    
    # Create the SparkContext manually; it connects to the cluster given by the
    # MASTER environment variable, or starts a local cluster if none exists.
    sc = pyspark.SparkContext()
    
    # The same word-count pipeline as in the interactive example above.
    input = sc.textFile(args.input, 3*sc.defaultParallelism)
    words = input.flatMap(lambda line: line.split())
    counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
    sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
    sorted.saveAsTextFile(args.output)
    

You can run such a script using

spark-submit word_count.py input_path output_path
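
Note that saveAsTextFile writes its result as one part-* file per partition into the given output directory, which must not already exist.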

When executed inside a spark-srun/spark-sbatch session, it connects to the running cluster; otherwise, it starts a local cluster with as many threads as cores (or with just a single thread if MASTER=local is used).
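
If you need more control, the SparkContext can also be configured explicitly in the script; a minimal sketch (the application name below is only illustrative):

import pyspark

conf = pyspark.SparkConf().setAppName("word-count-demo")  # the app name is arbitrary
sc = pyspark.SparkContext(conf=conf)
print(sc.master)              # which cluster (or local[...]) the context connected to
print(sc.defaultParallelism)  # how many cores/threads are available
sc.stop()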

If you want to use a specific virtual environment, you can use

PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...
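
PYSPARK_PYTHON selects the Python interpreter used by the workers (and, unless overridden by PYSPARK_DRIVER_PYTHON, also by the driver), while PYSPARK_DRIVER_PYTHON shown earlier affects only the driver.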

Basic Methods

Further Pointers