= Apache Spark: Framework for Distributed Computations =

[https://spark.apache.org Apache Spark] is a framework for distributed computations. Natively it works in Python, Scala, and Java.

Apart from embarrassingly parallel computations, the Spark framework is well suited for in-memory and/or iterative computations, which makes it useful even for machine learning and complex data processing. (The Spark framework shares some underlying implementation with Hadoop, but it is quite different – the Hadoop framework does not offer in-memory computations and has only limited support for iterative computations.)

The Spark framework can run either locally using one thread, locally using multiple threads, or in a distributed fashion.

== Current Version (as of Nov 2023): Spark 3.5.0 ==

= Initial Configuration =

To use Spark on AIC, you need to add the following to your <code>.profile</code>:
<source lang="sh">export PATH="/lnet/aic/data/spark/bin:/lnet/aic/data/spark/slurm:/lnet/aic/data/spark/sbt/bin:$PATH"</source>
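
After reloading your shell configuration, you can check that the Spark binaries are on your <code>PATH</code>; this is only an optional sanity check, assuming a Bash-like shell:

<source lang="sh">source ~/.profile
which pyspark spark-submit spark-srun spark-sbatch
spark-submit --version</source>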
+ | |||
+ | = Running = | ||
+ | |||
+ | An interactive <code>ipython</code> shell can be started using (use <code>pip3 install --user ipython</code> if you do not have <code>ipython3</code>): | ||
+ | <source lang="sh">PYSPARK_DRIVER_PYTHON=ipython3 pyspark</source> | ||
+ | |||
+ | Such command will use the current Spark cluster (detected through the <code>MASTER</code> environment variable), starting a local cluster with as many threads as cores if no cluster exists. Using <code>MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark</code> starts a local cluster with just a single thread. | ||
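
If you want a local run with a fixed number of threads, the standard Spark master specification <code>local[N]</code> should also work in the <code>MASTER</code> variable; the sketch below assumes this and uses 4 threads as an arbitrary example:

<source lang="sh">MASTER=local[4] PYSPARK_DRIVER_PYTHON=ipython3 pyspark</source>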
+ | |||
+ | To create a distributed cluster using Slurm, you can run one of the following commands: | ||
+ | * <source lang="sh">spark-srun [salloc args] workers memory_per_workerG[:python_memoryG]</source> start Spark cluster and perform a <code>srun</code> inside it | ||
+ | |||
+ | * <source lang="sh">spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...]</source> start Spark cluster and execute the given command inside it | ||
+ | |||
+ | A good default for memory per worker is <code>2G</code>; the default value for the <code>python_memoryG</code> is <code>2G</code>. If you want to save memory, use memory specification <code>1G:1G</code>. | ||
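
For example, the following starts an interactive session on a cluster with 10 workers (an arbitrary illustrative count) using the reduced <code>1G:1G</code> memory specification:

<source lang="sh">spark-srun 10 1G:1G</source>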
+ | |||
+ | == Example == | ||
+ | |||
+ | Start by running <code>spark-srun 50 2G</code> (or use <code>spark-srun 7 2G</code> if you are in the group '''students''' to use the maximum number of 8 CPUs, but the example below will take a few minutes). When the cluster starts, it prints a URL where it can be monitored. After the cluster starts, execute <code>PYSPARK_DRIVER_PYTHON=ipython3 pyspark</code>. | ||
+ | |||
+ | Then, try running the following: | ||
+ | |||
+ | <source lang="python">(sc.textFile("/lnet/aic/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism) | ||
+ | .flatMap(lambda line: line.split()) | ||
+ | .map(lambda word: (word, 1)) | ||
+ | .reduceByKey(lambda c1, c2: c1 + c2) | ||
+ | .sortBy(lambda word_count: word_count[1], ascending=False) | ||
+ | .take(10))</source> | ||
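
As a small variation (a sketch using the same input file), you can compute the number of distinct words instead:

<source lang="python">(sc.textFile("/lnet/aic/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
 .flatMap(lambda line: line.split())
 .distinct()
 .count())</source>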
+ | |||
+ | == Running a Script == | ||
+ | |||
+ | To execute a script instead of running from an interactive shell, you need to create the <code>SparkContext</code> manually: | ||
+ | |||
+ | * File <code>word_count.py</code>: <source lang="python"> | ||
+ | #!/usr/bin/env python | ||
+ | import argparse | ||
+ | |||
+ | parser = argparse.ArgumentParser() | ||
+ | parser.add_argument("input", type=str, help="Input file/path") | ||
+ | parser.add_argument("output", type=str, help="Output directory") | ||
+ | args = parser.parse_args() | ||
+ | |||
+ | import pyspark | ||
+ | |||
+ | sc = pyspark.SparkContext() | ||
+ | input = sc.textFile(args.input, 3*sc.defaultParallelism) | ||
+ | words = input.flatMap(lambda line: line.split()) | ||
+ | counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2) | ||
+ | sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False) | ||
+ | sorted.saveAsTextFile(args.output) | ||
+ | </source> | ||
+ | |||
+ | You can run such script using | ||
+ | |||
+ | <source lang="sh">spark-submit script.py input_path output_path</source> | ||
+ | When executed inside a <code>spark-srun/spark-sbatch</code> session, it connects to the running cluster; otherwise, it starts a local cluster with as many threads as cores (or with just a single thread if <code>MASTER=local</code> is used). | ||
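
For a non-interactive run, the two can be combined, e.g., submitting the word count above as a batch job on a 10-worker cluster (the worker count and the output directory name are only placeholders):

<source lang="sh">spark-sbatch 10 2G spark-submit word_count.py /lnet/aic/data/npfl118/wiki/en/wiki.txt word_counts</source>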
+ | |||
+ | If you want to use a specific virtual environment, you can use | ||
+ | |||
+ | <source lang="sh">PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...</source> | ||
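
A minimal sketch, assuming a virtual environment created in <code>~/venvs/spark</code> (the path and the installed package are only examples):

<source lang="sh">python3 -m venv ~/venvs/spark
~/venvs/spark/bin/pip install numpy   # install whatever your script needs
PYSPARK_PYTHON=~/venvs/spark/bin/python spark-submit word_count.py input_path output_path</source>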
+ | |||
+ | = Basic Methods = | ||
+ | |||
+ | * [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.textFile <code>sc.textFile</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize <code>sc.parallelize</code>], … | ||
+ | * [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.collect <code>rdd.collect</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.take <code>rdd.take</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.saveAsTextFile <code>rdd.saveAsTextFile</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.coalesce <code>rdd.coalesce</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.repartition <code>rdd.repartition</code>], … | ||
+ | * [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map <code>rdd.map</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMap <code>rdd.flatMap</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.count <code>rdd.count</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.distinct <code>rdd.distinct</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortByKey <code>rdd.sortByKey</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey <code>rdd.reduceByKey</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.groupByKey <code>rdd.groupByKey</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sample <code>rdd.sample</code>], … | ||
+ | * [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.cache <code>rdd.cache</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.unpersist <code>rdd.unpersist</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.pipe <code>rdd.pipe</code>], … | ||
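
A minimal sketch combining a few of these methods (assuming a running <code>pyspark</code> shell, where <code>sc</code> is already defined):

<source lang="python"># Create a small RDD locally and group the squares of its elements by parity.
rdd = sc.parallelize(range(10), 4)
squares = rdd.map(lambda x: (x % 2, x * x)).cache()
print(squares.groupByKey().mapValues(list).collect())
print(squares.count(), squares.distinct().count())</source>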
+ | |||
+ | = Further Pointers = | ||
+ | |||
+ | * [https://spark.apache.org/docs/latest/rdd-programming-guide.html Spark Programming Guide] | ||
+ | * [https://spark.apache.org/docs/latest/api/python/index.html Spark Python API] | ||
+ | * [https://wiki.ufal.ms.mff.cuni.cz/spark Spark on ÚFAL wiki] | ||
+ | * [https://wiki.ufal.ms.mff.cuni.cz/spark:recipes:reading-text-files Reading Input on ÚFAL wiki] | ||
+ | * [https://wiki.ufal.ms.mff.cuni.cz/spark:recipes:writing-text-files Writing Output on ÚFAL wiki] |