Managing Python Environments

We need to ensure that the libraries used on the YARN cluster are the same as those you are using locally. There are a few ways to specify the environment:

  • The path to an archived environment (either conda or virtual environments)
  • The path to a Conda environment (as conda:///...)
  • The path to a virtual environment (as venv:///...)
  • The path to a python executable (as python:///...)

Note that when not using an archive, the provided path must be valid on all nodes in the cluster.

Using Archived Python Environments

The most common way to use dask-yarn is to distribute an archived Python environment throughout the YARN cluster as part of the application. Packaging the environment for distribution is typically handled using conda-pack (for conda environments) or venv-pack (for virtual environments), as described below.

These environments can contain any Python packages you might need, but must include dask-yarn (and its dependencies) at a minimum.

Archiving Conda Environments Using Conda-Pack

You can package a conda environment using conda-pack.

$ conda create -n my-env dask-yarn scikit-learn          # Create an environment

$ conda activate my-env                                  # Activate the environment

$ conda-pack                                             # Package environment
Collecting packages...
Packing environment at '/home/username/miniconda/envs/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed |  12.2s
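
If you prefer not to activate the environment first, conda-pack can also package a named environment directly and write to an explicit output path (reusing the my-env name from above):

$ conda-pack -n my-env -o my-env.tar.gz     # Package the environment by name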

Archiving Virtual Environments Using Venv-Pack

You can package a virtual environment using venv-pack. The virtual environment can be created using either venv or virtualenv. Note that the Python interpreter linked to by the virtual environment must exist and be accessible on every node in the YARN cluster. If the environment was created with a different Python, you can change the link path using the --python-prefix flag (an example follows the session below). For more information, see the venv-pack documentation.

$ python -m venv my-env                     # Create an environment using venv
$ python -m virtualenv my-env               # Or create an environment using virtualenv

$ source my-env/bin/activate                # Activate the environment

$ pip install dask-yarn scikit-learn        # Install some packages

$ venv-pack                                 # Package environment
Collecting packages...
Packing environment at '/home/username/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed |  8.3s
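
If the target nodes provide Python under a different prefix than the machine where the environment was built, the link can be repointed at packaging time. A sketch using the flags above (the /usr prefix is illustrative; substitute whatever prefix is valid on your cluster):

$ venv-pack -p my-env -o my-env.tar.gz --python-prefix /usr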

Specifying the Archived Environment

You can now start a cluster with the packaged environment by passing the path to the constructor, e.g. YarnCluster(environment='my-env.tar.gz', ...).

Note that if the environment is a local file, the archive will be automatically uploaded to a temporary directory on HDFS before starting the application. If you find yourself reusing the same environment multiple times, you may want to upload the environment to HDFS once beforehand to avoid repeating this process for each cluster (the environment is then specified as hdfs:///path/to/my-env.tar.gz).
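
For example, you might upload once with the standard HDFS CLI (the /user/username/envs directory is illustrative; any location readable by your cluster user works):

$ hdfs dfs -mkdir -p /user/username/envs            # Create a directory for shared environments
$ hdfs dfs -put my-env.tar.gz /user/username/envs/  # Upload the archive once

and then reference the uploaded archive in Python:

from dask_yarn import YarnCluster

# No upload happens at startup; the archive is reused from HDFS
cluster = YarnCluster(environment='hdfs:///user/username/envs/my-env.tar.gz')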

After startup you may want to verify that your versions match with the following:

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='my-env.tar.gz')
client = Client(cluster)
client.get_versions(check=True)  # check that versions match between all nodes

Using Python Environments Local to Each Node

Alternatively, you can specify the path to a conda environment, virtual environment, or Python executable that is already found on each node:

from dask_yarn import YarnCluster

# Use a conda environment at /path/to/my/conda/env
cluster = YarnCluster(environment='conda:///path/to/my/conda/env')

# Use a virtual environment at /path/to/my/virtual/env
cluster = YarnCluster(environment='venv:///path/to/my/virtual/env')

# Use a Python executable at /path/to/my/python
cluster = YarnCluster(environment='python:///path/to/my/python')

As before, these environments can contain any Python packages, but must include dask-yarn (and its dependencies) at a minimum. It's also very important that these environments are uniform across all nodes; mismatched environments can lead to hard-to-diagnose issues. To check this, you can use the Client.get_versions method:

from dask.distributed import Client

client = Client(cluster)
client.get_versions(check=True)  # check that versions match between all nodes