Difference between revisions of "Spark"

From UFAL AIC
m (Use Apache Spark in the Title.)
(More text about Spark)
Line 7: Line 7:
 
The Spark framework can run either locally using one thread, locally using multiple threads or in a distributed fashion.
 
The Spark framework can run either locally using one thread, locally using multiple threads or in a distributed fashion.
  
=== Current Version as of Nov 2023 is Spark 3.5.0 ===
+
== Current Version as of Nov 2023 is Spark 3.5.0 ==
  
== Initial Configuration ==
+
= Initial Configuration =
  
 
To use Spark on AIC, you need to add the following to your <code>.profile</code>
 
To use Spark on AIC, you need to add the following to your <code>.profile</code>
export PATH="/lnet/aic/data/spark/bin:/lnet/aic/data/spark/slurm:/lnet/aic/data/spark/sbt/bin:$PATH"
+
<source lang="sh">export PATH="/lnet/aic/data/spark/bin:/lnet/aic/data/spark/slurm:/lnet/aic/data/spark/sbt/bin:$PATH"</source>
 +
 
 +
= Running =
 +
 
 +
An interactive <code>ipython</code> shell can be started using (use <code>pip3 install --user ipython</code> if you do not have <code>ipython3</code>):
 +
<source lang="sh">PYSPARK_DRIVER_PYTHON=ipython3 pyspark</source>
 +
 
 +
Such command will use the current cluster (detected through the <code>MASTER</code> environment variable), starting a local cluster with as many threads as cores if no cluster exists. Using <code>MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark</code> starts a local cluster with just a single thread.
 +
 
 +
To create a distributed cluster using Slurm, you can run one of the following commands:
 +
* <source lang="sh">spark-srun [salloc args] workers memory_per_workerG[:python_memoryG]</source> start Spark cluster and perform a <code>srun</code> inside it
 +
 
 +
* <source lang="sh">spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...]</source> start Spark cluster and execute the given command inside it
 +
 
 +
A good default for memory per worker is <code>2G</code>; the default value for the <code>python_memoryG</code> is <code>2G</code>. If you want to save memory, use memory specification <code>1G:1G</code>.
 +
 
 +
== Example ==
 +
 
 +
Start by running <code>spark-srun 50 2G</code> (or use <code>7 2G</code> if you are in the group '''students'''). When the cluster starts, it prints a URL where it can be monitored. After the cluster starts, execute <code>PYSPARK_DRIVER_PYTHON=ipython3 pyspark</code>.
 +
 
 +
Then, try running the following:
 +
 
 +
<source lang="python">(sc.textFile("/lnet/aic/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
 +
  .flatMap(lambda line: line.split())
 +
  .map(lambda word: (word, 1))
 +
  .reduceByKey(lambda c1, c2: c1 + c2)
 +
  .sortBy(lambda word_count: word_count[1], ascending=False)
 +
  .take(10))</source>
 +
 
 +
== Running a Script ==
 +
 
 +
To execute a script instead of running from an interactive shell, you need to create the <code>SparkContext</code> manually:
 +
 
 +
* File <code>word_count.py</code>: <source lang="python">
 +
#!/usr/bin/env python
 +
import argparse
 +
 
 +
parser = argparse.ArgumentParser()
 +
parser.add_argument("input", type=str, help="Input file/path")
 +
parser.add_argument("output", type=str, help="Output directory")
 +
args = parser.parse_args()
 +
 
 +
import pyspark
 +
 
 +
sc = pyspark.SparkContext()
 +
input = sc.textFile(args.input, 3*sc.defaultParallelism)
 +
words = input.flatMap(lambda line: line.split())
 +
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
 +
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
 +
sorted.saveAsTextFile(args.output)
 +
</source>
 +
 
 +
You can run such script using
 +
 
 +
<source lang="sh">spark-submit script.py input_path output_path</source>
 +
When executed inside a <code>spark-srun/spark-sbatch</code> session, it connects to the running cluster; otherwise, it starts a local cluster with as many threads as cores (or with just a single thread if <code>MASTER=local</code> is used).
 +
 
 +
If you want to use a specific virtual environment, you can use
 +
 
 +
<source lang="sh">PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...</source>
 +
 
 +
= Basic Methods =
 +
 
 +
* [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.textFile <code>sc.textFile</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize <code>sc.parallelize</code>], …
 +
* [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.collect <code>rdd.collect</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.take <code>rdd.take</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.saveAsTextFile <code>rdd.saveAsTextFile</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.coalesce <code>rdd.coalesce</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.repartition <code>rdd.repartition</code>], …
 +
* [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map <code>rdd.map</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMap <code>rdd.flatMap</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.count <code>rdd.count</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.distinct <code>rdd.distinct</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortByKey <code>rdd.sortByKey</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey <code>rdd.reduceByKey</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.groupByKey <code>rdd.groupByKey</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sample <code>rdd.sample</code>], …
 +
* [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.cache <code>rdd.cache</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.unpersist <code>rdd.unpersist</code>], [https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.pipe <code>rdd.pipe</code>], …
 +
 
 +
= Further Pointers =
 +
 
 +
* [https://spark.apache.org/docs/latest/rdd-programming-guide.html Spark Programming Guide]
 +
* [https://spark.apache.org/docs/latest/api/python/index.html Spark Python API]
 +
* [https://wiki.ufal.ms.mff.cuni.cz/spark Spark on ÚFAL wiki]
 +
* [https://wiki.ufal.ms.mff.cuni.cz/spark:recipes:reading-text-files Reading Input on ÚFAL wiki]
 +
* [https://wiki.ufal.ms.mff.cuni.cz/spark:recipes:writing-text-files Writing Output on ÚFAL wiki]

Revision as of 18:29, 13 November 2023

Apache Spark: Framework for Distributed Computations

Apache Spark is a framework for distributed computations. Natively it works in Python, Scala, and Java.

Apart from embarrassingly parallel computations, Spark framework is suitable for in-memory and/or iterative computations, making it suitable even for machine learning and complex data processing. (The Spark framework shares some underlying implementation with Hadoop, but it is quite different – Hadoop framework does not offer in-memory computations and has only limited support for iterative computations.)

The Spark framework can run either locally using one thread, locally using multiple threads or in a distributed fashion.

Current Version as of Nov 2023 is Spark 3.5.0

Initial Configuration

To use Spark on AIC, you need to add the following to your .profile

export PATH="/lnet/aic/data/spark/bin:/lnet/aic/data/spark/slurm:/lnet/aic/data/spark/sbt/bin:$PATH"

Running

An interactive ipython shell can be started using (use pip3 install --user ipython if you do not have ipython3):

PYSPARK_DRIVER_PYTHON=ipython3 pyspark

Such command will use the current cluster (detected through the MASTER environment variable), starting a local cluster with as many threads as cores if no cluster exists. Using MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark starts a local cluster with just a single thread.

To create a distributed cluster using Slurm, you can run one of the following commands:

  • spark-srun [salloc args] workers memory_per_workerG[:python_memoryG]
    
    start Spark cluster and perform a srun inside it
  • spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...]
    
    start Spark cluster and execute the given command inside it

A good default for memory per worker is 2G; the default value for the python_memoryG is 2G. If you want to save memory, use memory specification 1G:1G.

Example

Start by running spark-srun 50 2G (or use 7 2G if you are in the group students). When the cluster starts, it prints a URL where it can be monitored. After the cluster starts, execute PYSPARK_DRIVER_PYTHON=ipython3 pyspark.

Then, try running the following:

(sc.textFile("/lnet/aic/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda c1, c2: c1 + c2)
   .sortBy(lambda word_count: word_count[1], ascending=False)
   .take(10))

Running a Script

To execute a script instead of running from an interactive shell, you need to create the SparkContext manually:

  • File word_count.py:
    #!/usr/bin/env python
    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("input", type=str, help="Input file/path")
    parser.add_argument("output", type=str, help="Output directory")
    args = parser.parse_args()
    
    import pyspark
    
    sc = pyspark.SparkContext()
    input = sc.textFile(args.input, 3*sc.defaultParallelism)
    words = input.flatMap(lambda line: line.split())
    counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
    sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
    sorted.saveAsTextFile(args.output)
    

You can run such script using

spark-submit script.py input_path output_path

When executed inside a spark-srun/spark-sbatch session, it connects to the running cluster; otherwise, it starts a local cluster with as many threads as cores (or with just a single thread if MASTER=local is used).

If you want to use a specific virtual environment, you can use

PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...

Basic Methods

Further Pointers