Specification

Skein uses a declarative API for creating applications. An application may be specified as a YAML or JSON document, or via the Python API. Here we describe the pieces of an application specification in detail.

Specification Components

Top-Level Fields

At the top-level, a specification starts with an ApplicationSpec. This takes the following fields:

name

The name of the application. Optional, defaults to skein.

queue

The queue to submit the application to. Optional, defaults to default.

node_label

The node_label to request for all containers in this application. Services can override this value by setting node_label on the service directly. Default is no label.

max_attempts

The maximum number of submission attempts before marking the application as failed. Note that this only considers failures of the application master during startup. Optional, default is 1 (recommended).
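Putting the top-level fields together, the start of a specification might look like the following (the values here are illustrative):

```yaml
name: my-app          # defaults to "skein" if omitted
queue: default        # defaults to "default" if omitted
node_label: gpu       # hypothetical label name; default is no label
max_attempts: 1       # the recommended default
```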

tags

A list of strings to use as tags for this application. Optional.

Example

tags:
  - my-tag
  - my-other-tag

file_systems

A list of Hadoop file systems to acquire delegation tokens for. A token is always acquired for the default filesystem (fs.defaultFS in core-site.xml). In many cases the default is sufficient. Optional.

Example

file_systems:
  - hdfs://nn1.com:8020
  - hdfs://nn2.com:8020
  - webhdfs://nn3.com:50070

acls

Configures the application-level Access Control Lists (ACLs). Optional, defaults to no ACLs.

The following access types are supported:

  • VIEW : view application details
  • MODIFY : modify the application via YARN (e.g. killing the application)
  • UI : access the application Web UI

The VIEW and MODIFY access types are handled by YARN directly; permissions for these can be set by users and/or groups. Authorizing UI access is handled by Skein internally, and only user-level access control is supported.

The application owner (the user who submitted the application) will always have permission for all access types.

By default, ACLs are disabled - to enable, set enable: True. If enabled, access is restricted to the application owner by default - add users/groups to the access types you wish to open to other users. You can use the wildcard character "*" to grant access to all users.

Supported subfields are:

  • enable: whether to enable ACLs for this application. Default is False.
  • view_users: users to give VIEW access. Default is [].
  • view_groups: groups to give VIEW access. Default is [].
  • modify_users: users to give MODIFY access. Default is [].
  • modify_groups: groups to give MODIFY access. Default is [].
  • ui_users: users to give UI access. Default is [].

Example

acls:
  enable: True    # Enable ACLs. Without this ACLs will be ignored.

  ui_users:
    - "*"           # Give all users access to the Web UI

  view_users:
    - nancy         # Give nancy view access

  # The application owner always has access to all access types. Since
  # `modify_users`/`modify_groups` are unchanged, only the owner has modify
  # access.
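The UI access rule above (the owner always has access; any other user must be listed, with the wildcard "*" granting everyone) can be sketched in a few lines of Python. `ui_allowed` is a hypothetical helper for illustration, not part of Skein:

```python
def ui_allowed(user, owner, ui_users):
    """Sketch of the UI access check described above.

    The application owner always has access; any other user must appear
    in ui_users, or ui_users must contain the wildcard "*".
    """
    return user == owner or "*" in ui_users or user in ui_users

print(ui_allowed("alice", "alice", []))       # True: owner always allowed
print(ui_allowed("bob", "alice", ["*"]))      # True: wildcard grants everyone
print(ui_allowed("bob", "alice", ["nancy"]))  # False: not listed
```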

For more information on ACLs, see the YARN documentation.

master

Additional configuration tuning for the Application Master. Optional. Under common use the defaults should be sufficient.

Supported subfields are:

  • log_level: The application master log level. Possible values are (from most to least verbose): all, trace, debug, info, warn, error, fatal or off. Default is info. Note that this sets the skein.log.level system property, which is used in the default log4j.properties file - if you provide your own log4j.properties file this field may have no effect.
  • log_config: a path to a custom log4j.properties file. Could be local or on a remote filesystem. If not provided, a default logging configuration is used. See the Log4j documentation for more information.

Example

master:
  log_config: path/to/my/log4j.properties

services

A dict of service-name to Service. At least one service is required.

Example

services:
  my_service:
    ...

Service

The basic unit of an application is a Service. Services describe how to launch an executable, as well as how that executable should be managed over the course of the application. A service may have multiple instances, each running in its own YARN container. A service description takes the following fields:

resources

The memory and CPU requirements for a single instance of the service. Takes the following fields:

  • memory

    The amount of memory to request. Can be either a string with units (e.g. "5 GiB"), or numeric. If numeric, specifies the amount of memory in MiB. Note that the units are in mebibytes (MiB) not megabytes (MB) - the former being binary based (1024 MiB in a GiB), the latter being decimal based (1000 MB in a GB). See here for more information on this distinction.

    Requests smaller than the minimum allocation will receive the minimum allocation (1024 MiB by default). Requests larger than the maximum allocation will error on application submission.

  • vcores

    The number of virtual cores to request. Depending on your system configuration one virtual core may map to a single actual core, or a fraction of a core. Requests larger than the maximum allocation will error on application submission.

Example

services:
  my_service:
    resources:
      memory: 2 GiB
      vcores: 2
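Because memory accepts either a unit string or a plain number of MiB, the binary-unit conversion can be sketched as follows. `parse_memory` is a hypothetical helper for illustration, not Skein's parser:

```python
def parse_memory(value):
    """Convert a memory specification to MiB (illustrative sketch).

    Accepts an int (already in MiB) or a string like "5 GiB" with a
    space before the unit. Units are binary: 1 GiB = 1024 MiB,
    1 TiB = 1024 GiB.
    """
    if isinstance(value, int):
        return value
    amount, unit = value.split()
    factors = {"MiB": 1, "GiB": 1024, "TiB": 1024 ** 2}
    return int(float(amount) * factors[unit])

print(parse_memory("5 GiB"))   # 5120
print(parse_memory(2048))      # 2048
```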

commands

Shell commands used to start the service. Commands are run in the order provided, with subsequent commands run only if the prior commands succeeded. At least one command must be provided.

Example

services:
  my_service:
    commands:
      - echo "This is a single command"
      - |
        if [[ "$SOME_CONDITION" == "true" ]]; then
            echo "You can use multi-line strings "
            echo "to handle more complicated behavior"
        fi

instances

The number of instances to create on startup. Must be >= 0. After startup additional instances may be created by the ApplicationClient. Optional, default is 1.

Example

services:
  my_service:
    instances: 4  # Start 4 instances

files

Any files or archives needed to run the service. A mapping of destination relative paths to File or str objects describing the sources for these paths. Each File object takes the following fields:

  • source

    The path to the file/archive. If no scheme is specified, the path is assumed to be on the local filesystem (file:// scheme). Relative paths are supported, and are taken relative to the location of the specification file.

  • type

The type of file to distribute – either archive or file. Archives are automatically extracted by YARN into a directory with the same name as their destination (only .zip, .tar.gz, and .tgz archives are supported). Optional; by default the type is inferred from the file extension.

  • visibility

    The resource visibility. Describes how resources are shared between applications. Valid values are:

    • application – Shared among containers of the same application on the node.
    • private – Shared among all applications of the same user on the node.
    • public – Shared by all users on the node.

    Optional, default is application. In most cases the default is what you want.

  • size

    The resource size in bytes. Optional; if not provided will be determined by the file system. In most cases the default is what you want.

  • timestamp

    The time the resource was last modified. Optional; if not provided will be determined by the file system. In most cases the default is what you want.

As a shorthand, values may be the source path instead of a File object.

For more information see Distributing Files.

Example

services:
  my_service:
    files:
      # /local/path/to/file.zip will be uploaded to hdfs, and extracted
      # into the directory path_on_container
      path_on_container:
        source: /local/path/to/file.zip
        type: archive

      # Can also specify only the source path - missing fields are inferred
      script_path.py: /path/to/script.py

      # Files on remote filesystems can be used by specifying the scheme.
      script2_path.py: hdfs:///remote/path/to/script2.py
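The default type inference mentioned above can be sketched in Python: archives are recognized by their extension, and everything else is treated as a plain file. `infer_type` is illustrative only, not Skein's implementation:

```python
def infer_type(path):
    """Infer a File's type from its extension (sketch of the default
    inference: only .zip, .tar.gz, and .tgz count as archives)."""
    if path.endswith((".zip", ".tar.gz", ".tgz")):
        return "archive"
    return "file"

print(infer_type("env.zip"))    # archive
print(infer_type("script.py"))  # file
```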

env

A mapping of environment variables needed to run the service. Optional.

Example

services:
  my_service:
    env:
      ENV1: VAL1
      ENV2: VAL2

depends

A list of service names that this service depends on. The service will only be started after all its dependencies have been started. Optional.

Example

services:
  starts_first:
    ...
  starts_second:
    depends:
      - starts_first
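Conceptually, depends induces a start order: a service is started only after everything it depends on has started. That ordering can be sketched as a topological sort (an illustration of the constraint, not Skein's implementation; assumes no dependency cycles):

```python
def start_order(services):
    """Return a service start order respecting each service's `depends`
    list (illustrative sketch; assumes the dependency graph is acyclic)."""
    order, seen = [], set()

    def visit(name):
        if name in seen:
            return
        seen.add(name)
        for dep in services[name].get("depends", []):
            visit(dep)          # start dependencies first
        order.append(name)

    for name in sorted(services):
        visit(name)
    return order

services = {
    "starts_second": {"depends": ["starts_first"]},
    "starts_first": {},
}
print(start_order(services))  # ['starts_first', 'starts_second']
```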

max_restarts

The maximum number of restarts allowed for this service. Must be >= -1. On failure, a container will be restarted if the total number of restarts for its service is < max_restarts. Once this limit is exceeded, the service is marked as failed and the application will be terminated. Set to -1 to always restart, or 0 to never restart. Optional, default is 0.

Example

services:
  my_service:
    max_restarts: -1  # always restart
    ...

  my_service2:
    max_restarts: 0   # never restart
    ...

  my_service3:
    max_restarts: 3   # restart a maximum of 3 times
    ...
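The restart rule above can be sketched as a small predicate; `should_restart` is a hypothetical helper for illustration:

```python
def should_restart(total_restarts, max_restarts):
    """Whether a failed container may restart, per the rule above:
    -1 means always restart; otherwise restart only while the service's
    total restart count is below max_restarts."""
    if max_restarts == -1:
        return True
    return total_restarts < max_restarts

print(should_restart(2, -1))  # True: always restart
print(should_restart(0, 0))   # False: never restart
print(should_restart(3, 3))   # False: limit reached
```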

node_label

The node_label to request for all containers in this service. If not set, defaults to the application-level node_label (if set).

Example

node_label: mylabel

services:
  my_service1:
    node_label: GPU  # This service will be allocated on "GPU" nodes only.
    ...

  my_service2:
    # node_label is not set, the application label "mylabel" will be used.
    ...

nodes

A list of node host names to restrict containers for this service to. Optional, defaults to no node restrictions.

racks

A list of rack names to restrict containers for this service to. The racks corresponding to any nodes requested will be automatically added to this list. Optional, defaults to no rack restrictions.

relax_locality

Whether to interpret the nodes and racks specifications as locality suggestions rather than requirements. If True, containers for this request may be assigned on hosts and racks other than the ones explicitly requested. If False, those restrictions are strictly enforced. Optional, default is False.

Example

services:
  my_service1:
    # This service *must* run on either worker1 or worker2
    relax_locality: false
    nodes:
      - worker1
      - worker2

  my_service2:
    # This service is *suggested* to run on either worker1 or worker2,
    # but may run on any node
    relax_locality: true
    nodes:
      - worker1
      - worker2

Example

An example specification file. This starts a Jupyter notebook and a 4-node dask.distributed cluster. The example uses conda-pack to package and distribute conda environments, but applications are free to package files any way they see fit.

name: dask-with-jupyter
queue: default

services:
  jupyter:
    resources:
      memory: 1 GiB
      vcores: 1
    files:
      conda_env: env.zip
      data.csv: hdfs:///path/to/some/data.csv
    commands:
      - source conda_env/bin/activate
      - start-jupyter-notebook-and-register-address  # pseudocode

  dask.scheduler:
    resources:
      memory: 2 GiB
      vcores: 1
    files:
      conda_env: env.zip
    commands:
      - source conda_env/bin/activate
      - start-dask-scheduler-and-register-address  # pseudocode

  dask.worker:
    instances: 4
    resources:
      memory: 2 GiB
      vcores: 4
    max_restarts: 8  # Restart workers a maximum of 8 times
    files:
      conda_env: env.zip
    depends:
      - dask.scheduler  # Ensure scheduler is started before workers
    commands:
      - source conda_env/bin/activate
      - get-dask-scheduler-address-and-start-worker  # pseudocode

Python API Example

The above YAML specification can also be constructed using the Python API. The Python classes (ApplicationSpec, Service, etc…) map 1:1 to the YAML format described above. They can be read from a file, or created directly:

import skein

# Create from a yaml file
spec = skein.ApplicationSpec.from_file('spec.yaml')

# Create directly
jupyter = skein.Service(resources=skein.Resources(memory=1024, vcores=1),
                        files={'conda_env': 'env.zip',
                               'data.csv': 'hdfs:///path/to/some/data.csv'},
                        commands=['source conda_env/bin/activate',
                                  'start-jupyter-notebook-and-register-address'])

scheduler = skein.Service(resources=skein.Resources(memory=2048, vcores=1),
                          files={'conda_env': 'env.zip'},
                          commands=['source conda_env/bin/activate',
                                    'start-dask-scheduler-and-register-address'])

worker = skein.Service(instances=4,
                       max_restarts=8,
                       resources=skein.Resources(memory=2048, vcores=4),
                       files={'conda_env': 'env.zip'},
                       depends=['dask.scheduler'],
                       commands=['source conda_env/bin/activate',
                                 'get-dask-scheduler-address-and-start-worker'])

spec = skein.ApplicationSpec(name="dask-with-jupyter",
                             queue="default",
                             services={'jupyter': jupyter,
                                       'dask.scheduler': scheduler,
                                       'dask.worker': worker})