DagsterDocs

Run Coordinators#

In production Dagster deployments, there are often many runs being launched at once. The run coordinator lets you control the policy that Dagster uses to manage the set of runs in your deployment. When you submit a run from Dagit or the Dagster command line, it is first sent to the run coordinator, which applies any limits or prioritization policies before eventually sending it to the run launcher to be launched.

Configuring your Run Coordinator#

The following run coordinators can be configured on your Dagster instance:

DefaultRunCoordinator#

The DefaultRunCoordinator simply calls launch_run on the instance's run launcher immediately in the same process, without applying any limits or prioritization rules. With this run coordinator set, clicking Launch Run in Dagit will cause the run to immediately launch from the Dagit process. Similarly, scheduled runs will launch immediately from the scheduler process.

QueuedRunCoordinator#

The QueuedRunCoordinator sends runs to the Dagster Daemon via a queue. The daemon pulls from the queue, and calls launch_run on the submitted runs. Using this run coordinator enables instance-level limits on run concurrency, as well as custom run prioritization rules.

Limiting Run Concurrency#

Dagster offers several options for limiting the number of concurrent pipeline runs on your instance. This can be useful when:

  • Your cluster has limited capacity and you need to avoid flooding it
  • You want to launch a backfill that only executes when there is capacity to spare, giving priority to other runs
  • Some of your runs connect to an external service that shouldn’t be overloaded

Setup#

Concurrency limits are supported by the QueuedRunCoordinator, which uses the Dagster Daemon to coordinate which runs execute. To use this run coordinator, configure it on your Dagster instance and include the daemon as part of your Dagster deployment.

Usage#

You can set the following parameters on the QueuedRunCoordinator by modifying its configuration in the Dagster instance yaml.

Run limits#

You can place limits on the number of runs that can be in progress at a single time. Any runs beyond this limit will be queued, and won’t use any compute.

  • An overall limit on the number of runs can be placed using the max_concurrent_runs key.
  • More specific limits can be configured based on run tags, with tag_concurrency_limits. Limits can be specified for all runs with a certain tag key or key-value pair.

If any limit would be exceeded by launching a run, then the run will stay queued.

For example, you can add the following to your dagster.yaml:

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "database"
        value: "redshift"
        limit: 4
      - key: "dagster/backfill"
        limit: 10

This run queue will only allow a maximum of 25 runs at once. Additionally, only 4 runs with the "database" tag equal to "redshift" can run at once, and at most 10 runs with the "dagster/backfill" tag and any value can run at once.

Priorities#

The run queue is a first-in-first-out priority queue. By default, all runs have a priority of 0. Dagster will launch runs with higher priority first. If multiple runs have the same priority, Dagster will launch the runs in the order you submitted them to the queue. Negative priorities are also allowed and are useful for de-prioritizing sets of runs such as backfills. Priority values must be integers.

You can specify a custom priority using the dagster/priority tag, which you can set in code on definitions or in the Dagit playground. Note that because both tag keys and values must be strings, so you must specify the priority integer as a string. For example, if you wanted to specify a priority of -1, you would set the dagster/priority tag value to be the string "-1".

# Attach a priority to a pipeline
@pipeline(tags={"dagster/priority": "3"})
def important_pipeline():
    ...

# Attach a priority to scheduled runs
@daily_schedule(
    pipeline_name="my_pipeline",
    ...
    tags={"dagster/priority": "-1"}
)
def less_important_schedule(date):
    ...

A priority tag on the pipeline definition can be overriden when launching in the Dagit playground.

Queuing semantics#

In the absence of tag limits and priorities, queued runs are first-in-first-out. However, a run that is blocked by tag limits will not block runs submitted after it.

To illustrate, let’s say we submit 3 runs in order: run A (tagged ‘foo’:‘bar’), run B (tagged ‘foo’:‘bar’), and run C (no tags). With no limits configured, the runs will be launched in the order they were submitted, A → B → C. Next, say we add the following instance configuration:

tag_concurrency_limits:
  - key: "foo"
    limit: 1

Now A and B are not able to execute concurrently, while there is no limit on C. Assume each will execute for at least a few minutes. If we submit in order A → B → C, then

  • A launches
  • B is passed over since A is in progress and there's a limit of 1 to concurrent ‘foo’ runs
  • C launches
  • Once A finishes, B launches

Thus the launch order will be A → C → B.

Troubleshooting queued runs#

Below are some possible issues that you can run into with limiting run concurrency using the QueuedRunCoordinator.

Runs skip the QUEUED status, going straight to STARTED#

This likely means that you are not using the QueuedRunCoordinator, which is responsible for run queuing. You can check the run coordinator on your instance in Dagit by going to Status -> Configuration.

Runs are not being dequeued#

If your runs stay in the QUEUED status, the issue is likely either that the Dagster Daemon is not deployed on your instance, or that the queue configuration is blocking your runs.

Step 1: Check that the Dagster Daemon is running on your instance#

In Dagit, go to Status -> HEALTH, and check that the Run queue is healthy. If not:

Is the Daemon running at all?#

Run queuing depends on the Daemon. See the Daemon Overview for how to set it up on your instance.

Is the Daemon accessing the same storages as the Dagit process?#

If you have started a Daemon process, you should make sure that the Daemon is accessing the same storages as the Dagit process, meaning both processes should be using the same dagster.yaml. Locally, this means both processes should have the same DAGSTER_HOME environment variable set. See the Dagster Instance docs for more information.

Step 2: Check your queue configuration#

If the Daemon is running on your instance, it may be the case that runs are intentionally being left in the queue due to concurrency rules. It may be helpful to look at the logged output from the Daemon process, as this will indicate when runs are skipped.

The run queue is configured in the dagster.yaml under run_coordinator. This can be viewed in Dagit via Status -> Configuration. Runs may be staying in QUEUED due to rules here. The most basic rule that would block the queue would be max_concurrent_runs: 0. In more practical cases, the queue will be blocked by some number of in progress runs. To understand the state of your run queue, it's helpful to view both the 'Queued' and 'In Progress' tabs of the Dagit run page.

Unblocking the queue#

If there are runs in progress that are blocking the queue, you have the option of terminating them via Dagit so that other runs can proceed.

Still stuck?#

If these steps didn't help, reach out in Slack or file an issue and we'll be happy to help investigate.