DagsterDocs

Source code for dagster.core.definitions.input

from collections import namedtuple
from typing import Optional, Set

from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.types.dagster_type import (
    BuiltinScalarDagsterType,
    DagsterType,
    resolve_dagster_type,
)
from dagster.utils.backcompat import experimental_arg_warning

from .utils import check_valid_name


class _NoValueSentinel:
    """Marker meaning "no default value was supplied".

    The class object itself (never an instance) is used as the sentinel and is
    compared by identity, which lets ``None`` remain a legitimate default value.
    """


# unfortunately since type_check functions need TypeCheckContext which is only available
# at runtime, we can only check basic types before runtime
def _check_default_value(input_name, dagster_type, default_value):
    """Validate ``default_value`` against ``dagster_type`` and return it unchanged.

    Args:
        input_name: Name of the input, used only in the error message.
        dagster_type: The resolved Dagster type of the input.
        default_value: The candidate default, or ``_NoValueSentinel`` when the
            caller supplied none.

    Returns:
        ``default_value`` exactly as passed in.

    Raises:
        DagsterInvalidDefinitionError: If a default is given for a Nothing-typed
            input, or if a builtin scalar type rejects the default.
    """
    # Nothing supplied: there is nothing to validate.
    if default_value is _NoValueSentinel:
        return default_value

    if dagster_type.is_nothing:
        raise DagsterInvalidDefinitionError(
            "Setting a default_value is invalid on InputDefinitions of type Nothing"
        )

    # Only builtin scalar types are checked here; other types are returned as-is.
    if not isinstance(dagster_type, BuiltinScalarDagsterType):
        return default_value

    scalar_check = dagster_type.type_check_scalar_value(default_value)
    if scalar_check.success:
        return default_value

    message_template = (
        "Type check failed for the default_value of InputDefinition "
        "{input_name} of type {dagster_type}. "
        "Received value {value} of type {type}"
    )
    raise DagsterInvalidDefinitionError(
        message_template.format(
            input_name=input_name,
            dagster_type=dagster_type.display_name,
            value=default_value,
            type=type(default_value),
        ),
    )


class InputDefinition:
    """Defines an argument to a solid's compute function.

    Inputs may flow from previous solids' outputs, or be stubbed using config. They may optionally
    be typed using the Dagster type system.

    Args:
        name (str): Name of the input.
        dagster_type (Optional[Any]): The type of this input. Users should provide one of the
            :ref:`built-in types <builtin>`, a dagster type explicitly constructed with
            :py:func:`as_dagster_type`,
            :py:func:`@usable_as_dagster_type <usable_as_dagster_type>`, or
            :py:func:`PythonObjectDagsterType`, or a Python type. Defaults to :py:class:`Any`.
        description (Optional[str]): Human-readable description of the input.
        default_value (Optional[Any]): The default value to use if no input is provided.
        root_manager_key (Optional[str]): (Experimental) The resource key for the
            :py:class:`RootInputManager` used for loading this input when it is not connected to an
            upstream output.
        metadata (Optional[Dict[str, Any]]): (Experimental) A dict of metadata for the input.
        asset_key (Optional[Union[AssetKey, InputContext -> AssetKey]]): (Experimental) An AssetKey
            (or function that produces an AssetKey from the InputContext) which should be associated
            with this InputDefinition. Used for tracking lineage information through Dagster.
        asset_partitions (Optional[Union[Set[str], InputContext -> Set[str]]]): (Experimental) A
            set of partitions of the given asset_key (or a function that produces this set of
            partitions from the InputContext) which should be associated with this InputDefinition.
    """

    def __init__(
        self,
        name,
        dagster_type=None,
        description=None,
        default_value=_NoValueSentinel,
        root_manager_key=None,
        metadata=None,
        asset_key=None,
        asset_partitions=None,
    ):
        self._name = check_valid_name(name)

        self._dagster_type = check.inst(resolve_dagster_type(dagster_type), DagsterType)

        self._description = check.opt_str_param(description, "description")

        # Validated up front; stays _NoValueSentinel when no default was supplied.
        self._default_value = _check_default_value(self._name, self._dagster_type, default_value)

        # For each experimental argument, experimental_arg_warning is only called when a
        # truthy value was actually passed.
        if root_manager_key:
            experimental_arg_warning("root_manager_key", "InputDefinition.__init__")

        self._root_manager_key = check.opt_str_param(root_manager_key, "root_manager_key")

        if metadata:
            experimental_arg_warning("metadata", "InputDefinition.__init__")

        self._metadata = check.opt_dict_param(metadata, "metadata", key_type=str)

        if asset_key:
            experimental_arg_warning("asset_key", "InputDefinition.__init__")

        self._is_asset = asset_key is not None

        # Store the asset key as a function of the context in both cases, so that
        # get_asset_key can call it uniformly.
        if callable(asset_key):
            self._asset_key_fn = asset_key
        else:
            asset_key = check.opt_inst_param(asset_key, "asset_key", AssetKey)
            self._asset_key_fn = lambda _: asset_key

        if asset_partitions:
            experimental_arg_warning("asset_partitions", "InputDefinition.__init__")
            check.param_invariant(
                asset_key is not None,
                "asset_partitions",
                'Cannot specify "asset_partitions" argument without also specifying "asset_key"',
            )

        # Same normalization as for asset_key: always keep a callable taking the context.
        if callable(asset_partitions):
            self._asset_partitions_fn = asset_partitions
        else:
            asset_partitions = check.opt_set_param(asset_partitions, "asset_partitions", str)
            self._asset_partitions_fn = lambda _: asset_partitions

    @property
    def name(self):
        return self._name

    @property
    def dagster_type(self):
        return self._dagster_type

    @property
    def description(self):
        return self._description

    @property
    def has_default_value(self):
        return self._default_value is not _NoValueSentinel

    @property
    def default_value(self):
        check.invariant(self.has_default_value, "Can only fetch default_value if has_default_value")
        return self._default_value

    @property
    def root_manager_key(self):
        return self._root_manager_key

    @property
    def metadata(self):
        return self._metadata

    @property
    def is_asset(self):
        return self._is_asset

    def get_asset_key(self, context) -> Optional[AssetKey]:
        """Get the AssetKey associated with this InputDefinition for the given
        :py:class:`InputContext` (if any).

        Args:
            context (InputContext): The InputContext that this InputDefinition is being evaluated
                in
        """
        return self._asset_key_fn(context)

    def get_asset_partitions(self, context) -> Optional[Set[str]]:
        """Get the set of partitions that this solid will read from this InputDefinition for the
        given :py:class:`InputContext` (if any).

        Args:
            context (InputContext): The InputContext that this InputDefinition is being evaluated
                in
        """
        return self._asset_partitions_fn(context)

    def mapping_to(self, solid_name, input_name, fan_in_index=None):
        """Create an input mapping to an input of a child solid.

        In a CompositeSolidDefinition, you can use this helper function to construct
        an :py:class:`InputMapping` to the input of a child solid.

        Args:
            solid_name (str): The name of the child solid to which to map this input.
            input_name (str): The name of the child solid's input to which to map this input.
            fan_in_index (Optional[int]): The index in to a fanned in input, else None

        Examples:

            .. code-block:: python

                input_mapping = InputDefinition('composite_input', Int).mapping_to(
                    'child_solid', 'int_input'
                )
        """
        check.str_param(solid_name, "solid_name")
        check.str_param(input_name, "input_name")
        check.opt_int_param(fan_in_index, "fan_in_index")

        # A fan-in index selects the fan-in pointer type; otherwise use a plain pointer.
        if fan_in_index is not None:
            maps_to = FanInInputPointer(solid_name, input_name, fan_in_index)
        else:
            maps_to = InputPointer(solid_name, input_name)
        return InputMapping(self, maps_to)


class InputPointer(namedtuple("_InputPointer", "solid_name input_name")):
    """Points at an input of a solid, identified by solid name and input name.

    Args:
        solid_name (str): Name of the solid that owns the input.
        input_name (str): Name of the input on that solid.
    """

    def __new__(cls, solid_name, input_name):
        # Both fields are validated as strings before the tuple is built.
        return super().__new__(
            cls,
            check.str_param(solid_name, "solid_name"),
            check.str_param(input_name, "input_name"),
        )


class FanInInputPointer(namedtuple("_FanInInputPointer", "solid_name input_name fan_in_index")):
    """Points at one position of a fanned-in input of a solid.

    Args:
        solid_name (str): Name of the solid that owns the input.
        input_name (str): Name of the input on that solid.
        fan_in_index (int): Index into the fanned-in input.
    """

    def __new__(cls, solid_name, input_name, fan_in_index):
        # Unlike InputDefinition.mapping_to, the index is required here (int_param, not opt).
        return super().__new__(
            cls,
            check.str_param(solid_name, "solid_name"),
            check.str_param(input_name, "input_name"),
            check.int_param(fan_in_index, "fan_in_index"),
        )


class InputMapping(namedtuple("_InputMapping", "definition maps_to")):
    """Defines an input mapping for a composite solid.

    Args:
        definition (InputDefinition): Defines the input to the composite solid.
        maps_to (Union[InputPointer, FanInInputPointer]): Pointer to the child solid input (solid
            name and input name, plus a fan-in index for a fanned-in input) onto which to map
            the input.
    """

    def __new__(cls, definition, maps_to):
        # Both fields are type-checked before the tuple is built.
        return super().__new__(
            cls,
            check.inst_param(definition, "definition", InputDefinition),
            check.inst_param(maps_to, "maps_to", (InputPointer, FanInInputPointer)),
        )

    @property
    def maps_to_fan_in(self):
        """Whether this mapping targets a position of a fanned-in input."""
        return isinstance(self.maps_to, FanInInputPointer)