Dask on JupyterHub
A prepared conda environment for use with Dask is available on JupyterHub (the exact name may vary).
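To check that the notebook kernel is actually running in that environment, you can inspect it from within a notebook cell. A minimal sketch (the version printout is only illustrative):

import sys
print(sys.executable)        # path of the Python interpreter, i.e. which conda environment the kernel uses

import dask
import dask_jobqueue         # both packages must be importable for the setup below
print(dask.__version__, dask_jobqueue.__version__)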
To properly use Dask from the JupyterHub conda environment, use the following settings:
import dask
import dask.distributed
import os

DASHBOARD_PORT = 41231   # a port of your choosing - should be free on the node running the notebook
user = os.environ.get('USER')
dask.config.set({'distributed.dashboard.link': f'/user/{user}/proxy/{DASHBOARD_PORT}/status'})
dask.config.get('distributed.dashboard.link')

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(queue='mem_0096',        # specify the **partition** to use (even though it says queue)
                       project='sysadmin',      # use your project e.g. p72310
                       cores=48,                # how many cores each worker job should have
                       memory='10GB',           # how much memory each worker job should have
                       processes=1,             # each worker starts multiple processes (using the resources) - depends on your problem
                       walltime='00:05:00',     # maximum runtime of a worker job
                       interface='ib0',         # interface for workers to communicate on
                       scheduler_options={'interface': 'eno1',                        # scheduler interface
                                          'dashboard_address': f':{DASHBOARD_PORT}'}, # set a different dashboard port to avoid collisions
                       job_extra=['--qos="admin"', '-N 1'],  # manually set a qos and optional additional node specs
                       header_skip=['-n 1'])    # omit this line from the generated job script (needed so we can add our own above)

print(cluster.job_script())

cluster.scale(jobs=1)   # jobs here means how many slurm worker jobs we will get
cluster.get_logs()      # check if the jobs have been allocated and the workers are connected - alternatively check via slurm: squeue -u <USERNAME>

from dask.distributed import Client
client = Client(cluster)   # create a client using the cluster

# now schedule some work
import dask.array as da
x = da.random.random((10_000, 10_000, 10), chunks=(1000, 1000, 5))
y = da.random.random((10_000, 10_000, 10), chunks=(1000, 1000, 5))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1,))
z.compute()

cluster.close()   # stop the cluster once you're done (also happens automatically when the notebook kernel exits)
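Besides dask.array computations, the connected client can also run plain Python functions on the workers through the futures interface. A minimal sketch, assuming the client from above is still connected (run it before cluster.close(); square is just a made-up example function):

from dask.distributed import progress

def square(n):                               # arbitrary example function
    return n * n

futures = client.map(square, range(100))     # schedule one task per input element on the workers
progress(futures)                            # display a progress bar in the notebook
results = client.gather(futures)             # collect the results once all tasks have finished
print(sum(results))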