====== Big Data ======

VSC-4 has Big Data modules for running
  * MapReduce jobs
    * in several programming languages, including Java, and
    * using executables which can stream data from standard input to standard output (e.g. grep, cut, tr, sed, ...), and
  * Spark jobs using
    * Python,
    * Java,
    * Scala,
    * R,
    * SQL,
either in batch jobs or interactively.

==== Big Data and HPC ====

Among the advantages of using Big Data frameworks also in HPC environments like VSC, we want to mention
  * very easy coding, in many languages, including data centric ones like R or SQL,
  * automatic parallelization without any changes in the user program, and
  * extremely good scaling behaviour.

Users have to be aware of the difficulties of running Big Data jobs on HPC clusters, namely
  * File systems are shared by all users. Users with high file system demand should use the local HDFS if the same data is read more than once in a job. Please do not submit many Big Data jobs at the same time, e.g. slurm array jobs!
  * Login nodes and internet connections are shared by all users. Bringing huge amounts of data to VSC takes some time. Please make sure that other users of VSC are not affected!
  * Only very coarse grained parallelization will work with Big Data frameworks. Typically, parallelization is done by partitioning the input data (automatically) and doing the majority of the calculation independently in separate processes.

In the context of high performance computing a typical application of Big Data methods is the pre- and/or postprocessing of large amounts of data, as sketched below, e.g.
  * filtering,
  * data cleaning,
  * discarding input fields which are not required,
  * counting,
  * ...
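
As a rough sketch of this kind of preprocessing, the same steps can be written as a plain Unix pipeline (file and column names are made up for the example); Big Data frameworks run such steps in parallel on partitions of the input:

<code bash>
# drop comment lines (data cleaning), keep only columns 1 and 3 (discard fields),
# and count unique entries (counting); input.tsv and counts.txt are placeholders
grep -v '^#' input.tsv | cut -f1,3 | sort | uniq -c > counts.txt
</code>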

Frameworks and applications using Big Data methods are shared among many scientific communities.

==== Apache Spark on VSC ====

There are many frameworks in Data Science that are using [[https://spark.apache.org|Apache Spark]].

=== Example slurm & python scripts ===

Let us calculate the value of pi by means of the following Python script ''pi.py'':

<code python>
from operator import add
from random import random
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext \
        .parallelize(range(1, n + 1), partitions) \
        .map(f) \
        .reduce(add)
    print(f"Pi is roughly {4.0 * count / n}")

    spark.stop()
</code>
The idea of the calculation is to check (very often) whether a random point in the square [-1,1] x [-1,1] lies within the unit circle (i.e. has a distance of less than 1 from the origin). Since we know the formulas for the areas of the square (4) and of the unit circle (pi), the fraction of points inside the circle gives an estimate of pi/4.
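
For illustration only, the same estimate can be computed serially in plain Python, without Spark; this is just a sketch of the relation pi ≈ 4 · count / n, not part of the VSC example:

<code python>
# Minimal serial sketch of the Monte Carlo estimate: the fraction of random
# points in the square [-1,1] x [-1,1] that fall inside the unit circle
# approximates pi/4.
from random import random

n = 1_000_000
count = 0
for _ in range(n):
    x = random() * 2 - 1
    y = random() * 2 - 1
    if x ** 2 + y ** 2 <= 1:
        count += 1
print(f"Pi is roughly {4.0 * count / n}")
</code>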

Assuming that the script is saved as ''$HOME/pi.py'', it can be submitted as a batch job to the queuing system with the following job script ''pi.slrm'':

<code bash>
#!/bin/bash
#SBATCH --nodes=1
# job name and time limit are examples; adjust them to your needs
#SBATCH --job-name=spark-yarn-pi
#SBATCH --time=00:10:00
#SBATCH --error=err_spark-yarn-pi-%A
#SBATCH --output=out_spark-yarn-pi-%A
#SBATCH --partition=skylake_0096
#SBATCH --qos=skylake_0096

module purge

# you can choose one of the following python versions
# with the current hadoop/spark installation
# note: the current spark version does NOT work with python 3.11
module load python/3.10.7-gcc-12.2.0-5a2kkeu
# module load python/3.9.13-gcc-12.2.0-ctxezzj

module load openjdk
module load hadoop
module load spark

export PDSH_RCMD_TYPE=ssh

prolog_create_key.sh
. vsc_start_hadoop.sh
. vsc_start_spark.sh

spark-submit --master yarn --deploy-mode client --num-executors 140 \
    --executor-memory 2G $HOME/pi.py 1000

. vsc_stop_spark.sh
. vsc_stop_hadoop.sh

epilog_discard_key.sh
</code>
In this slurm script we have
  * slurm commands, starting with #SBATCH, to set the job name, the maximum execution time and where the output files will go, as well as the slurm qos and partition that should be used,
  * module commands, loading all the modules which are required by our job: python, Java (Hadoop is written in Java), Hadoop, and Spark,
  * setup scripts which create temporary ssh keys, and start Hadoop and Spark services in user context on the nodes of the job,
  * the ''spark-submit'' command which runs our Python script on the Spark/Yarn cluster started within the job, and
  * a script to discard the temporary ssh keys.

The script is submitted to the slurm scheduler by executing:
<code bash>
$ sbatch pi.slrm
</code>
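
After submission, the job can be monitored and its result inspected with the usual slurm tools; the file names below follow the ''--output''/''--error'' settings of the job script above:

<code bash>
# check the state of the job in the queue
squeue -u $USER

# after the job has finished, inspect its output and error files
grep "Pi is roughly" out_spark-yarn-pi-*
less err_spark-yarn-pi-*
</code>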
==== MapReduce: Concepts and example ====

MapReduce splits the work into the following phases (a small illustration in plain Python follows the list):
  * map: a subset of the input data is processed by a single process, e.g. filtering,
  * shuffle: the output of the map phase is sorted by its key,
  * reduce: the sorted data is combined into the final result.
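
To illustrate the three phases without any Hadoop involvement, here is a minimal word-count sketch in plain Python (the input lines are made up for the example):

<code python>
# Minimal word-count sketch of the three MapReduce phases in plain Python.
from itertools import groupby
from operator import itemgetter

lines = ["to be or not to be", "to do is to be"]

# map: emit a (key, value) pair for every word
mapped = [(word, 1) for line in lines for word in line.split()]

# shuffle: sort the pairs by key so that equal keys are adjacent
shuffled = sorted(mapped, key=itemgetter(0))

# reduce: combine the values of each key
reduced = {key: sum(v for _, v in group)
           for key, group in groupby(shuffled, key=itemgetter(0))}

print(reduced)  # {'be': 3, 'do': 1, 'is': 1, 'not': 1, 'or': 1, 'to': 4}
</code>

In Hadoop, the map and reduce steps are executed by many processes in parallel on different partitions of the input, and the shuffle is done by the framework.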

An example is the following slurm job script, which counts the lines of the input data using MapReduce streaming:

<code bash>
#!/bin/bash
#SBATCH --job-name=simple_mapreduce
#SBATCH --nodes=1 --time=00:10:00
#SBATCH --error=simple_mapreduce_err
#SBATCH --output=simple_mapreduce_out

module purge
module load openjdk
module load hadoop

export PDSH_RCMD_TYPE=ssh

prolog_create_key.sh
. vsc_start_hadoop.sh

# copy the input data to HDFS ('data/' is a placeholder for your own input files)
hdfs dfs -mkdir input
hdfs dfs -put data/* input/
hdfs dfs -rm -r tmp_out

# stream the input through /bin/cat (mapper) and count the lines with wc (reducer)
mapred streaming -D mapreduce.job.maps=4 -input input/ \
    -output tmp_out -mapper /bin/cat -reducer '/usr/bin/wc -l'

# check output in simple_mapreduce_out and simple_mapreduce_err
# check job output on HDFS
hdfs dfs -ls tmp_out
echo "The number of lines is:"
hdfs dfs -cat tmp_out/part-*

epilog_discard_key.sh
</code>

==== Further examples ====

For examples in Scala, R, and SQL, including slurm scripts to run them on VSC, we refer to the course material in [[https://...]].

==== Available modules ====

To use Big Data on VSC, load the following modules:

  * python (note: the current spark version does NOT work with python 3.11), e.g.
    * python/3.10.7-gcc-12.2.0-5a2kkeu
    * python/3.9.13-gcc-12.2.0-ctxezzj
  * openjdk
  * hadoop
  * spark
  * r

Depending on the application, the following combinations of modules are needed (an example follows the list):
  * MapReduce => openjdk + hadoop
  * Scala => openjdk + hadoop + spark
  * Java => openjdk + hadoop + spark
  * PySpark => openjdk + hadoop + spark + python
  * R => openjdk + hadoop + spark + r
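
For instance, a PySpark job would load a combination like the following (the python version is one of those listed above):

<code bash>
module purge
module load python/3.10.7-gcc-12.2.0-5a2kkeu
module load openjdk
module load hadoop
module load spark
</code>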

==== Background ====

What is VSC actually doing to run a Big Data job using Hadoop and/or Spark?

== HPC ==

User jobs on VSC are submitted using Slurm, thus making sure that a job gets a well defined execution environment which is similar in terms of resources and performance for each similar job execution.

== Big Data ==

Big Data jobs, on the other hand, are often executed on dedicated Big Data clusters, which make sure that
  * programs are started on the nodes where the data is stored, thus reducing communication overhead,
  * load balancing is done automatically,
  * fault tolerance is added by the framework, and
  * very good scaling behaviour is ensured without any changes in the user programs.
Much of this work is done by Yarn (Yet Another Resource Negotiator), the resource manager of Hadoop.

== Combination of HPC and Big Data ==

In our setup on VSC we combine those two worlds of HPC and Big Data by starting Yarn, HDFS and Spark in user context within a slurm job (on the nodes belonging to the job).
User data can be accessed
  * from the standard VSC paths using Spectrum Scale (formerly GPFS), or
  * from HDFS, which is started locally on the SSDs of the compute nodes. In this case the data has to be copied in and/or out at the beginning/end of the job (see the sketch below).
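
If the local HDFS is used, copying in and out is done with ''hdfs dfs'' commands inside the job script; a minimal sketch (all paths, including ''$DATA'', are placeholders for your own directories):

<code bash>
# at the beginning of the job: copy input data from the shared file system to the job-local HDFS
hdfs dfs -mkdir -p input
hdfs dfs -put $DATA/myproject/input_files/* input/

# ... run the MapReduce or Spark job ...

# at the end of the job: copy the results back before the job (and its local HDFS) ends
hdfs dfs -get tmp_out $DATA/myproject/results/
</code>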

VSC has several scripts - most are optional - which are added to the path by the hadoop and spark modules:
  * prolog_create_key.sh and epilog_discard_key.sh, which create and discard the temporary ssh keys used within the job,
  * vsc_start_hadoop.sh, which starts Yarn and HDFS in user context on the nodes of the job,
  * vsc_start_spark.sh, which starts Spark on the nodes of the job,
  * vsc_stop_hadoop.sh, which stops Yarn and HDFS, and
  * vsc_stop_spark.sh, which stops Spark.

==== References ====

  * [[https://...]]
  * [[http://...|... is here to stay]]: Talk at Austrian HPC Meeting 2020
  * {{ :... }}

==== Hadoop cluster at TU Wien ====
At the Technische Universität Wien there is a [[doku:lbd | Big Data cluster]], mainly for teaching purposes, which is also used by researchers.