Apache Spark: Framework for Distributed Computations
Apache Spark is a framework for distributed computations. It natively supports Python, Scala, and Java.
Apart from embarrassingly parallel computations, the Spark framework also supports in-memory and iterative computations, making it suitable even for machine learning and complex data processing. (Spark shares some underlying implementation with Hadoop, but it is quite different: the Hadoop framework does not offer in-memory computations and has only limited support for iterative ones.)
The Spark framework can run either locally using one thread, locally using multiple threads, or in a distributed fashion.
The current version as of November 2023 is Spark 3.5.0.
Initial Configuration
To use Spark on AIC, you need to add the following to your .profile:
export PATH="/lnet/aic/data/spark/bin:/lnet/aic/data/spark/slurm:/lnet/aic/data/spark/sbt/bin:$PATH"
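The updated PATH takes effect in new login shells; to apply it to your current shell and verify that the Spark commands are available, you can, for instance, run:
source ~/.profile
which pyspark spark-submit spark-srun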
Running
An interactive ipython shell can be started using the following command (run pip3 install --user ipython first if you do not have ipython3):
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
This command connects to the current Spark cluster (detected through the MASTER environment variable), or starts a local cluster with as many threads as cores if no cluster exists. Using MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark starts a local cluster with just a single thread.
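Inside the shell you can check which cluster you are connected to; a minimal sketch (sc is the SparkContext that pyspark creates for you):
print(sc.master)              # e.g. "local[*]" when no cluster was detected
print(sc.defaultParallelism)  # number of threads/cores available to Spark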
To create a distributed cluster using Slurm, you can run one of the following commands:
- spark-srun [salloc args] workers memory_per_workerG[:python_memoryG] – start a Spark cluster and perform an srun inside it
- spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...] – start a Spark cluster and execute the given command inside it
A good default for memory per worker is 2G; the default value for python_memoryG is 2G. If you want to save memory, use the memory specification 1G:1G.
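For example (the worker count and arguments below are only illustrative), a non-interactive job running the word_count.py script from the Running a Script section below could look like:
spark-sbatch 10 2G spark-submit word_count.py input_path output_path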
Example
Start by running spark-srun 50 2G (or 7 2G if you are in the students group). When the cluster starts, it prints a URL where it can be monitored; once it is up, execute PYSPARK_DRIVER_PYTHON=ipython3 pyspark.
Then, try running the following pipeline, which computes the ten most frequent words in the input:
(sc.textFile("/lnet/aic/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda c1, c2: c1 + c2)
.sortBy(lambda word_count: word_count[1], ascending=False)
.take(10))
Running a Script
To execute a script instead of running from an interactive shell, you need to create the SparkContext manually:
- File word_count.py:

#!/usr/bin/env python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("input", type=str, help="Input file/path")
parser.add_argument("output", type=str, help="Output directory")
args = parser.parse_args()

import pyspark

sc = pyspark.SparkContext()
input = sc.textFile(args.input, 3*sc.defaultParallelism)
words = input.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
sorted.saveAsTextFile(args.output)
You can run such a script using:
spark-submit script.py input_path output_path
When executed inside a spark-srun/spark-sbatch session, it connects to the running cluster; otherwise, it starts a local cluster with as many threads as cores (or with just a single thread if MASTER=local is used).
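For instance, inside the spark-srun session from the example above, you could run the word count on the Wikipedia dump with (the output directory name is only an illustration):
spark-submit word_count.py /lnet/aic/data/npfl118/wiki/en/wiki.txt wiki-word-counts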
If you want to use a specific virtual environment, you can use:
PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...
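A minimal sketch, assuming a virtual environment in ~/venvs/spark-env (the path and installed packages are illustrative):
python3 -m venv ~/venvs/spark-env
~/venvs/spark-env/bin/pip install --upgrade pip   # plus whatever packages your job needs
PYSPARK_PYTHON=~/venvs/spark-env/bin/python spark-submit word_count.py input_path output_path
For a distributed cluster, the virtual environment has to live on a filesystem visible to all workers (home and /lnet directories typically are).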
Basic Methods
- sc.textFile, sc.parallelize, …
- rdd.collect, rdd.take, rdd.saveAsTextFile, rdd.coalesce, rdd.repartition, …
- rdd.map, rdd.flatMap, rdd.count, rdd.distinct, rdd.sortByKey, rdd.reduceByKey, rdd.groupByKey, rdd.sample, …
- rdd.cache, rdd.unpersist, rdd.pipe, …
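A minimal sketch illustrating a few of these methods (inside the pyspark shell, skip the SparkContext creation, since sc already exists; the sample data is made up):

import pyspark

sc = pyspark.SparkContext()

# create a small RDD from a local Python list
rdd = sc.parallelize(["spark", "slurm", "spark", "aic", "spark"])

# number of distinct elements
print(rdd.distinct().count())        # 3

# key-value transformations: count occurrences and sort by key
counts = rdd.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(counts.sortByKey().collect())  # [('aic', 1), ('slurm', 1), ('spark', 3)]

# keep a reused RDD in memory, then release it
counts.cache()
print(counts.count())                # 3
counts.unpersist()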