API Docs

class dask_yarn.YarnCluster(environment=None, n_workers=None, worker_vcores=None, worker_memory=None, worker_restarts=None, worker_env=None, worker_class=None, worker_options=None, worker_gpus=None, scheduler_vcores=None, scheduler_gpus=None, scheduler_memory=None, deploy_mode=None, name=None, queue=None, tags=None, user=None, host=None, port=None, dashboard_address=None, skein_client=None, asynchronous=False, loop=None)

Start a Dask cluster on YARN.
You can define default values for these parameters in Dask's yarn.yaml configuration file. See http://docs.dask.org/en/latest/configuration.html for more information.

Parameters:
- environment : str, optional
The Python environment to use. Can be one of the following:
  - A path to an archived Python environment
  - A path to a conda environment, specified as conda:///…
  - A path to a virtual environment, specified as venv:///…
  - A path to a python executable, specified as python:///…
Note that if not an archive, the paths specified must be valid on all nodes in the cluster.
- n_workers : int, optional
The number of workers to initially start.
- worker_vcores : int, optional
The number of virtual cores to allocate per worker.
- worker_memory : str, optional
The amount of memory to allocate per worker. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.
- worker_restarts : int, optional
The maximum number of worker restarts to allow before failing the application. Default is unlimited.
- worker_env : dict, optional
A mapping of environment variables to their values. These will be set in the worker containers before starting the dask workers.
- worker_gpus : int, optional
The number of GPUs to allocate per worker.
- scheduler_vcores : int, optional
The number of virtual cores to allocate per scheduler.
- scheduler_gpus : int, optional
The number of GPUs to allocate per scheduler.
- scheduler_memory : str, optional
The amount of memory to allocate to the scheduler. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.
- deploy_mode : {'remote', 'local'}, optional
The deploy mode to use. If 'remote', the scheduler will be deployed in a YARN container. If 'local', the scheduler will run locally, which can be nice for debugging. Default is 'remote'.
- name : str, optional
The application name.
- queue : str, optional
The queue to deploy to.
- tags : sequence, optional
A set of strings to use as tags for this application.
- user : str, optional
The user to submit the application on behalf of. Default is the current user - submitting as a different user requires user permissions, see the YARN documentation for more information.
- host : str, optional
Host address on which the scheduler will listen. Only used if deploy_mode='local'. Defaults to '0.0.0.0'.
- port : int, optional
The port on which the scheduler will listen. Only used if deploy_mode='local'. Defaults to 0 for a random port.
- dashboard_address : str, optional
Address on which the dashboard server will listen. Only used if deploy_mode='local'. Defaults to ':0' for a random port.
- skein_client : skein.Client, optional
The skein.Client to use. If not provided, one will be started.
- asynchronous : bool, optional
If true, starts the cluster in asynchronous mode, where it can be used in other async code.
- loop : IOLoop, optional
The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
Examples
>>> cluster = YarnCluster(environment='my-env.tar.gz', ...)
>>> cluster.scale(10)
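The worker_memory and scheduler_memory strings accept a unit suffix and are rounded up to whole MiB. The following is only an illustrative sketch of that kind of parsing (the helper to_mib is hypothetical, not dask-yarn's actual parser):

```python
import math

# Hypothetical helper illustrating how a memory string such as '2 GiB'
# or '4096 MiB' could be normalized to whole MiB, rounding up as the
# documentation describes. Not dask-yarn's real implementation.
UNITS = {"KiB": 1 / 1024, "MiB": 1, "GiB": 1024}

def to_mib(memory: str) -> int:
    """Parse a string like '2 GiB' into an integer number of MiB."""
    amount, unit = memory.split()
    return math.ceil(float(amount) * UNITS[unit])

print(to_mib("2 GiB"))     # 2048
print(to_mib("1500 KiB"))  # 2 (rounded up)
```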
adapt(minimum=0, maximum=inf, interval='1s', wait_count=3, target_duration='5s', **kwargs)

Turn on adaptivity.
This scales Dask clusters automatically based on scheduler activity.
Parameters:
- minimum : int, optional
Minimum number of workers. Defaults to 0.
- maximum : int, optional
Maximum number of workers. Defaults to inf.
- interval : timedelta or str, optional
Time between worker add/remove recommendations.
- wait_count : int, optional
Number of consecutive times that a worker should be suggested for removal before we remove it.
- target_duration : timedelta or str, optional
Amount of time we want a computation to take. This affects how aggressively we scale up.
- **kwargs :
Additional parameters to pass to distributed.Scheduler.workers_to_close.
Examples
>>> cluster.adapt(minimum=0, maximum=10)
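The wait_count semantics can be pictured with a toy simulation (this is only an illustration of the documented behavior, not distributed's actual adaptive implementation):

```python
# Toy illustration: a worker is retired only after being suggested for
# removal on `wait_count` consecutive scheduler checks. Any check where
# it is NOT suggested resets the counter.
def should_retire(suggestions, wait_count=3):
    """suggestions: one bool per check, True if removal was suggested."""
    consecutive = 0
    for suggested in suggestions:
        consecutive = consecutive + 1 if suggested else 0
        if consecutive >= wait_count:
            return True
    return False

print(should_retire([True, True, False, True, True]))  # False (reset at check 3)
print(should_retire([False, True, True, True]))        # True
```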
dashboard_link

Link to the Dask dashboard. None if the dashboard isn't running.
classmethod from_application_id(app_id, skein_client=None, asynchronous=False, loop=None)

Connect to an existing YarnCluster with a given application id.

Parameters:
- app_id : str
The existing cluster’s application id.
- skein_client : skein.Client, optional
The skein.Client to use. If not provided, one will be started.
- asynchronous : bool, optional
If true, starts the cluster in asynchronous mode, where it can be used in other async code.
- loop : IOLoop, optional
The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
Returns:
- YarnCluster
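A sketch of reattaching to a running application by id (the application id below is a placeholder, and the actual call needs a live YARN cluster, so it is left commented out):

```python
# Placeholder id in YARN's application_<timestamp>_<sequence> format.
app_id = "application_1516231388157_0001"

def reattach(app_id):
    # Requires dask-yarn installed and a running YARN application.
    from dask_yarn import YarnCluster
    return YarnCluster.from_application_id(app_id)

# cluster = reattach(app_id)  # not executed here: needs a live cluster
print(app_id.startswith("application_"))  # True
```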
classmethod from_current(asynchronous=False, loop=None)

Connect to an existing YarnCluster from inside the cluster.

Parameters:
- asynchronous : bool, optional
If true, starts the cluster in asynchronous mode, where it can be used in other async code.
- loop : IOLoop, optional
The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
Returns:
- YarnCluster
classmethod from_specification(spec, skein_client=None, asynchronous=False, loop=None)

Start a Dask cluster from a skein specification.

Parameters:
- spec : skein.ApplicationSpec, dict, or filename
The application specification to use. Must define at least one service: 'dask.worker'. If no 'dask.scheduler' service is defined, a scheduler will be started locally.
- skein_client : skein.Client, optional
The skein.Client to use. If not provided, one will be started.
- asynchronous : bool, optional
If true, starts the cluster in asynchronous mode, where it can be used in other async code.
- loop : IOLoop, optional
The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
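A plausible shape for a dict-form specification is sketched below. The nesting follows skein's ApplicationSpec (services, instances, resources, script); treat the resource values and the worker script as placeholders for your own deployment:

```python
# Illustrative dict specification for from_specification(). Defines the
# required 'dask.worker' service and omits 'dask.scheduler', so per the
# docs the scheduler would be started locally. Values are placeholders.
spec = {
    "name": "dask",
    "services": {
        "dask.worker": {
            "instances": 4,
            "resources": {"vcores": 2, "memory": "4 GiB"},
            "script": "dask-yarn services worker",
        },
    },
}

# cluster = YarnCluster.from_specification(spec)  # needs a YARN cluster
print("dask.worker" in spec["services"])  # True
```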
logs(scheduler=True, workers=True)

Return logs for the scheduler and/or workers.

Parameters:
- scheduler : boolean, optional
Whether or not to collect logs for the scheduler
- workers : boolean or iterable, optional
A list of worker addresses to select. Defaults to all workers if True, or no workers if False.

Returns:
- logs : dict
A dictionary of name -> logs.
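Since the return value is a plain dict of container name to log text, it can be filtered with ordinary dict operations. A sketch, using sample data in place of a real cluster call (the container names and log lines are made up):

```python
# Stand-in for `cluster.logs()` output: container name -> log text.
logs = {
    "dask.scheduler": "distributed.scheduler - INFO - Scheduler started",
    "dask.worker_0": "distributed.worker - WARNING - Memory use is high",
    "dask.worker_1": "distributed.worker - INFO - Start worker",
}

# Keep only containers whose logs contain a warning.
warnings = {name: text for name, text in logs.items() if "WARNING" in text}
print(sorted(warnings))  # ['dask.worker_0']
```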
scale(n)

Scale cluster to n workers.

Parameters:
- n : int
Target number of workers.
Examples
>>> cluster.scale(10) # scale cluster to ten workers
shutdown(status='SUCCEEDED', diagnostics=None)

Shutdown the application.

Parameters:
- status : {'SUCCEEDED', 'FAILED', 'KILLED'}, optional
The YARN application exit status.
- diagnostics : str, optional
The application exit message, usually used for diagnosing failures. Can be seen in the YARN Web UI for completed applications under “diagnostics”. If not provided, a default will be used.
workers()

A list of all currently running worker containers.