Solid Hooks#

Solid hooks let you define success and failure handling policies on solids.

Relevant APIs#

Name            Description
@failure_hook   The decorator to define a callback on solid failure.
@success_hook   The decorator to define a callback on solid success.
HookContext     The context object available to a hook function.

Overview#

A function decorated with @success_hook or @failure_hook is called a solid hook. Solid hooks are general-purpose: a hook can run any logic you want to execute at the per-solid level.


Defining a Solid Hook#

from dagster import HookContext, failure_hook, success_hook


@success_hook(required_resource_keys={"slack"})
def slack_message_on_success(context: HookContext):
    message = f"Solid {context.solid.name} finished successfully"
    context.resources.slack.chat.post_message(channel="#foo", text=message)


@failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: HookContext):
    message = f"Solid {context.solid.name} failed"
    context.resources.slack.chat.post_message(channel="#foo", text=message)

Hook context#

As you may have noticed, the hook function takes one argument, which is an instance of HookContext. The available properties on this context are:

  • context.log: the logger available to the hook.
  • context.solid: the solid associated with the hook.
  • context.solid_config: the config specific to the associated solid.
  • context.step: the execution step that triggers the hook.
  • context.resources: the resources the hook can use.
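Because a hook body only reads attributes from its context, its logic can be tried out without running a pipeline. The sketch below is one way to do that, assuming a hand-built stand-in object rather than Dagster's HookContext; describe_failure and fake_context are hypothetical names, and only the attribute names (log, solid, solid_config) come from the list above.

```python
import logging
from types import SimpleNamespace


def describe_failure(context) -> str:
    """Build and log a message from the solid name and its config."""
    message = f"Solid {context.solid.name} failed with config {context.solid_config}"
    context.log.error(message)
    return message


# Stand-in object (not a real HookContext) exposing only what the function reads.
fake_context = SimpleNamespace(
    log=logging.getLogger("hook_sketch"),
    solid=SimpleNamespace(name="a"),
    solid_config={"retries": 3},
)

result = describe_failure(fake_context)
```

Keeping the message-building logic in a plain function like this makes it easy to check before wrapping it in a hook decorator.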

Using Hooks#

Dagster provides two ways to attach solid hooks: to every solid in a pipeline, or to individual solid instances.

Applying a hook on every solid in a pipeline#

Suppose you want to send a Slack message to a channel whenever any solid in a pipeline fails. In this case, you can apply a hook to the pipeline, which attaches the hook to every solid instance within that pipeline.

Use the "slack_message_on_failure" hook defined above to decorate the pipeline "notif_all". A Slack message will then be sent whenever any solid in the pipeline fails.

@slack_message_on_failure
@pipeline(mode_defs=mode_defs)
def notif_all():
    # the hook "slack_message_on_failure" is applied on every solid instance within this pipeline
    a()
    b()

Applying a hook on a solid#

Sometimes, however, a pipeline is a shared responsibility, or you only want to be alerted on high-priority solid executions. For these cases, you can set up hooks on individual solid instances, which lets you apply policies on a per-solid basis.

@pipeline(mode_defs=mode_defs)
def selective_notif():
    # only solid "a" triggers hooks: a slack message will be sent when it fails or succeeds
    a.with_hooks({slack_message_on_failure, slack_message_on_success})()
    # solid "b" won't trigger any hooks
    b()

In this case, solid "a" sends a Slack message when it fails or succeeds, while solid "b" does not trigger any hooks.
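The difference between the two attachment styles can be pictured with a toy model. This is a sketch in plain Python, not Dagster's implementation; run_solids, the Hook signature, and the events list are hypothetical names chosen for illustration.

```python
from typing import Callable, Dict, List, Set, Tuple

Hook = Callable[[str, bool], None]  # arguments: (solid_name, succeeded)


def run_solids(
    outcomes: Dict[str, bool],
    pipeline_hooks: Set[Hook],
    solid_hooks: Dict[str, Set[Hook]],
) -> None:
    """Fire hooks once per solid: pipeline-wide hooks plus that solid's own."""
    for solid_name, succeeded in outcomes.items():
        for hook in pipeline_hooks | solid_hooks.get(solid_name, set()):
            hook(solid_name, succeeded)


events: List[Tuple[str, str]] = []


def on_failure(solid_name: str, succeeded: bool) -> None:
    """Record a failure event; ignores successful solids."""
    if not succeeded:
        events.append(("failure", solid_name))


outcomes = {"a": False, "b": False}  # both solids fail in this toy run

# Hook on the pipeline: every failing solid triggers it.
run_solids(outcomes, pipeline_hooks={on_failure}, solid_hooks={})
all_solids = list(events)

# Hook on solid "a" only: the failure of "b" goes unreported.
events.clear()
run_solids(outcomes, pipeline_hooks=set(), solid_hooks={"a": {on_failure}})
only_a = list(events)
```

In the first run both failures are recorded; in the second, only the failure of "a" is.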

Patterns#

Environment-specific hooks using modes#

Hooks use resource keys to access resources. After including the resource key in its set of required_resource_keys, the body of the hook can access the corresponding resource via the resources attribute of its context object.

Because hooks reach resources through keys, you can swap the resource behind a key in different modes. For example, you can send Slack messages only in "prod" mode and mock the Slack resource in "dev" mode.

Here, the slack resource is mocked in "dev" mode with the helper function ResourceDefinition.hardcoded_resource(), so no Slack messages are sent while you are developing the pipeline.

mode_defs = [
    ModeDefinition(
        "dev",
        resource_defs={
            "slack": ResourceDefinition.hardcoded_resource(
                slack_resource_mock, "do not send messages in dev"
            )
        },
    ),
    ModeDefinition("prod", resource_defs={"slack": slack_resource}),
]
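The snippet above uses slack_resource_mock without showing how it is built. One plausible choice, and it is only an assumption here, is a unittest.mock.MagicMock, which accepts any attribute access or call and records it instead of contacting Slack. A minimal sketch:

```python
from unittest.mock import MagicMock

# Assumption: a MagicMock stands in for the Slack client in "dev" mode.
slack_resource_mock = MagicMock()

# A hook body would call the client like this; on the mock, nothing is sent.
slack_resource_mock.chat.post_message(channel="#foo", text="Solid a failed")

# The call is recorded, so you can still inspect what would have been sent.
recorded = slack_resource_mock.chat.post_message.call_args
```

This keeps hook code identical across modes: only the object behind the "slack" key changes.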

When we switch to "prod" mode, we can provide the real Slack token in the run_config, which enables sending messages to a Slack channel when a hook is triggered.

resources:
  slack:
    config:
      token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token
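The same configuration can also be written as a nested Python dictionary rather than YAML. The sketch below mirrors the structure above; reading the token from an environment variable named SLACK_TOKEN is an assumption made for illustration, not something the configuration requires.

```python
import os

# Mirror of the YAML config as a nested dict.
run_config = {
    "resources": {
        "slack": {
            "config": {
                # SLACK_TOKEN is a hypothetical variable name; the fallback
                # is the placeholder token from the YAML example.
                "token": os.environ.get(
                    "SLACK_TOKEN", "xoxp-1234123412341234-12341234-1234"
                ),
            }
        }
    }
}
```

Building the dictionary in code avoids committing a real token to a config file.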

Then, we can execute the pipeline with this config through the Python API, the CLI, or the Dagit UI. Here is an example using the Python API.

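import yaml
from dagster import execute_pipeline, file_relative_path

# notif_all is the pipeline defined earlier in the same module.
if __name__ == "__main__":
    # Load the "prod" run config from the YAML file next to this script.
    with open(
        file_relative_path(__file__, "prod.yaml"),
        "r",
    ) as fd:
        run_config = yaml.safe_load(fd.read())
    result = execute_pipeline(notif_all, run_config=run_config, mode="prod")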

Pipeline-level hooks#

Dagster currently does not have pipeline-scoped hooks. When you add a hook decorator to a pipeline, the hook is attached to every solid in the pipeline individually. The hook does not track pipeline-scoped events and only tracks solid-level success or failure events.

You may still need something like a pipeline-level hook, for example to run some code on every pipeline success or failure.

Even though Dagster does not have first-class pipeline-level hooks, you can use @sensor to achieve the same functionality. A sensor can monitor pipeline events at the instance level and perform an arbitrary action, such as sending an alert or launching a new pipeline run.

For an example, see Pipeline failure sensor on the Sensors page.
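Conceptually, a sensor of this kind checks for pipeline runs that have finished since its last check and reacts once per run, which is the per-pipeline behavior that solid hooks lack. The toy below models that idea in plain Python. It does not use Dagster's sensor API; RunRecord, check_for_failures, and the integer cursor are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class RunRecord:
    run_id: int
    pipeline_name: str
    status: str  # "SUCCESS" or "FAILURE"


def check_for_failures(
    runs: List[RunRecord], cursor: int
) -> Tuple[List[str], int]:
    """Return one alert per failed run newer than `cursor`, plus the new cursor."""
    new_runs = [run for run in runs if run.run_id > cursor]
    alerts = [
        f"Pipeline {run.pipeline_name} failed (run {run.run_id})"
        for run in new_runs
        if run.status == "FAILURE"
    ]
    new_cursor = max((run.run_id for run in new_runs), default=cursor)
    return alerts, new_cursor


runs = [
    RunRecord(1, "notif_all", "SUCCESS"),
    RunRecord(2, "notif_all", "FAILURE"),
]
alerts, cursor = check_for_failures(runs, cursor=0)

# A second check with the updated cursor reports nothing new.
repeat_alerts, cursor = check_for_failures(runs, cursor=cursor)
```

The cursor is what keeps a run from being reported twice, however many solids failed inside it.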