Configuration

Specifying all parameters to the YarnCluster constructor every time can be error prone, especially when sharing this workflow with new users. Alternatively, you can provide defaults in a configuration file, traditionally held in ~/.config/dask/yarn.yaml or /etc/dask/yarn.yaml. Note that this configuration is optional and only supplies defaults for parameters not specified in the constructor. You only need to set the fields you care about; unset fields fall back to the default configuration.

Example:

# ~/.config/dask/yarn.yaml
yarn:
  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to

  environment: /path/to/my-env.tar.gz

  scheduler:                 # Specifications of scheduler container
    vcores: 1
    memory: 4GiB

  worker:                   # Specifications of worker containers
    vcores: 2
    memory: 8GiB

Users can now create YarnClusters without specifying any additional information.

from dask_yarn import YarnCluster

cluster = YarnCluster()
cluster.scale(20)

For more information on Dask configuration see the Dask configuration documentation.

Providing a Custom Skein Specification

Sometimes you’ll need more control over the deployment than the configuration fields above provide. In this case you can provide the path to a custom Skein specification in the yarn.specification field. If this field is present in the configuration, it will be used as long as no parameters are passed to the YarnCluster constructor. Note that this is equivalent to calling YarnCluster.from_specification() programmatically.

# /home/username/.config/dask/yarn.yaml
yarn:
  specification: /path/to/spec.yaml
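
This is roughly what the equivalent programmatic call looks like (a sketch; the path below is illustrative):

from dask_yarn import YarnCluster

# Load the cluster from a custom Skein specification directly,
# instead of relying on the yarn.specification configuration field
cluster = YarnCluster.from_specification('/path/to/spec.yaml')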

The specification requires at least one Service named dask.worker which describes how to start a single worker. If an additional service dask.scheduler is provided, this will be assumed to start the scheduler. If dask.scheduler isn’t present, a scheduler will be started locally instead.

In the script section for each service, the appropriate dask-yarn CLI command should be used:

  • dask-yarn services worker to start the worker
  • dask-yarn services scheduler to start the scheduler

Beyond that, you have full flexibility for how to define a specification. See the Skein documentation for more information. A few examples are provided below:

Example: deploy-mode local with node_label restrictions

This specification is similar to that created automatically when deploy_mode='local' is specified (scheduler runs locally, only worker service specified), except it adds node_label restrictions for the workers. Here we restrict workers to run only on nodes labeled as GPU.

# /path/to/spec.yaml
name: dask
queue: myqueue

services:
  dask.worker:
    # Restrict workers to GPU nodes only
    node_label: GPU
    # Don't start any workers initially
    instances: 0
    # Workers can restart an infinite number of times
    max_restarts: -1
    # Restrict workers to 4 GiB and 2 cores each
    resources:
      memory: 4 GiB
      vcores: 2
    # Distribute this python environment to every worker node
    files:
      environment: /path/to/my/environment.tar.gz
    # The bash script to start the worker
    # Here we activate the environment, then start the worker
    script: |
      source environment/bin/activate
      dask-yarn services worker
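
Assuming the file above is saved at /path/to/spec.yaml, using it might look like the following sketch (no workers start until you scale, since instances is 0):

from dask_yarn import YarnCluster

# Start a cluster from the custom specification above, then request
# four workers (none start initially because instances is 0)
cluster = YarnCluster.from_specification('/path/to/spec.yaml')
cluster.scale(4)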

Example: deploy-mode remote with custom setup

This specification is similar to that created automatically when deploy_mode='remote' is specified (both scheduler and worker run inside YARN containers), except it runs an initialization script before starting each service.

# /path/to/spec.yaml
name: dask
queue: myqueue

services:
  dask.scheduler:
    # Restrict scheduler to 2 GiB and 1 core
    resources:
      memory: 2 GiB
      vcores: 1
    # The bash script to start the scheduler.
    # Here we have dask-yarn already installed on the node,
    # and also run a custom script before starting the service
    script: |
      some-custom-initialization-script
      dask-yarn services scheduler

  dask.worker:
    # Don't start any workers initially
    instances: 0
    # Workers can restart an infinite number of times
    max_restarts: -1
    # Workers should only be started after the scheduler starts
    depends:
      - dask.scheduler
    # Restrict workers to 4 GiB and 2 cores each
    resources:
      memory: 4 GiB
      vcores: 2
    # The bash script to start the worker.
    # Here we have dask-yarn already installed on the node,
    # and also run a custom script before starting the service
    script: |
      some-custom-initialization-script
      dask-yarn services worker
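
As with the previous example, such a specification could then be used roughly as in this sketch, with a Client connected to the scheduler running inside its YARN container:

from dask.distributed import Client
from dask_yarn import YarnCluster

# Scheduler and workers both run in YARN containers here;
# the client connects to the remote scheduler as usual
cluster = YarnCluster.from_specification('/path/to/spec.yaml')
cluster.scale(2)
client = Client(cluster)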

Default Configuration

The default configuration file is as follows

yarn:
  specification: null        # A path to a skein specification yaml file.
                             # Overrides the following configuration if given.

  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to
  deploy-mode: remote        # The deploy mode to use (either remote or local)
  environment: null          # The Python environment to use
  tags: []                   # List of strings to tag applications
  user: ''                   # The user to submit the application on behalf of,
                             # leave as empty string for current user.

  host: "0.0.0.0"            # The scheduler host, when in deploy-mode=local
  port: 0                    # The scheduler port, when in deploy-mode=local
  dashboard-address: ":0"    # The dashboard address, when in deploy-mode=local

  scheduler:                 # Specifications of scheduler container
    vcores: 1
    memory: 2GiB
    gpus: 0                 # Number of GPUs requested

  worker:                   # Specifications of worker containers
    vcores: 1
    memory: 2GiB
    count: 0                # Number of workers to start on initialization
    restarts: -1            # Allowed number of restarts, -1 for unlimited
    env: {}                 # A map of environment variables to set on the worker
    gpus: 0                 # Number of GPUs requested
    worker_class: "dask.distributed.Nanny" # The kind of worker to launch
    worker_options: {}      # A map of options to pass to the worker

  client:                   # Specification of client container
    vcores: 1
    memory: 2GiB
    gpus: 0                 # Number of GPUs requested
    env: {}                 # A map of environment variables to set on the client
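
Any of these fields can also be overridden for a single session with dask.config.set, using the same dotted names; a minimal sketch:

import dask

# Override a couple of defaults for this process only; the keys
# mirror the structure of the configuration file above
dask.config.set({'yarn.queue': 'myqueue', 'yarn.worker.memory': '4GiB'})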