Apache Spark: Framework for Distributed Computations
Apache Spark is a framework for distributed computations. Natively it works in Python, Scala, and Java.
Apart from embarrassingly parallel computations, the Spark framework is well suited for in-memory and/or iterative computations, which makes it a good fit even for machine learning and complex data processing. (Spark shares some underlying implementation with Hadoop, but it is quite different: the Hadoop framework does not offer in-memory computations and has only limited support for iterative computations.)
The Spark framework can run either locally using one thread, locally using multiple threads, or in a distributed fashion.
The current version as of November 2023 is Spark 3.5.0.
Initial Configuration
To use Spark on AIC, you need to add the following to your .profile:
export PATH="/lnet/aic/data/spark/bin:/lnet/aic/data/spark/slurm:/lnet/aic/data/spark/sbt/bin:$PATH"
Running
An interactive ipython3 shell can be started using the following command (use pip3 install --user ipython if you do not have ipython3):
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
This command uses the current Spark cluster (detected through the MASTER environment variable); if no cluster exists, it starts a local cluster with as many threads as there are cores. Using MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark starts a local cluster with just a single thread.
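Inside the shell you can check which cluster you are connected to; a minimal sketch (the sc SparkContext is created automatically by pyspark):
sc.master               # e.g. 'local[*]' for a local cluster, or a spark:// URL for a distributed one
sc.defaultParallelism   # default number of partitions, derived from the cores available to the cluster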
To create a distributed cluster using Slurm, you can run one of the following commands:
- spark-srun [salloc args] workers memory_per_workerG[:python_memoryG] – start a Spark cluster and perform an srun inside it
- spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...] – start a Spark cluster and execute the given command inside it
A good default for memory per worker is 2G; the default value for python_memoryG is 2G. If you want to save memory, use the memory specification 1G:1G.
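For instance, a non-interactive job could be submitted as follows (a hypothetical invocation assembled from the synopsis above; the worker count, script name, and paths are placeholders):
spark-sbatch 10 2G spark-submit script.py input_path output_path
This would allocate 10 workers with 2G of memory each and run the given spark-submit command inside the resulting cluster.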
Example
Start by running spark-srun 50 2G (or spark-srun 7 2G if you are in the group students, which keeps you within the group's maximum of 8 CPUs; with fewer workers the example below will take a few minutes). When the cluster starts, it prints a URL where it can be monitored. Once it is running, execute PYSPARK_DRIVER_PYTHON=ipython3 pyspark.
Then, try running the following:
(sc.textFile("/lnet/aic/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)  # load the input as an RDD of lines
   .flatMap(lambda line: line.split())                          # split every line into words
   .map(lambda word: (word, 1))                                 # emit a (word, 1) pair for every word
   .reduceByKey(lambda c1, c2: c1 + c2)                         # sum the counts of each word
   .sortBy(lambda word_count: word_count[1], ascending=False)   # sort by count, descending
   .take(10))                                                   # return the 10 most frequent words
Running a Script
To execute a script instead of running from an interactive shell, you need to create the SparkContext manually:
- File word_count.py:
#!/usr/bin/env python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("input", type=str, help="Input file/path")
parser.add_argument("output", type=str, help="Output directory")
args = parser.parse_args()

import pyspark
sc = pyspark.SparkContext()

input = sc.textFile(args.input, 3*sc.defaultParallelism)
words = input.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
sorted.saveAsTextFile(args.output)
You can run such a script using
spark-submit script.py input_path output_path
When executed inside a spark-srun/spark-sbatch session, it connects to the running cluster; otherwise, it starts a local cluster with as many threads as cores (or with just a single thread if MASTER=local is used).
If you want to use a specific virtual environment, you can use
PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...
Basic Methods
- sc.textFile, sc.parallelize, …
- rdd.collect, rdd.take, rdd.saveAsTextFile, rdd.coalesce, rdd.repartition, …
- rdd.map, rdd.flatMap, rdd.count, rdd.distinct, rdd.sortByKey, rdd.reduceByKey, rdd.groupByKey, rdd.sample, …
- rdd.cache, rdd.unpersist, rdd.pipe, …
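As a brief illustration of several of these methods, the following sketch can be pasted into a pyspark shell (the sample data is made up for the example):
rdd = sc.parallelize(["the quick brown fox", "the lazy dog"], numSlices=2)       # small in-memory RDD with 2 partitions
words = rdd.flatMap(lambda line: line.split())                                   # one element per word
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)   # (word, count) pairs
counts.cache()                       # keep the counts in memory for repeated use
counts.count()                       # number of distinct words
counts.sortByKey().collect()         # all (word, count) pairs, sorted alphabetically
counts.unpersist()                   # release the cached data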