Big Data

VSC-4 provides Big Data modules for running jobs either on standard file systems (using Spectrum Scale, formerly GPFS) or on HDFS (Hadoop Distributed File System), which is set up locally on the nodes of a job.

Big Data and HPC

Among the advantages of using Big Data methods on HPC environments like VSC, we want to mention:

Users have to be aware of the difficulties of running Big Data jobs on HPC clusters, namely shared resources and granularity.

In the context of high performance computing, a typical application of Big Data methods is pre- and/or postprocessing of large amounts of data, e.g.:

Frameworks and applications using Big Data methods are shared among many scientific communities.

Apache Spark on VSC

There are many frameworks in Data Science that use Apache Spark™, for instance ADAM for genomic analysis, GeoTrellis for geospatial data analysis, or Koalas (distributed Pandas).

Example slurm & python scripts

Let us calculate the value of pi by means of the following Python script pi.py (example originally taken from the Spark distribution):

from operator import add
from random import random
import sys
 
from pyspark.sql import SparkSession
 
if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()
 
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions
 
    def f(_):
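        # draw a random point in the square [-1,1] x [-1,1] and
        # return 1 if it lies inside the unit circle, 0 otherwise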
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0
 
    count = spark.sparkContext\
        .parallelize(range(1, n + 1), partitions)\
        .map(f)\
        .reduce(add)
    pi = 4.0 * count / n
    print(f"Pi is roughly {pi}")
 
    spark.stop()

The idea of the calculation is to check (very often) whether a random point in the square [-1,1] x [-1,1] lies within the unit circle (i.e. has a distance of less than 1 from the origin). Since we know the formulas for the area of the square and of the circle, we can estimate pi from the fraction of points that fall inside the circle.
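Since the area of the unit circle is pi and the area of the square is 4, the fraction count/n of points inside the circle approaches pi/4, so

    pi ≈ 4 * count / n

which is exactly the last computation in the script.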

Assuming that the script is saved to $HOME/pi.py, we can use the following Slurm script pi.slrm to run the code on VSC-4:

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --job-name=spark-yarn-pi
#SBATCH --time=00:10:00
#SBATCH --error=err_spark-yarn-pi-%A
#SBATCH --output=out_spark-yarn-pi-%A
#SBATCH --partition=skylake_0096
#SBATCH --qos=skylake_0096
 
module purge
 
# you can choose one of the following python versions
# with the current hadoop/spark installation on VSC-4
# note: the current spark version does NOT work with python 3.11
module load python/3.10.7-gcc-12.2.0-5a2kkeu
# module load python/3.9.13-gcc-12.2.0-ctxezzj
# module load python/3.8.12-gcc-12.2.0-tr7w5qy
 
module load openjdk
module load hadoop
module load spark
 
export PDSH_RCMD_TYPE=ssh
 
prolog_create_key.sh
. vsc_start_hadoop.sh
. vsc_start_spark.sh
 
spark-submit --master yarn --deploy-mode client --num-executors 140 \
      --executor-memory 2G $HOME/pi.py 1000
 
. vsc_stop_spark.sh
. vsc_stop_hadoop.sh
 
epilog_discard_key.sh

In this Slurm script we load the required modules (python, openjdk, hadoop, spark), create a temporary key and start Hadoop (HDFS and YARN) and Spark on the nodes of the job, run the actual calculation with spark-submit on YARN, and finally stop the services and discard the key again.

The script is submitted to the Slurm scheduler by executing:

$ sbatch pi.slrm
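After submission, the job can be monitored with squeue; once it has finished, the estimated value of pi ends up in the job's output file. A typical session might look like this (the job ID printed by sbatch is just a placeholder):

$ sbatch pi.slrm
Submitted batch job 1234567
$ squeue -u $USER
$ grep "Pi is roughly" out_spark-yarn-pi-*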

MapReduce: Concepts and example

MapReduce splits work into a map phase, in which portions of the input are processed independently and in parallel, and a reduce phase, in which the intermediate results are aggregated; in between, a shuffle step groups the intermediate results by key.

An example is the following Slurm script, which uses Hadoop streaming to count the lines of a file:

#!/bin/bash
#SBATCH --job-name=simple_mapreduce
#SBATCH --nodes=1 --time=00:10:00
#SBATCH --error=simple_mapreduce_err
#SBATCH --output=simple_mapreduce_out
 
module purge
module load openjdk
module load hadoop
 
export PDSH_RCMD_TYPE=ssh
 
prolog_create_key.sh
. vsc_start_hadoop.sh
 
hdfs dfs -mkdir input
hdfs dfs -put data/wiki_sample_2400lines input/
hdfs dfs -rm -r tmp_out
 
mapred streaming -D mapreduce.job.maps=4 -input input/wiki_sample_2400lines \
    -output tmp_out -mapper /bin/cat -reducer '/bin/wc -l'
 
# check output in simple_mapreduce_out and simple_mapreduce_err
# check job output on HDFS
hdfs dfs -ls tmp_out
echo "The number of lines is:"
hdfs dfs -cat tmp_out/part-*
 
epilog_discard_key.sh
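Since the mapper (/bin/cat) passes each line through unchanged and the reducer (/bin/wc -l) counts the lines it receives, the streaming job above corresponds, conceptually, to the following single-node shell pipeline; what Hadoop adds on top are the parallel map tasks, the shuffle step, and HDFS:

cat data/wiki_sample_2400lines | /bin/cat | /bin/wc -l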

Further examples

For examples in Scala, R, and SQL, including Slurm scripts to run them on VSC, we refer to the course material of Big Data on VSC.

Available modules

To use Big Data on VSC, load the corresponding modules. Depending on the application, choose the right combination; the combinations used in the examples above are sketched below.
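For instance, assuming the installation used in the examples above (module names and versions may change over time):

# for PySpark on YARN, as in the pi example:
module load python/3.10.7-gcc-12.2.0-5a2kkeu openjdk hadoop spark

# for plain MapReduce / Hadoop streaming, as in the line-count example:
module load openjdk hadoop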

Background

What is VSC actually doing to run a Big Data job using Hadoop and/or Spark?

HPC

User jobs on VSC are submitted using Slurm, which makes sure that each job gets a well-defined execution environment that is similar in terms of resources and performance for every execution of a similar job.

Big Data

Big Data jobs, on the other hand, are often executed on dedicated Big Data clusters, which make sure that:

Much of this work is done by YARN (Yet Another Resource Negotiator), a scheduler that usually plays a role on Big Data clusters similar to the role Slurm plays on VSC.

Combination of HPC and Big Data

In our setup on VSC we combine these two worlds of HPC and Big Data by starting YARN, HDFS, and Spark in user context within a job (on the nodes belonging to the job). User data can be accessed:

VSC provides several helper scripts - most of them optional - which are added to the path by 'module load hadoop' and 'module load spark'. The examples above use prolog_create_key.sh / epilog_discard_key.sh, vsc_start_hadoop.sh / vsc_stop_hadoop.sh, and vsc_start_spark.sh / vsc_stop_spark.sh.

References

Hadoop at the TU Wien

At the Technische Universität Wien there is a Big Data cluster, mainly for teaching purposes, which is also used by researchers.