Dask-Yarn

Dask-Yarn deploys Dask on YARN clusters, such as those found in traditional Hadoop installations. It provides an easy interface to quickly start, scale, and stop Dask clusters natively from Python.

from dask_yarn import YarnCluster
from dask.distributed import Client

# Create a cluster where each worker has two cores and eight GB of memory
cluster = YarnCluster(environment='environment.tar.gz',
                      worker_vcores=2,
                      worker_memory="8GB")
# Scale out to ten such workers
cluster.scale(10)

# Connect to the cluster
client = Client(cluster)
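The worker_memory value above is a human-readable string. Dask parses such strings internally (dask.utils.parse_bytes); the helper below is only a simplified, stdlib-only sketch of that idea, showing how a string like "8GB" or "4GiB" maps to a byte count. It is not the library's actual implementation and covers only the common suffixes.

```python
import re

# Illustrative sketch (not dask's implementation) of parsing
# human-readable memory strings into bytes. Decimal units (GB)
# use powers of 10; binary units (GiB) use powers of 2.
_UNITS = {
    "B": 1,
    "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
    "KIB": 2**10, "MIB": 2**20, "GIB": 2**30, "TIB": 2**40,
}

def parse_memory(text):
    match = re.fullmatch(r"\s*([\d.]+)\s*([A-Za-z]+)\s*", text)
    if not match:
        raise ValueError(f"Could not parse memory string: {text!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit.upper()])

print(parse_memory("8GB"))   # 8000000000
print(parse_memory("4GiB"))  # 4294967296
```

Note that "8GB" and "8GiB" differ by about 7%; the configuration example later in this document uses the binary (GiB) form.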

Dask-Yarn uses Skein, a Pythonic library for managing YARN services.

Install

Install with Conda:

conda install -c conda-forge dask-yarn

Install with Pip:

pip install dask-yarn

Install from Source:

Dask-Yarn is available on GitHub and can always be installed from source.

pip install git+https://github.com/dask/dask-yarn.git

Distributing Python Environments

The libraries used on the YARN cluster must match those you are using locally. Dask-Yarn accomplishes this by packaging up a conda environment with conda-pack and shipping it to the YARN cluster.

These environments can contain any Python packages you might need, but must include dask-yarn (and its dependencies) at a minimum.

$ conda create -n my-env dask-yarn scikit-learn          # Create an environment

$ conda activate my-env                                  # Activate the environment

$ conda-pack                                             # Package environment
Collecting packages...
Packing environment at '/home/username/miniconda/envs/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed |  12.2s
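Before shipping an environment, it can be useful to sanity-check that the archive actually contains dask-yarn. The helper below is an illustrative sketch, not part of dask-yarn; it assumes the typical conda-pack layout for Linux environments, where packages live under a site-packages directory inside the archive.

```python
import tarfile

# Illustrative helper (not part of dask-yarn): check whether a
# conda-pack archive appears to include a given Python package.
# Assumes the usual layout lib/python*/site-packages/<package>/.
def archive_contains_package(archive_path, package="dask_yarn"):
    with tarfile.open(archive_path, "r:gz") as tar:
        return any(
            f"site-packages/{package}" in name for name in tar.getnames()
        )
```

For example, archive_contains_package("my-env.tar.gz") should return True for the environment built above.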

You can now start a cluster with that environment by passing the path to the constructor, e.g. YarnCluster(environment='my-env.tar.gz', ...). You may want to verify that your versions match with the following:

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='my-env.tar.gz')
client = Client(cluster)
client.get_versions(check=True)  # check that versions match between all nodes

Configuration

Specifying all parameters to the YarnCluster constructor every time can be error-prone, especially when sharing this workflow with new users. Alternatively, you can provide defaults in a configuration file, traditionally held in ~/.config/dask/yarn.yaml or /etc/dask/yarn.yaml. Here is an example:

# /home/username/.config/dask/yarn.yaml
yarn:
  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to

  environment: /path/to/my-env.tar.gz

  tags: []                   # List of strings to tag applications

  scheduler:                 # Specifications of scheduler container
    vcores: 1
    memory: 4GiB

  worker:                    # Specifications of worker containers
    vcores: 2
    memory: 8GiB
    count: 0                 # Number of workers to start on initialization
    restarts: -1             # Allowed number of restarts, -1 for unlimited

Users can now create YarnClusters without specifying any additional information.

from dask_yarn import YarnCluster

cluster = YarnCluster()
cluster.scale(20)
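Keyword arguments passed to the constructor still take precedence over file-based defaults, so YarnCluster(worker_vcores=4) would override the worker.vcores value from yarn.yaml. Conceptually the precedence behaves like the recursive dictionary merge sketched below; this is a simplified illustration, not dask's actual configuration code.

```python
# Illustrative sketch (not dask's implementation) of how explicit
# overrides win over file-based defaults, merging nested sections.
file_defaults = {
    "queue": "default",
    "worker": {"vcores": 2, "memory": "8GiB"},
}

def merge(defaults, overrides):
    """Recursively merge overrides onto defaults; overrides win."""
    result = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge(result[key], value)
        else:
            result[key] = value
    return result

config = merge(file_defaults, {"worker": {"vcores": 4}})
print(config["worker"])  # {'vcores': 4, 'memory': '8GiB'}
```

Untouched keys (queue, worker.memory) keep their file-based values, while the explicit worker.vcores override takes effect.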

For more information on Dask configuration see the Dask configuration documentation.