Dask-Yarn
Dask-Yarn deploys Dask on YARN clusters, such as those found in traditional Hadoop installations. Dask-Yarn provides an easy interface to quickly start, scale, and stop Dask clusters natively from Python.
from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster where each worker has two cores and eight GB of memory
cluster = YarnCluster(environment='environment.tar.gz',
                      worker_vcores=2,
                      worker_memory="8GB")
# Scale out to ten such workers
cluster.scale(10)
# Connect to the cluster
client = Client(cluster)
Dask-Yarn uses Skein, a Pythonic library for creating and managing YARN services.
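Once the client is connected, ordinary Dask calls run on the YARN-hosted workers. A minimal sketch of submitting a task, assuming the cluster from the snippet above; the `inc` helper is illustrative, and the cluster-dependent calls are wrapped in a function because they need a live YARN deployment:

```python
def inc(x):
    # Trivial helper used to demonstrate task submission
    return x + 1

def run_on_cluster():
    # Requires a live YARN deployment, so imports are kept local here
    from dask_yarn import YarnCluster
    from dask.distributed import Client

    cluster = YarnCluster(environment='environment.tar.gz')
    client = Client(cluster)

    # Submit a task to the cluster and block on its result
    future = client.submit(inc, 10)
    assert future.result() == 11

    # Release the YARN application when finished
    cluster.shutdown()
```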
Install
Install with Conda:
conda install -c conda-forge dask-yarn
Install with Pip:
pip install dask-yarn
Install from Source:
Dask-Yarn is available on github and can always be installed from source.
pip install git+https://github.com/dask/dask-yarn.git
Distributing Python Environments
The libraries used on the YARN cluster must match the ones you are using locally. Dask-Yarn accomplishes this by packaging a conda environment with conda-pack and shipping it to the YARN cluster.
These environments can contain any Python packages you might need, but require
dask-yarn (and its dependencies) at a minimum.
$ conda create -n my-env dask-yarn scikit-learn # Create an environment
$ conda activate my-env # Activate the environment
$ conda-pack # Package environment
Collecting packages...
Packing environment at '/home/username/miniconda/envs/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed | 12.2s
You can now start a cluster with that environment by passing the path to the
constructor, e.g. YarnCluster(environment='my-env.tar.gz', ...). You may
want to verify that your versions match with the following:
from dask_yarn import YarnCluster
from dask.distributed import Client
cluster = YarnCluster(environment='my-env.tar.gz')
client = Client(cluster)
client.get_versions(check=True) # check that versions match between all nodes
Configuration
Specifying all parameters to the YarnCluster constructor every time can be
error-prone, especially when sharing this workflow with new users.
Alternatively, you can provide defaults in a configuration file, traditionally
held in ~/.config/dask/yarn.yaml or /etc/dask/yarn.yaml. Here is an
example:
# /home/username/.config/dask/yarn.yaml
yarn:
  name: dask                          # Application name
  queue: default                      # Yarn queue to deploy to
  environment: /path/to/my-env.tar.gz
  tags: []                            # List of strings to tag applications

  scheduler:                          # Specifications of scheduler container
    vcores: 1
    memory: 4GiB

  worker:                             # Specifications of worker containers
    vcores: 2
    memory: 8GiB
    count: 0                          # Number of workers to start on initialization
    restarts: -1                      # Allowed number of restarts, -1 for unlimited
Users can now create a YarnCluster without specifying any additional information.
from dask_yarn import YarnCluster
cluster = YarnCluster()
cluster.scale(20)
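Configuration values can also be overridden per-session without editing the YAML file. This is a sketch of the standard Dask convention of reading environment variables prefixed with `DASK_`, where double underscores separate nested keys; the specific values below are illustrative:

```shell
# Override worker resources for this shell session only.
# DASK_YARN__WORKER__MEMORY maps to the nested yarn.worker.memory key above.
export DASK_YARN__WORKER__MEMORY="16GiB"
export DASK_YARN__WORKER__VCORES=4
```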
For more information on Dask configuration see the Dask configuration documentation.