====== Big Data ======
VSC-4 has Big Data modules for running
  * MapReduce jobs
    * in several programming languages, including Java, and
    * using executables which can stream data from standard input to standard output (e.g. grep, cut, tr, sed, ...), and
  * Spark jobs using
    * Python,
    * Java,
    * Scala,
    * R, and
    * SQL,
either ...
==== Big Data and HPC ====
Among the advantages of Big Data methods, also on HPC environments like VSC, we want to mention
  * very easy coding, in many languages, including data centric ones like R or SQL,
  * automatic parallelization without any changes in the user program, and
  * extremely good scaling behaviour.
Users have to be aware of the difficulties of running Big Data jobs on HPC clusters, namely shared resources and granularity.
  * File systems are shared by all users. Users with high file system demand should use the local HDFS if the same data is read more than once in a job. Please do not submit many Big Data jobs at the same time, e.g. as slurm array jobs!
  * Login nodes and internet connections are shared by all users. Bringing huge amounts ...
  * Only very coarse grained parallelization will work with Big Data frameworks. Typically, parallelization is done by partitioning the input data (automatically) and doing the majority of the work independently on those partitions.

In the context of high performance computing, a typical application of Big Data methods is pre- and/or postprocessing of large amounts of data, e.g.
  * filtering,
  * data cleaning,
  * discarding input fields which are not required,
  * counting,
  * ...
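As an illustration of such pre-/postprocessing steps, the same filtering, cleaning, field selection and counting can be sketched in a few lines of plain Python. This is not VSC-specific; the records and field names below are made up for the sketch, and on a real cluster the same logic would run inside a MapReduce or Spark job:

<code python>
# Hypothetical CSV-like sensor records; the second one has a missing
# value and a bad status and will be cleaned away.
records = [
    "2023-05-11;sensor1;23.4;ok",
    "2023-05-11;sensor2;;broken",
    "2023-05-12;sensor1;24.1;ok",
]

def parse(line):
    date, sensor, value, status = line.split(";")
    return {"date": date, "sensor": sensor, "value": value, "status": status}

parsed = (parse(line) for line in records)
# filtering / data cleaning: keep only complete, valid records
valid = [r for r in parsed if r["value"] and r["status"] == "ok"]
# discarding input fields which are not required
slim = [{"sensor": r["sensor"], "value": float(r["value"])} for r in valid]
# counting
print(len(slim))  # -> 2
</code>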
+ | |||
+ | Frameworks and applications using Big Data methods are shared among many scientific communities. | ||
+ | |||
==== Apache Spark on VSC ====

There are many frameworks in Data Science that are using Apache Spark.

=== Example slurm & python scripts ===

Let us calculate the value of pi by means of the following Python script:

<code python>
from operator import add
from random import random
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions)\
        .map(f).reduce(add)
    pi = 4.0 * count / n
    print(f"Pi is roughly {pi}")

    spark.stop()
</code>
The idea of the calculation is to check (very often) whether a random point in the square [-1,1] x [-1,1] lies within the unit circle (i.e. has a distance of less than 1 from the origin). Since we know the formulas for the areas of both the square and the circle, the fraction of hits estimates pi.
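The same Monte Carlo idea can be sketched in plain Python without Spark; this is a toy version for illustration only, since Spark's contribution is the parallel execution of the sampling across partitions:

<code python>
from random import Random

def estimate_pi(samples, seed=42):
    """Estimate pi: the fraction of random points in [-1,1]x[-1,1]
    that fall inside the unit circle approximates pi/4."""
    rng = Random(seed)  # fixed seed only to make the sketch reproducible
    inside = 0
    for _ in range(samples):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        if x * x + y * y <= 1:
            inside += 1
    return 4.0 * inside / samples

print(estimate_pi(100_000))  # close to 3.14 for large sample counts
</code>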
Assuming that the script is saved to ''$HOME/pi.py'', it can be run with the following slurm job script:

<code bash>
#!/bin/bash
#SBATCH --nodes=1
#SBATCH --job-name=spark-yarn-pi
#SBATCH --error=err_spark-yarn-pi-%A
#SBATCH --output=out_spark-yarn-pi-%A
#SBATCH --partition=skylake_0096
#SBATCH --qos=skylake_0096

module purge

# you can choose one of the following python versions
# with the current hadoop/spark modules
# note: the current spark version does NOT work with python 3.11
module load python/3.10.7-gcc-12.2.0-5a2kkeu
# module load python/3.9.13-gcc-12.2.0-ctxezzj
# module load python/...

module load openjdk
module load hadoop
module load spark

export PDSH_RCMD_TYPE=ssh

prolog_create_key.sh

. vsc_start_hadoop.sh
. vsc_start_spark.sh

spark-submit --master yarn --deploy-mode client --num-executors 140 \
    --executor-memory 2G $HOME/pi.py 1000

. vsc_stop_spark.sh
. vsc_stop_hadoop.sh

epilog_discard_key.sh
</code>
In this slurm script we have
  * slurm commands, starting with #SBATCH, to set the job name, the maximum execution time and where the output files will go, as well as the slurm qos and partition that should be used,
  * module commands, loading all the modules which are required by our job: python, Java (Hadoop is written in Java), Hadoop, and Spark,
  * setup scripts which create temporary ssh keys, and start Hadoop and Spark services in user context on the nodes of the job,
  * the spark-submit command which runs our program, and
  * a script to discard the temporary ssh keys.

The script is submitted to the slurm scheduler by executing:
<code>
$ sbatch pi.slrm
</code>
==== MapReduce: Concepts and example ====

MapReduce splits work into the phases
  * map: a subset of the input data is processed by a single process, e.g. filtering,
  * shuffle: sort the output of the map phase by its key,
  * reduce: combine the sorted data.
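The three phases can be illustrated with a toy word count in plain Python. This is an illustrative sketch only: in a real MapReduce job the map and reduce work is distributed across many processes on many nodes, and the shuffle happens over the network:

<code python>
from itertools import groupby
from operator import itemgetter

lines = ["big data on hpc", "big data"]

# map: each input line yields (key, 1) pairs
mapped = [(word, 1) for line in lines for word in line.split()]

# shuffle: sort by key so that equal keys become adjacent
shuffled = sorted(mapped, key=itemgetter(0))

# reduce: combine the sorted pairs per key
counts = {key: sum(v for _, v in group)
          for key, group in groupby(shuffled, key=itemgetter(0))}

print(counts)  # {'big': 2, 'data': 2, 'hpc': 1, 'on': 1}
</code>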
+ | |||
+ | An example is | ||
+ | |||
<code bash>
#!/bin/bash
#SBATCH --job-name=simple_mapreduce
#SBATCH --nodes=1 --time=00:...
#SBATCH --error=simple_mapreduce_err
#SBATCH --output=simple_mapreduce_out

module purge
module load openjdk
module load hadoop

export PDSH_RCMD_TYPE=ssh

prolog_create_key.sh
. vsc_start_hadoop.sh

hdfs dfs -mkdir input
hdfs dfs -put data/... input/
hdfs dfs -rm -r tmp_out

mapred streaming -D mapreduce.job.maps=4 -input input/ \
    -output tmp_out -mapper /bin/cat -reducer '/usr/bin/wc -l'

# check output in simple_mapreduce_out and simple_mapreduce_err
# check job output on HDFS
hdfs dfs -ls tmp_out
echo "The number of lines is:"
hdfs dfs -cat tmp_out/part-*

epilog_discard_key.sh
</code>
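Since ''mapred streaming'' accepts any executable that reads records from standard input and writes results to standard output, the ''wc -l'' reducer above could just as well be a small Python script. The following sketch shows the equivalent reducer logic; for testability it reads from an in-memory stream instead of stdin:

<code python>
import io

def count_lines(stream):
    """Reducer logic, equivalent to 'wc -l': count the input lines."""
    return sum(1 for _ in stream)

# simulate the streaming pipeline: the identity mapper (/bin/cat) passes
# the lines through, and the reducer counts them
mapper_output = io.StringIO("line one\nline two\nline three\n")
print(count_lines(mapper_output))  # -> 3
</code>

In a real streaming job the script would read ''sys.stdin'' and be passed as e.g. ''-reducer ./count_lines.py'' (the filename is hypothetical); it must be executable and available on all nodes of the job.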
+ | |||
+ | |||
+ | ==== Further examples ==== | ||
+ | |||
+ | For examples in Scala, R, and SQL, including slurm scripts to run on VSC, we want to refer to the course material in [[ https:// | ||
+ | |||
==== Available modules ====

To use Big Data on VSC, use these modules:

  * python (note: the current spark version does NOT work with python 3.11)
    * python/3.10.7-gcc-12.2.0-5a2kkeu
    * python/3.9.13-gcc-12.2.0-ctxezzj
    * python/...
  * openjdk/...
  * hadoop (hadoop/...)
  * spark (spark/...)
  * r

Depending on the application, choose the matching combination of modules:
  * MapReduce => openjdk + hadoop
  * Scala => openjdk + hadoop + spark
  * Java => openjdk + hadoop + spark
  * PySpark => openjdk + hadoop + spark + python
  * R => openjdk + hadoop + spark + r
+ | |||
+ | ==== Background ==== | ||
+ | |||
+ | What is VSC actually doing to run a Big Data job using Hadoop and/or Spark? | ||
+ | |||
+ | == HPC == | ||
+ | |||
+ | User jobs on VSC are submitted using Slurm, thus making sure that a job gets a well defined execution environment which is similar in terms of resources and performance for each similar job execution. | ||
+ | |||
+ | == Big Data == | ||
+ | |||
+ | Big Data jobs on the other hand are often executed on Big Data clusters, which make sure that | ||
+ | * programs are started on the nodes where data is stored, thus reducing communication overhead, | ||
+ | * load balancing is done automatically, | ||
+ | * fault tolerance is added by the framework, | ||
+ | * very good scaling behaviour is ensured without any changes in user programs. | ||
+ | Much of this work is done by Yarn (Yet another resource negotiator), | ||
+ | |||
+ | == Combination of HPC and Big Data == | ||
+ | |||
+ | In our setup on VSC we combine those two worlds of HPC and Big data by starting Yarn, HDFS and spark in user context within a job (on the nodes belonging to the job). | ||
+ | User data can be accessed | ||
+ | * from the standard VSC paths using Spectrum Scale (formerly GPFS), or | ||
+ | * from HDFS, which is started locally on the local SSDs of the compute nodes. In this case the data has to be copied in and/or out at the beginning/ | ||
+ | |||
+ | VSC has several scripts - most are optional - which are added to the path using ''' | ||
+ | * prolog_create_key.sh and epilog_discard_key.sh, | ||
+ | * vsc_start_hadoop.sh, | ||
+ | * vsc_start_spark.sh, | ||
+ | * vsc_stop_hadoop.sh, | ||
+ | * vsc_stop_spark.sh, | ||
+ | |||
+ | ==== References ==== | ||
+ | |||
+ | |||
+ | * [[ https:// | ||
*[[http:// | *[[http:// | ||
is here to stay]]: Talk at Austrian HPC Meeting 2020 | is here to stay]]: Talk at Austrian HPC Meeting 2020 | ||
* {{ : | * {{ : | ||
- | * [[ https:// | ||
- | ===== Hadoop | + | ==== Hadoop |
At the Technische Universität Wien there is a [[doku:lbd | Big Data cluster]], mainly for teaching purposes, which is also used by researchers. | At the Technische Universität Wien there is a [[doku:lbd | Big Data cluster]], mainly for teaching purposes, which is also used by researchers. | ||