Big Data

VSC has Big Data modules for running

  • MapReduce jobs, and
  • Spark jobs using
    • Python,
    • Java,
    • Scala,
    • R,
    • SQL

either on the standard file systems (Spectrum Scale, formerly GPFS) or on HDFS (Hadoop Distributed File System), which runs locally on the nodes of a job.

Big Data frameworks offer several advantages, also in HPC environments like VSC:

  • very easy coding, also in data centric languages like R or SQL,
  • automatic parallelization without any changes in the user program,
  • extremely good scaling behaviour.

Users should be aware of two difficulties when running Big Data jobs on HPC clusters: shared resources and granularity.

  • File systems are shared by all users. Users with high file system demand should use the local HDFS.
  • Login nodes and internet connections are shared by all users. Bringing huge amounts of data to VSC takes some time. Please make sure that other users of VSC are not affected!
  • Only very coarse grained parallelization will work with Big Data frameworks. Typically the input data is partitioned and most of the calculation is done independently on each partition in separate processes.
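
The partition-and-combine pattern from the last point can be sketched in plain Python, without Spark. This is only an illustration: the names partition and process_chunk and the sample data are hypothetical, and a real job would let the framework distribute the chunks over processes and nodes.

```python
from functools import reduce
from operator import add


def partition(data, num_partitions):
    """Split data into num_partitions contiguous chunks of nearly equal size."""
    size, rest = divmod(len(data), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < rest else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def process_chunk(chunk):
    """Work on one chunk independently: count the even values in it."""
    return sum(1 for value in chunk if value % 2 == 0)


data = list(range(1, 101))                          # hypothetical input data
chunks = partition(data, 4)
partial_counts = list(map(process_chunk, chunks))   # independent work per chunk
total = reduce(add, partial_counts)                 # small final combine step
print(partial_counts, total)
```

Each call to process_chunk needs only its own chunk, so the calls could run anywhere; only the small partial results have to be brought together at the end.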

In the context of high performance computing a typical application of Big Data methods is pre- and/or postprocessing of large amounts of data, e.g. filtering.

Frameworks and applications using Big Data methods are shared among many scientific communities.

Many Data Science frameworks build on Apache Spark™, for instance ADAM for genomic analysis, GeoTrellis for geospatial data analysis, or Koalas (distributed Pandas).

Let us calculate the value of pi by means of the following Python script (from the Spark distribution):

from __future__ import print_function

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

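if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    # Create (or reuse) the Spark session for this application
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()
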
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()


The idea of the calculation is to check, for a large number of random points in the square [-1, 1] x [-1, 1], whether each point lies within the unit circle (i.e. has a distance of at most 1 from the origin). The square has area 4 and the circle has area pi, so the fraction of points inside the circle approaches pi/4, and multiplying this fraction by 4 gives an estimate of pi.
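
The same estimate can be tried out on a single machine without Spark. The following is a minimal sketch in plain Python, assuming the same sampling function as in the script above; the names inside_unit_circle and estimate_pi and the seed parameter are hypothetical and not part of the Spark example.

```python
import random
from functools import reduce
from operator import add


def inside_unit_circle(_):
    """Return 1 if a random point of the square [-1, 1] x [-1, 1] lies in the unit circle."""
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0


def estimate_pi(n, seed=None):
    """Estimate pi from n random points; the hit fraction approximates pi/4."""
    random.seed(seed)  # hypothetical parameter, only to make runs repeatable
    count = reduce(add, map(inside_unit_circle, range(n)))
    return 4.0 * count / n


pi_estimate = estimate_pi(200000, seed=42)
print("Pi is roughly %f" % pi_estimate)
```

In the Spark version the map step runs in parallel on the partitions and only the reduce step combines the partial counts, which is why the program scales to much larger values of n.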

In order to submit a batch job to the queuing system we use the SLURM script pi.slrm:

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --job-name=spark-yarn-pi
#SBATCH --time=00:10:00
#SBATCH --error=err_spark-yarn-pi-%A
#SBATCH --output=out_spark-yarn-pi-%A

module purge
module load python/3.8.0-gcc-9.1.0-wkjbtaa
module load openjdk/11.0.2-gcc-9.1.0-ayy5f5t
module load hadoop
module load spark

export SPARK_EXAMPLES=${HOME}/BigData/SparkDistributionExamples/

spark-submit --master yarn --deploy-mode client --num-executors 140 --executor-memory 2G $SPARK_EXAMPLES/src/main/python/ 1000

In this Slurm script we have

  • Slurm directives, starting with #SBATCH, which set the number of nodes, the job name, the maximum execution time and where the output and error files will go,
  • module commands, loading all the modules which are required by our job: Python, Java (Hadoop is written in Java), Hadoop, and Spark,
  • setup scripts which create temporary ssh keys, and start Hadoop and Spark services in user context on the nodes of the job,
  • commands to start our job with 'spark-submit', and
  • a script to discard the temporary ssh keys.


What is VSC actually doing to run a Big Data job using Hadoop and/or Spark?

User jobs on VSC are submitted using Slurm, which ensures that each job gets a well-defined execution environment with similar resources and performance for each similar job execution.

Big Data jobs are often executed on Big Data clusters, which make sure that

  • programs are started on the nodes where data is stored, thus reducing communication overhead,
  • load balancing is done automatically,
  • fault tolerance is added by the framework,
  • very good scaling behaviour is ensured without any changes in user programs.

Much of this work is done by Yarn (Yet Another Resource Negotiator), a scheduler which usually plays a similar role on Big Data clusters as Slurm does on VSC.

In our setup on VSC we combine the two worlds of HPC and Big Data by starting Yarn and other services in user context within a job on the nodes belonging to the job. The other services are HDFS (Hadoop Distributed File System) and Spark. User data can be accessed

  • from the standard VSC paths using Spectrum Scale (formerly GPFS), or
  • from HDFS, which is started on the local SSDs of the compute nodes. In this case the data has to be copied into HDFS before it is used, and results have to be copied out afterwards.

VSC provides several scripts, most of them optional, which are added to the path by 'module load hadoop' and 'module load spark':

  • and, which are required to start services on other nodes, but also to access the local (master) node.
  •, which starts Yarn (scheduler) and HDFS (distributed file system) on the nodes of the job. (This is optional, since Spark can also run with its own scheduler and without HDFS.)
  •, which is usually not required since the services are stopped at the end by the system anyway.
  •, which starts the Spark service on the nodes of the job. (This is optional, since MapReduce jobs run without Spark.)
  •, which is usually not required since the services are stopped at the end by the system anyway.


Hadoop Ecosystem

At the Technische Universität Wien there is a Big Data cluster, mainly for teaching purposes, which is also used by researchers.

