DagsterDocs

Source code for dagster.core.definitions.decorators.pipeline

from functools import update_wrapper
from typing import Any, Callable, Dict, List, Optional, Set, Union

from dagster import check
from dagster.utils.backcompat import experimental_arg_warning

from ..hook import HookDefinition
from ..input import InputDefinition
from ..mode import ModeDefinition
from ..output import OutputDefinition
from ..pipeline import PipelineDefinition
from ..preset import PresetDefinition


class _Pipeline:
    """Decorator object that turns a composition function into a ``PipelineDefinition``.

    The constructor validates and stores the pipeline options; calling the instance with a
    composition function runs the composition and builds the definition.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        mode_defs: Optional[List[ModeDefinition]] = None,
        preset_defs: Optional[List[PresetDefinition]] = None,
        description: Optional[str] = None,
        tags: Optional[Dict[str, Any]] = None,
        hook_defs: Optional[Set[HookDefinition]] = None,
        input_defs: Optional[List[InputDefinition]] = None,
        output_defs: Optional[List[OutputDefinition]] = None,
        config_schema: Optional[Dict[str, Any]] = None,
        config_fn: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
    ):
        """Validate the decorator arguments and keep them for ``__call__``.

        Every argument except ``config_schema`` is passed through the matching
        ``dagster.check`` helper; ``config_schema`` is stored as given.
        """
        self.name = check.opt_str_param(name, "name")
        self.mode_definitions = check.opt_list_param(mode_defs, "mode_defs", ModeDefinition)
        self.preset_definitions = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition)
        self.description = check.opt_str_param(description, "description")
        self.tags = check.opt_dict_param(tags, "tags")
        self.hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition)
        self.input_defs = check.opt_nullable_list_param(
            input_defs, "input_defs", of_type=InputDefinition
        )
        # Remember whether the caller supplied output_defs at all; __call__ uses this to
        # decide whether the composition function's return value should be ignored.
        self.did_pass_outputs = output_defs is not None
        self.output_defs = check.opt_nullable_list_param(
            output_defs, "output_defs", of_type=OutputDefinition
        )
        self.config_schema = config_schema
        self.config_fn = check.opt_callable_param(config_fn, "config_fn")

    def __call__(self, fn: Callable[..., Any]) -> PipelineDefinition:
        """Build a ``PipelineDefinition`` from the composition function ``fn``.

        Args:
            fn: The composition function being decorated. Its ``__name__`` is used as the
                pipeline name when no explicit name was given to the constructor.

        Returns:
            The constructed ``PipelineDefinition``, with ``fn``'s metadata copied onto it via
            ``functools.update_wrapper``.
        """
        check.callable_param(fn, "fn")

        # Resolve the name locally instead of writing it back to ``self.name``: mutating the
        # instance would make a reused decorator object name every later pipeline after the
        # first function it decorated.
        pipeline_name = self.name or fn.__name__

        # Imported at call time rather than module level -- presumably to avoid a circular
        # import between the decorator modules; confirm before hoisting.
        from dagster.core.definitions.decorators.composite_solid import do_composition

        (
            input_mappings,
            output_mappings,
            dependencies,
            solid_defs,
            config_mapping,
            positional_inputs,
        ) = do_composition(
            "@pipeline",
            pipeline_name,
            fn,
            self.input_defs,
            self.output_defs,
            self.config_schema,
            self.config_fn,
            # Without explicit output_defs, whatever the composition function returns is ignored.
            ignore_output_from_composition_fn=not self.did_pass_outputs,
        )

        pipeline_def = PipelineDefinition(
            name=pipeline_name,
            dependencies=dependencies,
            solid_defs=solid_defs,
            mode_defs=self.mode_definitions,
            preset_defs=self.preset_definitions,
            description=self.description,
            tags=self.tags,
            hook_defs=self.hook_defs,
            input_mappings=input_mappings,
            output_mappings=output_mappings,
            config_mapping=config_mapping,
            positional_inputs=positional_inputs,
        )
        update_wrapper(pipeline_def, fn)
        return pipeline_def


def pipeline(
    name: Union[Callable[..., Any], Optional[str]] = None,
    description: Optional[str] = None,
    mode_defs: Optional[List[ModeDefinition]] = None,
    preset_defs: Optional[List[PresetDefinition]] = None,
    tags: Optional[Dict[str, Any]] = None,
    hook_defs: Optional[Set[HookDefinition]] = None,
    input_defs: Optional[List[InputDefinition]] = None,
    output_defs: Optional[List[OutputDefinition]] = None,
    config_schema: Optional[Dict[str, Any]] = None,
    config_fn: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
) -> Union[PipelineDefinition, _Pipeline]:
    """Create a pipeline with the specified parameters from the decorated composition function.

    Using this decorator allows you to build up the dependency graph of the pipeline by writing a
    function that invokes solids and passes the output to other solids.

    Args:
        name (Optional[str]): The name of the pipeline. Must be unique within any
            :py:class:`RepositoryDefinition` containing the pipeline.
        description (Optional[str]): A human-readable description of the pipeline.
        mode_defs (Optional[List[ModeDefinition]]): The set of modes in which this pipeline can
            operate. Modes are used to attach resources, custom loggers, custom system storage
            options, and custom executors to a pipeline. Modes can be used, e.g., to vary
            available resource and logging implementations between local test and production runs.
        preset_defs (Optional[List[PresetDefinition]]): A set of preset collections of
            configuration options that may be used to execute a pipeline. A preset consists of an
            environment dict, an optional subset of solids to execute, and a mode selection.
            Presets can be used to ship common combinations of options to pipeline end users in
            Python code, and can be selected by tools like Dagit.
        tags (Optional[Dict[str, Any]]): Arbitrary metadata for any execution run of the pipeline.
            Values that are not strings will be json encoded and must meet the criteria that
            `json.loads(json.dumps(value)) == value`. These tag values may be overwritten by tag
            values provided at invocation time.
        hook_defs (Optional[Set[HookDefinition]]): A set of hook definitions applied to the
            pipeline. When a hook is applied to a pipeline, it will be attached to all solid
            instances within the pipeline.
        input_defs (Optional[List[InputDefinition]]): Experimental. Input definitions forwarded
            to the composition step. Passing a value emits an experimental-argument warning.
        output_defs (Optional[List[OutputDefinition]]): Experimental. Output definitions
            forwarded to the composition step. Passing a value emits an experimental-argument
            warning.
        config_schema (Optional[Dict[str, Any]]): Experimental. Config schema forwarded to the
            composition step. Passing a value emits an experimental-argument warning.
        config_fn (Optional[Callable[[Dict[str, Any]], Dict[str, Any]]]): Experimental. Config
            mapping function forwarded to the composition step. Passing a value emits an
            experimental-argument warning.

    Returns:
        Union[PipelineDefinition, _Pipeline]: The built ``PipelineDefinition`` when used as a bare
        decorator (``@pipeline``), otherwise a ``_Pipeline`` decorator object to be applied to the
        composition function (``@pipeline(...)``).

    Example:

        .. code-block:: python

            @solid(output_defs=[OutputDefinition(int, "two"), OutputDefinition(int, "four")])
            def emit_two_four(_) -> int:
                yield Output(2, "two")
                yield Output(4, "four")


            @lambda_solid
            def add_one(num: int) -> int:
                return num + 1


            @lambda_solid
            def mult_two(num: int) -> int:
                return num * 2


            @pipeline
            def math_pipeline():
                two, four = emit_two_four()
                add_one(two)
                mult_two(four)
    """
    if input_defs is not None:
        experimental_arg_warning("input_defs", "pipeline")

    if output_defs is not None:
        experimental_arg_warning("output_defs", "pipeline")

    if config_schema is not None:
        experimental_arg_warning("config_schema", "pipeline")

    if config_fn is not None:
        experimental_arg_warning("config_fn", "pipeline")

    # Bare usage (``@pipeline`` with no parentheses): ``name`` is actually the decorated
    # function. No keyword arguments are forwarded on this path; only ``description`` is
    # asserted to be unset.
    if callable(name):
        check.invariant(description is None)
        return _Pipeline()(name)

    return _Pipeline(
        name=name,
        mode_defs=mode_defs,
        preset_defs=preset_defs,
        description=description,
        tags=tags,
        hook_defs=hook_defs,
        input_defs=input_defs,
        output_defs=output_defs,
        config_schema=config_schema,
        config_fn=config_fn,
    )