Local execution#

Dask offers several scheduler options that allow workflow execution without requiring external services. These include:

  • Sequential: Tasks are executed one after another, in a single thread.

  • Multi-threading: Tasks are executed concurrently in multiple threads within the same process.

  • Multi-processing: Tasks are executed concurrently in separate subprocesses of the current process.

  • Local Cluster: Tasks are executed by a Dask cluster started in the current process.

  • Slurm Cluster: Tasks are executed by a Slurm cluster started in the current process.

Example Workflow#

In the following example, we define a workflow with two independent nodes. Since the nodes are not connected, they can execute in parallel:

from ewoksdask import execute_graph

example_workflow = {
    "graph": {"id": "test"},
    "nodes": [
        {
            "id": "node1",
            "task_type": "method",
            "task_identifier": "time.sleep",
            "default_inputs": [{"name": 0, "value": 10}],
        },
        {
            "id": "node2",
            "task_type": "method",
            "task_identifier": "time.sleep",
            "default_inputs": [{"name": 0, "value": 10}],
        }
    ]
}

result = execute_graph(example_workflow, scheduler=..., scheduler_options=...)
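When experimenting with the schedulers, it can help to generate workflows with more than two independent nodes. The following is a minimal sketch that assumes the same dictionary layout as the example above; `build_sleep_workflow` is a hypothetical helper and not part of ewoksdask.

```python
def build_sleep_workflow(n_nodes: int, seconds: float) -> dict:
    """Build a workflow of `n_nodes` unconnected `time.sleep` nodes.

    Hypothetical helper that mirrors the dictionary layout of the example above.
    """
    nodes = [
        {
            "id": f"node{i}",
            "task_type": "method",
            "task_identifier": "time.sleep",
            # Positional argument 0 of time.sleep: the duration in seconds.
            "default_inputs": [{"name": 0, "value": seconds}],
        }
        for i in range(1, n_nodes + 1)
    ]
    # No "links" entry: the nodes are independent and may run in parallel.
    return {"graph": {"id": "test"}, "nodes": nodes}


# Same content as the hand-written example workflow.
example_workflow = build_sleep_workflow(n_nodes=2, seconds=10)
```

Calling the helper with `n_nodes=2` and `seconds=10` reproduces the workflow defined above; larger values of `n_nodes` make the difference between schedulers easier to observe.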

You can choose different schedulers depending on your requirements:

Sequential#

Tasks are run one after another in a single thread.

result = execute_graph(example_workflow, scheduler=None)

Multi-Threading#

Runs tasks in parallel using multiple threads. This example completes in ~10 seconds using two threads:

result = execute_graph(
    example_workflow, scheduler="multithreading", scheduler_options={"num_workers": 2}
)
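The ~10 second figure follows from the two sleeps overlapping in time. A standalone sketch using the standard library's `concurrent.futures` (not ewoksdask itself, and with shortened sleeps) illustrates the effect; the function names are illustrative only.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SLEEP_SECONDS = 0.2  # shortened stand-in for the 10 s sleeps of the example


def run_sequential() -> float:
    """Run two sleeps one after another and return the elapsed time."""
    start = time.perf_counter()
    for _ in range(2):
        time.sleep(SLEEP_SECONDS)
    return time.perf_counter() - start


def run_threaded() -> float:
    """Run two sleeps in two threads and return the elapsed time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(time.sleep, SLEEP_SECONDS) for _ in range(2)]
        for future in futures:
            future.result()  # wait for completion and surface any exception
    return time.perf_counter() - start


sequential_time = run_sequential()  # roughly 2 * SLEEP_SECONDS
threaded_time = run_threaded()      # roughly 1 * SLEEP_SECONDS

if __name__ == "__main__":
    print(f"sequential: {sequential_time:.2f} s, threaded: {threaded_time:.2f} s")
```

Threads overlap well here because `time.sleep` releases the GIL; CPU-bound Python tasks generally benefit less from multi-threading.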

Multi-Processing#

Runs tasks in parallel using multiple subprocesses. This example also completes in ~10 seconds using two processes:

if __name__ == "__main__":
    result = execute_graph(
        example_workflow,
        scheduler="multiprocessing",
        scheduler_options={"num_workers": 2, "context": "spawn"},
    )

Note

The if __name__ == "__main__": guard is required when using the spawn context, because each spawned subprocess re-imports the main module.

The default context depends on the platform:

  • Linux: fork.

  • Windows/macOS: spawn.

See the Python multiprocessing docs for details on contexts.
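To check which contexts are available on a given machine, the standard library can be queried directly. This sketch reports the defaults of Python's own `multiprocessing` module; that the `context` scheduler option accepts the same names is taken from the example above, not verified against the ewoksdask source.

```python
import multiprocessing

# Start methods supported on this platform, e.g. "fork", "spawn", "forkserver".
available = multiprocessing.get_all_start_methods()

# Start method Python uses when no context is requested explicitly.
# Note: calling this also fixes the default for the rest of the process.
default_method = multiprocessing.get_start_method()

# Context object for an explicitly chosen start method.
# "spawn" is available on every platform.
spawn_context = multiprocessing.get_context("spawn")

if __name__ == "__main__":
    print("available start methods:", available)
    print("default start method:", default_method)
    print("explicit context:", spawn_context.get_start_method())
```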

Local cluster#

Creates a temporary local Dask cluster with the specified number of workers for the duration of the workflow execution. This example completes in ~10 seconds using two workers:

result = execute_graph(
    example_workflow, scheduler="cluster", scheduler_options={"n_workers": 2}
)

Additional cluster configuration options are available in the Dask distributed.Client documentation.

To share one cluster between multiple workflows, start it once and pass its scheduler address to each execution. A cluster started this way also provides a status dashboard:

from ewoksdask.schedulers import local_scheduler

cluster = local_scheduler(n_workers=2)

result = execute_graph(
    example_workflow, scheduler=cluster.scheduler_address
)
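Reusing one cluster for several workflows can be sketched as a loop that passes the same scheduler address to every execution. `run_on_shared_scheduler` and `fake_execute` are hypothetical names, and the address is a placeholder; in real use `execute` would be `ewoksdask.execute_graph` and `address` would be `cluster.scheduler_address`. A stand-in function is used here so the sketch runs without a cluster.

```python
from typing import Any, Callable, List


def run_on_shared_scheduler(
    workflows: List[dict], address: str, execute: Callable[..., Any]
) -> List[Any]:
    """Execute each workflow in turn against the same scheduler address.

    Hypothetical helper: `execute` is expected to accept a workflow and a
    `scheduler` keyword, as `execute_graph` does in the examples above.
    """
    return [execute(workflow, scheduler=address) for workflow in workflows]


def fake_execute(workflow: dict, scheduler: str = None) -> dict:
    """Stand-in for execute_graph that only records what it was given."""
    return {"workflow": workflow["graph"]["id"], "scheduler": scheduler}


workflows = [
    {"graph": {"id": "first"}, "nodes": []},
    {"graph": {"id": "second"}, "nodes": []},
]
# Placeholder address; a real one comes from cluster.scheduler_address.
results = run_on_shared_scheduler(workflows, "tcp://127.0.0.1:8786", fake_execute)
```

The helper submits the workflows one after another; only the cluster is shared, so its workers and dashboard persist between executions.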

Slurm cluster#

A Slurm cluster with a status dashboard can be instantiated in the same process that executes the workflows:

from ewoksdask.schedulers import slurm_scheduler

cluster = slurm_scheduler(
    queue="gpu",
    walltime="00:20:00",
    memory="256GB",
    cores=16,
    job_extra_directives=["--gres=gpu:1"]
)

result = execute_graph(
    example_workflow, scheduler=cluster.scheduler_address
)

Refer to the Dask JobQueue SlurmCluster documentation for more scheduler configuration options.