DagsterDocs

Source code for dagster.core.definitions.output

from collections import namedtuple
from typing import Optional, Set

from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.types.dagster_type import resolve_dagster_type
from dagster.utils.backcompat import experimental_arg_warning

from .utils import DEFAULT_OUTPUT, check_valid_name


class OutputDefinition:
    """Describes a single output produced by a solid's compute function.

    A solid may declare several outputs, in which case each one needs an explicit
    name. When a solid has exactly one output, the definition may omit the name and
    the default name, "result", is used. Outputs may be typed with the Dagster type
    system.

    Args:
        dagster_type (Optional[Any]): The type of this output. Accepts one of the
            :ref:`built-in types <builtin>`, a dagster type built explicitly with
            :py:func:`as_dagster_type`, :py:func:`usable_as_dagster_type`, or
            :py:func:`PythonObjectDagsterType`, or a plain Python type. Defaults to
            :py:class:`Any`.
        name (Optional[str]): Name of the output. (default: "result")
        description (Optional[str]): Human-readable description of the output.
        is_required (Optional[bool]): Whether the presence of this output is
            required. (default: True)
        io_manager_key (Optional[str]): The resource key of the output manager used
            for this output. (default: "io_manager")
        metadata (Optional[Dict[str, Any]]): (Experimental) A dict of metadata for
            the output, e.g. a file path when the object is stored on a filesystem,
            or table information when the data is loaded into a database table.
        asset_key (Optional[Union[AssetKey, OutputContext -> AssetKey]]):
            (Experimental) An AssetKey, or a function producing an AssetKey from the
            OutputContext, to associate with this OutputDefinition. Used for
            tracking lineage information through Dagster.
        asset_partitions (Optional[Union[Set[str], OutputContext -> Set[str]]]):
            (Experimental) A set of partitions of the given asset_key, or a function
            producing that set from the OutputContext, to associate with this
            OutputDefinition. Supplying it without ``asset_key`` fails a parameter
            invariant check.
    """

    def __init__(
        self,
        dagster_type=None,
        name=None,
        description=None,
        is_required=None,
        io_manager_key=None,
        metadata=None,
        asset_key=None,
        asset_partitions=None,
    ):
        # Fall back to the default output name before validating it.
        resolved_name = check.opt_str_param(name, "name", DEFAULT_OUTPUT)
        self._name = check_valid_name(resolved_name)
        self._dagster_type = resolve_dagster_type(dagster_type)
        self._description = check.opt_str_param(description, "description")
        self._is_required = check.opt_bool_param(is_required, "is_required", default=True)
        self._manager_key = check.opt_str_param(
            io_manager_key, "io_manager_key", default="io_manager"
        )

        # The experimental warning fires only for a truthy value; the value is
        # stored as given either way.
        if metadata:
            experimental_arg_warning("metadata", "OutputDefinition.__init__")
        self._metadata = metadata

        if asset_key:
            experimental_arg_warning("asset_key", "OutputDefinition.__init__")
        self._is_asset = asset_key is not None

        # Normalise asset_key to a function of the context so that
        # get_asset_key() has a single code path.
        if not callable(asset_key):
            asset_key = check.opt_inst_param(asset_key, "asset_key", AssetKey)
            self._asset_key_fn = lambda _context: asset_key
        else:
            self._asset_key_fn = asset_key

        if asset_partitions:
            experimental_arg_warning("asset_partitions", "OutputDefinition.__init__")
            # Partitions are only meaningful relative to an asset key.
            check.param_invariant(
                asset_key is not None,
                "asset_partitions",
                'Cannot specify "asset_partitions" argument without also specifying "asset_key"',
            )

        # Same normalisation as for asset_key above.
        if not callable(asset_partitions):
            asset_partitions = check.opt_set_param(asset_partitions, "asset_partitions", str)
            self._asset_partitions_fn = lambda _context: asset_partitions
        else:
            self._asset_partitions_fn = asset_partitions

    @property
    def name(self):
        """The validated name of this output."""
        return self._name

    @property
    def dagster_type(self):
        """The resolved Dagster type of this output."""
        return self._dagster_type

    @property
    def description(self):
        """The human-readable description, if one was given."""
        return self._description

    @property
    def optional(self):
        """Inverse of :py:attr:`is_required`."""
        return not self._is_required

    @property
    def is_required(self):
        """Whether this output is required."""
        return self._is_required

    @property
    def io_manager_key(self):
        """The resource key of the output manager used for this output."""
        return self._manager_key

    @property
    def metadata(self):
        """The metadata passed at construction, stored unmodified."""
        return self._metadata

    @property
    def is_dynamic(self):
        """Always False for a plain OutputDefinition."""
        return False

    @property
    def is_asset(self):
        """True when an ``asset_key`` (value or function) was supplied."""
        return self._is_asset

    def get_asset_key(self, context) -> Optional[AssetKey]:
        """Get the AssetKey associated with this OutputDefinition for the given
        :py:class:`OutputContext` (if any).

        Args:
            context (OutputContext): The OutputContext that this OutputDefinition is
                being evaluated in
        """
        key_fn = self._asset_key_fn
        return key_fn(context)

    def get_asset_partitions(self, context) -> Optional[Set[str]]:
        """Get the set of partitions associated with this OutputDefinition for the
        given :py:class:`OutputContext` (if any).

        Args:
            context (OutputContext): The OutputContext that this OutputDefinition is
                being evaluated in
        """
        partitions_fn = self._asset_partitions_fn
        return partitions_fn(context)

    def mapping_from(self, solid_name, output_name=None):
        """Create an output mapping from an output of a child solid.

        In a CompositeSolidDefinition, this helper constructs an
        :py:class:`OutputMapping` from the output of a child solid.

        Args:
            solid_name (str): The name of the child solid from which to map this
                output.
            output_name (Optional[str]): The name of the child solid's output from
                which to map this output. When omitted, the default output name is
                used.

        Examples:

            .. code-block:: python

                output_mapping = OutputDefinition(Int).mapping_from('child_solid')
        """
        source = OutputPointer(solid_name, output_name)
        return OutputMapping(self, source)
class DynamicOutputDefinition(OutputDefinition):
    """(Experimental) An :py:class:`OutputDefinition <dagster.OutputDefinition>`
    variant for an output that dynamically alters the graph at runtime.

    Inside a composition function such as :py:func:`@pipeline <dagster.pipeline>`,
    a dynamic output must be consumed through one of:

    * ``map`` - clone downstream solids for each separate :py:class:`DynamicOutput`
    * ``collect`` - gather across all :py:class:`DynamicOutput` in to a list

    The constructor is the same as that of
    :py:class:`OutputDefinition <dagster.OutputDefinition>`.

    .. code-block:: python

        @solid(
            config_schema={
                "path": Field(str, default_value=file_relative_path(__file__, "sample"))
            },
            output_defs=[DynamicOutputDefinition(str)],
        )
        def files_in_directory(context):
            path = context.solid_config["path"]
            dirname, _, filenames = next(os.walk(path))
            for file in filenames:
                yield DynamicOutput(os.path.join(dirname, file), mapping_key=_clean(file))

        @pipeline
        def process_directory():
            files = files_in_directory()

            # use map to invoke a solid on each dynamic output
            file_results = files.map(process_file)

            # use collect to gather the results in to a list
            summarize_directory(file_results.collect())
    """

    @property
    def is_dynamic(self):
        """Always True; overrides the base class, which reports False."""
        return True
class OutputPointer(namedtuple("_OutputPointer", "solid_name output_name")):
    """Immutable (solid_name, output_name) pair identifying one output of a solid.

    Args:
        solid_name (str): Name of the solid that owns the output.
        output_name (Optional[str]): Name of the output on that solid. Falls back
            to ``DEFAULT_OUTPUT`` when omitted.
    """

    def __new__(cls, solid_name, output_name=None):
        # Validate both fields before building the tuple.
        checked_solid = check.str_param(solid_name, "solid_name")
        checked_output = check.opt_str_param(output_name, "output_name", DEFAULT_OUTPUT)
        return super().__new__(cls, checked_solid, checked_output)
class OutputMapping(namedtuple("_OutputMapping", "definition maps_from")):
    """Defines an output mapping for a composite solid.

    Args:
        definition (OutputDefinition): Defines the output of the composite solid.
        maps_from (OutputPointer): Identifies the child solid, and the output on
            that child solid, from which the composite's output is mapped.
    """

    def __new__(cls, definition, maps_from):
        # Both fields are type-checked before the tuple is built.
        checked_definition = check.inst_param(definition, "definition", OutputDefinition)
        checked_pointer = check.inst_param(maps_from, "maps_from", OutputPointer)
        return super().__new__(cls, checked_definition, checked_pointer)