====== Dask on JupyterHub ======

===== Profile & Environment =====

The VSC JupyterHub already provides a conda environment, so users can start using dask right away without installing anything.

{{:doku:jupyterhub:conda_options.png}}

Since we run updates at regular intervals, the exact name of the environment may vary.

At the time of writing the most current name was ''jupyterhub-vsc-python-v1''.
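
If you want to verify which environment your notebook kernel is actually using, a quick check (a minimal sketch; the exact paths are site-specific) is to print the interpreter location and the dask version from a notebook cell:

<code python>
# quick sanity check of the active kernel environment
# (the interpreter path shown depends on the installation)
import sys
import dask

print('python interpreter:', sys.executable)
print('dask version:', dask.__version__)
</code>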

===== Python Example =====

To properly use dask from the jupyterhub conda environment, use the following code to get started:
  
<code python>
##########################################################################
# if you are running on a shared node it is important to set up some dask
# variables so you do not interfere with other users' dask setups
# (e.g. dashboard port ...)

import os
import dask
import dask.config
import dask.distributed

# derive a per-user number to avoid port collisions with other users
USER = os.environ.get('USER')
if USER.startswith('trainee'):
    user_number = int(USER.replace('trainee', ''))
else:
    user_number = hash(USER) % 100
print('user_number:', user_number)

DASHBOARD_PORT = 45000 + user_number
SCHEDULER_PORT = 46000 + user_number
###########################################################################

print('dashboard port:', DASHBOARD_PORT)
print('scheduler port:', SCHEDULER_PORT)

dask.config.set({'temporary_directory': f'/tmp/dask-{USER}'})
print('temporary directory:', dask.config.get('temporary_directory'))

dask.config.set({'distributed.dashboard.link': f'/user/{USER}/proxy/{DASHBOARD_PORT}/status'})
print('dashboard link:', dask.config.get('distributed.dashboard.link'))
</code>
  
<box 80% round blue|Info> Please note that the following configuration is only an example and has to be fine-tuned to the actual use case.</box>

For a complete and up-to-date list of configuration parameters, have a look at the dask documentation: [[https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html]]

<code python>
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(queue='skylake_0096',   # the slurm partition we want to use for dask-worker processes
                       #account='p70824',      # the account (=VSC project) used for accounting
                       cores=16,               # number of physical processor cores per SLURM job
                       memory='4GB',           # memory one SLURM job should have available (will be divided by the number of worker processes in the job)
                       processes=2,            # number of dask-worker processes (=python processes) per SLURM job
                       name=f'{USER}-worker',  # custom name for the dask-workers (used internally by dask)
                       walltime='00:30:00',    # time the SLURM job will run, e.g. 30 minutes; after that all dask-workers in the job will shut down
                       interface='ib0',        # ib0 is the name of the fast network connection on VSC-4 & VSC-5
                       scheduler_options={
                           'interface': 'ib0',                          # the scheduler should also use ib0
                           'port': SCHEDULER_PORT,                      # custom scheduler port (dask-worker processes will use this to connect)
                           'dashboard_address': f':{DASHBOARD_PORT}',   # custom dashboard port from earlier
                       },
                       # if you use adaptive scaling you might want to set these as well
                       # see https://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
                       #worker_extra_args=[
                       #    '--lifetime', '25m',
                       #    '--lifetime-stagger', '5m'
                       #],
                       job_directives_skip=[
                           '-J dask-worker',              # skip the default SLURM job name
                       ],
                       job_extra_directives=[
                           f'--job-name={USER}-worker',   # set a custom SLURM job name
                           '--qos=skylake_0096',          # set e.g. a custom qos that provides you with faster scheduling
                           #'--reservation=training',     # an optional reservation to use
                       ])

# the settings from above are turned into a slurm job script that can be
# displayed for debugging by using the 'job_script' method
print(cluster.job_script())
</code>
  
After running the above code you have a ''cluster'' instance that holds the dask scheduler and all the information needed to spawn ''dask-worker'' processes that will actually execute the generated tasks. So far no worker processes have been spawned yet. You can always check this by running:

<code>
$ squeue -u $USER
</code>

The next step is to scale up the cluster to get some worker processes running. Dask offers two options:
  * ''scale'': scale the cluster to a fixed number of jobs (or workers), depending on the arguments
  * ''adapt'': set a minimum and maximum number of jobs and let the cluster handle scaling according to the current load

Since the workers are scheduled via SLURM, how long it takes until they are scheduled and running depends heavily on your resource settings from earlier.

<code python>
# use 'scale' to schedule a fixed number of slurm jobs that host dask-worker processes
cluster.scale(jobs=1)

# OR use 'adapt' for adaptive scaling
#cluster.adapt(minimum_jobs=1, maximum_jobs=10)

# call the 'get_logs' function to see if everything is ok
cluster.get_logs()
</code>

After executing the above code using ''scale'' you should see jobs appearing in SLURM that are called ''<username>-worker'':

<code>
$ squeue -u $USER
             JOBID            PARTITION     NAME     USER ST       TIME  NODES     NODELIST(REASON)
            475376         skylake_0096 user-wor     user  R       0:22      1            n4902-030
</code>
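
You can also check from within the notebook how many workers have already connected, for example via the cluster's scheduler info (a small sketch; the exact attribute layout may differ between dask versions):

<code python>
# number of dask-worker processes currently connected to the scheduler
# (the structure of scheduler_info may vary between dask versions)
print('connected workers:', len(cluster.scheduler_info.get('workers', {})))
</code>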

The next step is important: here we create a dask distributed client that uses the cluster instance.

From this point on, all scheduled tasks will be sent to the cluster for execution.

<code python>
from dask.distributed import Client

client = Client(cluster)
</code>
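
Optionally you can block until a given number of workers has connected and print the dashboard address configured earlier. This is a small sketch; the number of workers to wait for is just an example:

<code python>
# optionally wait until at least one dask-worker has connected
# (the number of workers here is just an example value)
client.wait_for_workers(n_workers=1)

# the dashboard is reachable through the jupyterhub proxy link configured earlier
print('dashboard:', client.dashboard_link)
</code>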
  
After the SLURM jobs with the worker processes are running, we can finally schedule some work via dask:

<code python>
import dask.array as da

x = da.random.random((1000, 1000, 10), chunks=(1000, 1000, 5))
y = da.random.random((1000, 1000, 10), chunks=(1000, 1000, 5))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1,))

z.compute()
</code>
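
If you want to watch the computation while it runs, one option (a small sketch, assuming the ''client'' from above is still active) is to persist the collection and use dask's ''progress'' helper, which displays a progress bar in the notebook:

<code python>
from dask.distributed import progress

# persist() starts the computation asynchronously on the workers
z_persisted = z.persist()

# progress() displays a progress bar for the running tasks
progress(z_persisted)
</code>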
  
To clean up resources, close the client & cluster and delete the references. By doing this, everything related to those instances will be freed as well.

<code python>
client.close()
del client
cluster.close()
del cluster
</code>
  
Example of the dask UI in JupyterHub:

{{:doku:jupyterhub:dask_example.png?800|}}
  