Source code for dagster.core.definitions.hook
from collections import namedtuple
from typing import Any, Callable, Optional, Set
from dagster import check
from dagster.core.errors import DagsterInvalidDefinitionError
from .utils import check_valid_name
[docs]class HookDefinition(namedtuple("_HookDefinition", "name hook_fn required_resource_keys")):
"""Define a hook which can be triggered during a solid execution (e.g. a callback on the step
execution failure event during a solid execution).
Args:
name (str): The name of this hook.
hook_fn (Callable): The callback function that will be triggered.
required_resource_keys (Optional[Set[str]]): Keys for the resources required by the
hook.
"""
def __new__(
cls,
name: str,
hook_fn: Callable[..., Any],
required_resource_keys: Optional[Set[str]] = None,
):
return super(HookDefinition, cls).__new__(
cls,
name=check_valid_name(name),
hook_fn=check.callable_param(hook_fn, "hook_fn"),
required_resource_keys=frozenset(
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
),
)
def __call__(self, obj):
"""This is invoked when the hook is used as a decorator.
We currently support hooks to decorate the following:
- PipelineDefinition: when the hook decorates a pipeline definition, it will be added to
all the solid invocations within the pipeline.
Example:
.. code-block:: python
@success_hook
def slack_message_on_success(_):
...
@slack_message_on_success
@pipeline
def a_pipeline():
foo(bar())
"""
from .pipeline import PipelineDefinition
if isinstance(obj, PipelineDefinition):
# when it decorates a pipeline, we apply this hook to all the solid invocations within
# the pipeline.
return obj.with_hooks({self})
else:
raise DagsterInvalidDefinitionError(
(
'Hook "{hook_name}" should decorate a pipeline definition, '
"or be applied on a solid invocation."
).format(hook_name=self.name)
)