Differences

This shows you the differences between two versions of the page.

doku:bigdata [2020/04/24 10:42] – [Jupyter Notebook] ir
doku:bigdata [2023/05/11 08:52] (current) katrin
Line 1: Line 1:
 ====== Big Data ======
  
-===== Apache Spark on VSC-3 =====+VSC-4 has Big Data modules for running  
 +  * MapReduce jobs 
 +    * in several programming languages, including Java, and  
 +    * using executables which can stream data from standard input to standard output (e.g. grep, cut, tr, sed, ...), and 
 +  * Spark jobs using  
 +    * Python, 
 +    * Java, 
 +    * Scala, 
 +    * R, 
 +    * SQL 
 +either on standard file systems (using Spectrum Scale, formerly GPFS) or HDFS (Hadoop distributed file system) locally on the nodes of a job.
  
-There are many frameworks in Data Science that are using <html><span style="color:#cc3300;font-size:100%;">&dzigrarr;</span> </html> [[https://spark.apache.org|Apache Spark]]<html><sup>TM</sup></html> , for instance, [[http://bdgenomics.org|ADAM]] for genomic analysis, [[https://geotrellis.io|GeoTrellis]] for geospatial data analysis, or [[https://koalas.readthedocs.io/en/latest/#|Koalas]] (distributed Pandas). +==== Big Data and HPC ==== 
-==== Example SLURM Python scripts ==== + 
-Let us calculate the value of pi by means of the following Python script <html><span style="color:#cc3300;font-size:100%;">&dzigrarr;</span> </html> ''pi.py'': +Big Data frameworks offer several advantages on HPC environments like VSC as well, among them 
-<code> +  * very easy coding, in many languages, including data centric ones like R or SQL, 
-import sys +  * automatic parallelization without any changes in the user program, and 
-from random import random+  * extremely good scaling behaviour. 
 + 
 +Users should be aware of two difficulties when running Big Data jobs on HPC clusters: shared resources and granularity. 
 +  * File systems are shared by all users. Users with high file system demand should use the local HDFS if the same data is read more than once in a job. Please do not submit many Big Data jobs at the same time, e.g. as Slurm array jobs! 
 +  * Login nodes and internet connections are shared by all users. Bringing huge amounts of data to VSC takes some time. Please make sure that other users of VSC are not affected! (One or two ssh connections from your workstation are OK, whereas 10 simultaneous connections from a computer with high internet bandwidth, i.e. at least 10 Mb/s, could affect others.) 
 +  * Only very coarse-grained parallelization works with Big Data frameworks. Typically, the input data is partitioned automatically and the majority of the calculation is done independently by separate processes. 
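The coarse-grained pattern described in the last item can be illustrated without any framework. The following is only a minimal sketch in plain Python: the function names ''partition'', ''count_matches'' and ''parallel_count'' are hypothetical and not part of Hadoop or Spark, and the partitions are processed one after another here, whereas a Big Data framework would hand each partition to a separate process.

```python
def partition(data, num_partitions):
    """Split data into num_partitions contiguous chunks of nearly equal size."""
    size, rest = divmod(len(data), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        # The first `rest` chunks get one extra element.
        end = start + size + (1 if i < rest else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def count_matches(chunk, needle):
    """Independent work on one partition: count lines containing needle."""
    return sum(1 for line in chunk if needle in line)


def parallel_count(data, needle, num_partitions=4):
    """Process every partition independently, then combine the partial results."""
    partial = [count_matches(c, needle) for c in partition(data, num_partitions)]
    return sum(partial)


if __name__ == "__main__":
    lines = [f"line {i}" for i in range(10)]
    print(parallel_count(lines, "line 1"))
```

Because no partition needs data from another one, the only communication is the final sum, which is why this kind of work scales well.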
 + 
 +In the context of high performance computing a typical application of Big Data methods is pre- and/or postprocessing of large amounts of data, e.g.  
 +  * filtering, 
 +  * data cleaning, 
 +  * discarding input fields which are not required, 
 +  * counting, 
 +  * ... 
 + 
 +Frameworks and applications using Big Data methods are shared among many scientific communities. 
 + 
 +==== Apache Spark on VSC ==== 
 + 
 +Many frameworks in data science build on <html><span style="color:#cc3300;font-size:100%;">&dzigrarr;</span> </html> [[https://spark.apache.org|Apache Spark]]<html><sup>TM</sup></html>, for instance [[https://adam.readthedocs.io|ADAM]] for genomic analysis, [[https://geotrellis.io|GeoTrellis]] for geospatial data analysis, or [[https://koalas.readthedocs.io/en/latest/#|Koalas]] (distributed Pandas). 
 + 
 +=== Example Slurm Python scripts === 
 + 
 +Let us calculate the value of pi by means of the following Python script ''pi.py'' (example originally taken from the Spark distribution): 
 + 
 +<code python>
 from operator import add
 +from random import random
 +import sys
 + 
 from pyspark.sql import SparkSession
-if __name__ == "__main__": spark = SparkSession\ +  
-.builder\ .appName("PythonPi")\ .getOrCreate() +if __name__ == "__main__": 
-partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 100000 * partitions +    """ 
-def f(_): +        Usage: pi [partitions] 
-x = random() * 2 - 1 +    """ 
-y = random() * 2 – +    spark = SparkSession\ 
-return 1 if x ** 2 + y ** 2 <= 1 else 0 +        .builder\ 
-count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) +        .appName("PythonPi")\ 
-print("Pi is roughly %f" % (4.0 * count / n)spark.stop()+        .getOrCreate() 
 +  
 +    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 
 +    n = 100000 * partitions 
 +  
 +    def f(_): 
 +        x = random() * 2 - 1 
 +        y = random() * 2 - 1 
 +        return 1 if x ** 2 + y ** 2 <= 1 else 0 
 +  
 +    count = spark.sparkContext\
 +        .parallelize(range(1, n + 1), partitions)\
 +        .map(f)\
 +        .reduce(add) 
 +    pi = 4.0 * count / n 
 +    print(f"Pi is roughly {pi}") 
 +  
 +    spark.stop()
 </code>
 +The idea of the calculation is to draw many random points in the square [-1, 1] x [-1, 1] and to check for each point whether it lies within the unit circle (i.e. has a distance of at most 1 from the origin). The square has area 4 and the circle has area pi, so the fraction of points inside the circle approximates pi/4, and multiplying this fraction by 4 gives an estimate of pi.
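The same estimate can be tried on a workstation without Spark. This is a minimal sketch in plain Python, not the script used on VSC: the function name ''estimate_pi'' and the fixed ''seed'' parameter are assumptions added here for reproducibility.

```python
import random


def estimate_pi(n, seed=0):
    """Estimate pi from n random points in the square [-1, 1] x [-1, 1]."""
    rng = random.Random(seed)  # seeded generator, so runs are repeatable
    hits = 0
    for _ in range(n):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        # Count the point if it lies inside the unit circle.
        if x * x + y * y <= 1:
            hits += 1
    # hits / n approximates (circle area) / (square area) = pi / 4
    return 4.0 * hits / n


if __name__ == "__main__":
    print(f"Pi is roughly {estimate_pi(200_000)}")
```

The Spark version distributes exactly this loop: each partition counts its own hits, and ''reduce(add)'' sums the partial counts.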
  
-In order to submit a batch job to the queuing system we take advantage of the SLURM script below <html><span style="color:#cc3300;font-size:100%;">&dzigrarr;</span> </html> ''pi.slrm'':+Assuming that the script is saved to ''$HOME/pi.py'', we can use the following Slurm script ''pi.slrm'' to run the code on VSC-4:
  
-<code>+<code bash>
 #!/bin/bash
-#SBATCH --job-name=spark-wordcount +#SBATCH --nodes=1 
-#SBATCH --nodes=4+#SBATCH --job-name=spark-yarn-pi
 #SBATCH --time=00:10:00
-#SBATCH --partition=binf+#SBATCH --error=err_spark-yarn-pi-%A 
 +#SBATCH --output=out_spark-yarn-pi-%A 
 +#SBATCH --partition=skylake_0096 
 +#SBATCH --qos=skylake_0096 
 module purge
-module load anaconda3 hadoop/2.7 + 
-apache_spark/2.4.4+# you can choose one of the following python versions 
 +# with the current hadoop/spark installation on VSC-4 
 +# note: the current spark version does NOT work with python 3.11 
 +module load python/3.10.7-gcc-12.2.0-5a2kkeu 
 +# module load python/3.9.13-gcc-12.2.0-ctxezzj 
 +# module load python/3.8.12-gcc-12.2.0-tr7w5qy 
 + 
 +module load openjdk 
 +module load hadoop 
 +module load spark 
 + 
 +export PDSH_RCMD_TYPE=ssh 
 + 
 +prolog_create_key.sh 
 +. vsc_start_hadoop.sh
 . vsc_start_spark.sh
-# runs on all nodes + 
-command="$SPARK_HOME/bin/spark-submit -- +spark-submit --master yarn --deploy-mode client --num-executors 140 \ 
-master +      --executor-memory 2G $HOME/pi.py 1000 
-spark://${SPARK_MASTER_HOST}:${SPARK_MASTER + 
-_PORT} --total-executor-cores $NUMCORES -- +. vsc_stop_spark.sh 
-executor-memory 2G $WDIR/pi.py 10000" +. vsc_stop_hadoop.sh 
-echo $command + 
-$command+epilog_discard_key.sh
 </code>
  
-=== Conference Paper ===+In this slurm script we have  
 +  * Slurm directives, starting with #SBATCH, which set the number of nodes, the job name, the maximum execution time, and where the output files will go, as well as the Slurm qos and partition that should be used, 
 +  * module commands, loading all the modules which are required by our job: Python, Java (Hadoop is written in Java), Hadoop, and Spark, 
 +  * setup scripts which create temporary ssh keys and start the Hadoop and Spark services in user context on the nodes of the job, 
 +  * the command to start our job with ''spark-submit'', 
 +  * scripts to stop the Spark and Hadoop services, and 
 +  * a script to discard the temporary ssh keys.
  
-[[http://typo3.vsc.ac.at/fileadmin/user_upload/vsc/conferences/ahpc20/slides/244-Roda-AHPC20.pdf|Apache Spark]] <html><font color= blue><sup>TM</sup></font> </html>  +The script is submitted to the slurm scheduler by executing: 
-[[http://typo3.vsc.ac.at/fileadmin/user_upload/vsc/conferences/ahpc20/slides/244-Roda-AHPC20.pdf|is here to stay]]+<code> 
 +$ sbatch pi.slrm 
 +</code>
  
-===== Hadoop Ecosystem =====+==== MapReduce: Concepts and example ====
  
-==== "Little" Big Data (LBD) Cluster ====+MapReduce splits work into the phases 
 +  * map: each subset of the input data is processed by a single process, e.g. for filtering, 
 +  * shuffle: the output of the map phase is sorted by its key, 
 +  * reduce: the sorted data is combined, e.g. by counting or summing.
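The three phases can be mimicked in a few lines of plain Python. The sketch below is only an illustration of the concept and does not use Hadoop: the function names ''map_phase'', ''shuffle_phase'', ''reduce_phase'' and ''word_count'' are hypothetical, and everything runs in a single process.

```python
from collections import defaultdict


def map_phase(lines):
    """map: turn every input line into (key, value) pairs, here (word, 1)."""
    for line in lines:
        for word in line.split():
            yield (word.lower(), 1)


def shuffle_phase(pairs):
    """shuffle: group all values by key and sort the groups by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(grouped):
    """reduce: combine the values of each key, here by summing them."""
    return {key: sum(values) for key, values in grouped}


def word_count(lines):
    return reduce_phase(shuffle_phase(map_phase(lines)))


if __name__ == "__main__":
    sample = ["big data on hpc", "big data on vsc"]
    print(word_count(sample))
```

In a real MapReduce job the map and reduce functions run in many processes on different nodes, and the framework performs the shuffle between them.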
  
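-=== Access === +An example is the following Slurm script, which uses Hadoop streaming with ''/bin/cat'' as mapper and ''/bin/wc -l'' as reducer to count the lines of an input file: 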
-  * **usage on request:** hadoop@tuwien.ac.at +
-  * **support:** hadoop-support@tuwien.ac.at.+
  
-{{:doku:lbd_cluster.jpg}}+<code bash> 
 +#!/bin/bash 
 +#SBATCH --job-name=simple_mapreduce 
 +#SBATCH --nodes=1 --time=00:10:00 
 +#SBATCH --error=simple_mapreduce_err 
 +#SBATCH --output=simple_mapreduce_out
  
-=== Hardware ===+module purge 
 +module load openjdk 
 +module load hadoop
  
-LBD has the following hardware setup: +export PDSH_RCMD_TYPE=ssh
-  * 2 namenodes (on c100: primary, on c101 secondary namenode) +
-  * 18 datanodes c101--c118 +
-  * an administrative server h1 as +
-    * Cloudera Manager server +
-    * backup of administrative data +
-  * a ZFS file server lbdnfs01 for /home with 300TB of storage space +
-The namenode c100 is also called lbd and it is reachable from within the tuwien domain under lbd.zserv.tuwien.ac.at. Each of the 20 nodes (c100-c118, h1) has +
-  * two XeonE5-2650v4 CPUs with 24 cores (total of 48 cores per node, 864 total worker cores) +
-  * 256GB RAM (total of 4.5TB memory available to worker nodes of the whole cluster) +
-  * four hard disks, each with a capacity of 4TB (total of 16TB per node, 288TB total for worker nodes) +
-Apart from two extra Ethernet devices for external connections on h1 and on c100, all nodes have the same hardware configurations. All ethernet connections (external and inter-node) support a speed of 10Gb/s.+
  
-=== HDFS configuration ===+prolog_create_key.sh 
 +. vsc_start_hadoop.sh
  
-  * current version: Hadoop 3 +hdfs dfs -mkdir input 
-  * block size: 128 MiB +hdfs dfs -put data/wiki_sample_2400lines input/ 
-  * default replication factor: 3+hdfs dfs -rm -r tmp_out
  
-----------------+mapred streaming -D mapreduce.job.maps=4 -input input/wiki_sample_2400lines \ 
 +    -output tmp_out -mapper /bin/cat -reducer '/bin/wc -l'
  
-==== Available software ====+# check output in simple_mapreduce_out and simple_mapreduce_err 
 +# check job output on HDFS 
 +hdfs dfs -ls tmp_out 
 +echo "The number of lines is:" 
 +hdfs dfs -cat tmp_out/part-*
  
-<html+epilog_discard_key.sh 
 +</code>
  
-<style> 
-table { 
-    border-collapse: collapse; 
-    width: 100%; 
-} 
  
-td, th { +==== Further examples ====
-    border: 1px solid #dddddd; +
-    text-align: left; +
-    padding: 8px; +
-}+
  
-tr:nth-child(even) { +For examples in Scala, R, and SQL, including Slurm scripts to run on VSC, see the course material in [[https://vsc.ac.at/training/2021/BigData-Mar|Big Data on VSC]].
-    background-color: #dddddd; +
-+
-</style>+
  
-<table> +==== Available modules ====
-  <tr> +
-    <th>Name</th> +
-    <th>Status</th> +
-    <th>Kommentar</th> +
-  </tr> +
-  <tr> +
-    <td>Centos 7</td> +
-    <td>Betriebssystem</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>XCAT</td> +
-    <td>Deploymentumgebung</td> +
-    <td>OK</td> +
-  </tr> +
-  </tr> +
-    <tr> +
-    <td>Cloudera Manager</td> +
-    <td>Big Data Deployment</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera HDFS</td> +
-    <td>Hadoop distributed file system</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera Accumulo</td> +
-    <td>Key/value store</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera HBase</td> +
-    <td>Database on top of HDFS</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera Hive</td> +
-    <td>Data warehouse using SQL</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera Hue</td> +
-    <td>Hadoop user experience, web gui, SQL analytics workbench</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera Impala</td> +
-    <td>SQL query engine, used by Hue</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Oozie</td> +
-    <td>Oozie is a workflow scheduler system to manage Apache Hadoop jobs. Used by Hue</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera Solr</td> +
-    <td>open source enterprise search platform, used by Hue, used by Key-Value Store Indexer</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera Key-Value Store Indexer</td> +
-    <td>The Key-Value Store Indexer service uses the Lily HBase NRT Indexer to index the stream of records being added to HBase tables. Indexing allows you to query data stored in HBase with the Solr service.</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera Spark (Spark 2)</td> +
-    <td> cluster-computing framework mit Scala 2.10 (2.11)</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Cloudera YARN (MR2 Included)</td> +
-    <td>Yet Another Resource Negotiator (cluster management)</td> +
-    <td>OK</td> +
-  </tr> +
-  </tr> +
-    <tr> +
-    <td>Cloudera ZooKeeper</td> +
-    <td>ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Java 1.8</td> +
-    <td>Programmiersprache</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Python 3.6.3 (python3.6), Python 3.4.5 (python3.4) Python 2.7.5 (python2)</td> +
-    <td>Programmiersprache</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Anaconda Python (python)</td> +
-    <td>export PATH=/home/anaconda3/bin/:$PATH</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>Jupyter</td> +
-    <td>Notebook, benötigt anaconda</td> +
-    <td>OK</td> +
-  </tr> +
-  <tr> +
-    <td>MongoDB | </td> +
-    <td>benötigt Plattenplatz, nicht alle Knoten</td> +
-    <td>Beta testing</td> +
-  </tr>   +
-  <tr> +
-    <td>Kafka</td> +
-    <td>Verarbeitung von Datenströmen</td> +
-    <td>Beta testing</td> +
-  </tr> +
-  <tr> +
-    <td>Cassandra</td> +
-    <td>benötigt Plattenplatz, nicht alle Knoten</td> +
-    <td>TODO</td> +
-  </tr> +
-  <tr> +
-    <td>Storm</td> +
-    <td>Eher Spark Streaming?</td> +
-    <td>auf weitere Anfrage</td> +
-  </tr> +
-  <tr> +
-    <td>Drill</td> +
-    <td></td> +
-    <td>-</td> +
-  </tr> +
-  <tr> +
-    <td>Flume</td> +
-    <td></td> +
-    <td>-</td> +
-  </tr> +
-    <tr> +
-    <td>Kudu</td> +
-    <td></td> +
-    <td>-</td> +
-  </tr> +
-  <tr> +
-    <td>Zeppelin</td> +
-    <td></td> +
-    <td>-</td> +
-  </tr> +
-    <tr> +
-    <td>Giraph</td> +
-    <td></td> +
-    <td>TODO</td> +
-  </tr> +
-  </table>+
  
-</html>  +To use Big Data on VSC, load the following modules:
-  +
-  +
  
-===== Jupyter Notebook =====+  * python (note: the current spark version does NOT work with python 3.11) 
 +    * python/3.10.7-gcc-12.2.0-5a2kkeu 
 +    * python/3.9.13-gcc-12.2.0-ctxezzj 
 +    * python/3.8.12-gcc-12.2.0-tr7w5qy 
 +  * openjdk/11.0.15_10-gcc-11.3.0-ih4nksq 
 +  * hadoop (hadoop/3.3.2-gcc-11.3.0-ypaxzwt) 
 +  * spark (spark/3.1.1-gcc-11.3.0-47oqksq) 
 +  * r
  
  
-To use a Jupyter notebook, connect to https://lbd.zserv.tuwien.ac.at:8000, and login with your user's credentials.+Depending on the application, choose the right combination: 
 +  * MapReduce => openjdk + hadoop 
 +  * Scala => openjdk + hadoop + spark 
 +  * Java => openjdk + hadoop + spark 
 +  * PySpark => openjdk + hadoop + spark + python 
 +  * R => openjdk + hadoop + spark + r
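The combinations above can also be written down as a small lookup table, for example to generate the ''module load'' lines of a job script. This is just a sketch: the names ''MODULES'' and ''module_lines'' are hypothetical helpers and not part of the VSC software, and the Python version is the one used in the example script on this page.

```python
# Modules per application type, as listed above.
# For PySpark a concrete Python version is pinned, because the current
# Spark version does not work with Python 3.11 (see the note above).
MODULES = {
    "mapreduce": ["openjdk", "hadoop"],
    "scala": ["openjdk", "hadoop", "spark"],
    "java": ["openjdk", "hadoop", "spark"],
    "pyspark": ["openjdk", "hadoop", "spark", "python/3.10.7-gcc-12.2.0-5a2kkeu"],
    "r": ["openjdk", "hadoop", "spark", "r"],
}


def module_lines(application):
    """Return the shell lines that load the modules for one application type."""
    modules = MODULES[application.lower()]
    return ["module purge"] + [f"module load {m}" for m in modules]


if __name__ == "__main__":
    print("\n".join(module_lines("PySpark")))
```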
  
-Start a new notebook, e.g. Python3, PySpark3, a terminal, ...+==== Background ====
  
-A short example: new -> PySpark3 +What is VSC actually doing to run a Big Data job using Hadoop and/or Spark? 
-<code> + 
-import pyspark +== HPC == 
-import random + 
-sc pyspark.SparkContext(appName="Pi"+User jobs on VSC are submitted using Slurm, thus making sure that a job gets a well defined execution environment which is similar in terms of resources and performance for each similar job execution. 
-num_samples 10000 + 
-def inside(p):      +== Big Data == 
-  x, y = random.random(), random.random() + 
-  return x*x + y*y < 1 +Big Data jobs on the other hand are often executed on Big Data clusters, which make sure that  
-count = sc.parallelize(range(0, num_samples)).filter(inside).count() +  * programs are started on the nodes where data is stored, thus reducing communication overhead, 
-pi count num_samples +  * load balancing is done automatically, 
-print(pi) +  * fault tolerance is added by the framework, 
-sc.stop() +  * very good scaling behaviour is ensured without any changes in user programs. 
-</code>+Much of this work is done by Yarn (Yet Another Resource Negotiator), a scheduler which usually plays a similar role on Big Data clusters as Slurm does on VSC. 
 + 
 +== Combination of HPC and Big Data == 
 + 
 +In our setup on VSC we combine the two worlds of HPC and Big Data by starting Yarn, HDFS and Spark in user context within a job (on the nodes belonging to the job). 
 +User data can be accessed 
 +  * from the standard VSC paths using Spectrum Scale (formerly GPFS), or 
 +  * from HDFS, which is started locally on the local SSDs of the compute nodes. In this case the data has to be copied in and/or out at the beginning/end of the job. After a job run, the local HDFS instance is destroyed and remaining data is lost. 
 + 
 +VSC has several scripts - most are optional - which are added to the path using ''module load hadoop'' and ''module load spark'': 
 +  * prolog_create_key.sh and epilog_discard_key.sh, which are required to start services on other nodes, but also to access the local (master) node. 
 +  * vsc_start_hadoop.sh, which starts Yarn (scheduler) and HDFS (distributed file system) on the nodes of the job. (If only one node is used this is optional, since Spark can also run with its own scheduler and without HDFS.) 
 +  * vsc_start_spark.sh, which starts the Spark service on the nodes of the job. (This is optional, since MapReduce jobs run without Spark.) 
 +  * vsc_stop_hadoop.sh, which is usually not required since the services are stopped at the end by the system anyway. 
 +  * vsc_stop_spark.sh, which is usually not required since the services are stopped at the end by the system anyway. 
 + 
 +==== References ==== 
 + 
 + 
 +  * [[https://vsc.ac.at/training/2021/BigData-Mar/|Big Data on VSC]]: VSC training course, including course material 
 +  * [[http://typo3.vsc.ac.at/fileadmin/user_upload/vsc/conferences/ahpc20/slides/244-Roda-AHPC20.pdf|Apache Spark is here to stay]]: Talk at Austrian HPC Meeting 2020 
 +  * {{ :projects:hadoop:332-tutorial-hadoop-ahpc18.pdf |Hadoop for HPC Users: Overview and first steps}}: Tutorial at Austrian HPC Meeting 2018 by Elmar Kiesling 
 + 
 +==== Hadoop at the TU Wien ====
  
-==== Congress slides ==== +At the Technische Universität Wien there is a [[doku:lbd|Big Data cluster]], mainly for teaching purposes, which is also used by researchers.
-  * {{ :projects:hadoop:332-tutorial-hadoop-ahpc18.pdf |Hadoop for HPC Users: Overview and first steps}} (Tutorial at Austrian HPC Meeting 2018 by Elmar Kiesling)+
    
  • doku/bigdata.txt
  • Last modified: 2023/05/11 08:52
  • by katrin