
from collections import namedtuple
from typing import Optional, Set
from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.types.dagster_type import (
    BuiltinScalarDagsterType,
    DagsterType,
    resolve_dagster_type,
)
from dagster.utils.backcompat import experimental_arg_warning
from .utils import check_valid_name
class _NoValueSentinel:
    pass
# unfortunately since type_check functions need TypeCheckContext which is only available
# at runtime, we can only check basic types before runtime
def _check_default_value(input_name, dagster_type, default_value):
    if default_value is not _NoValueSentinel:
        if dagster_type.is_nothing:
            raise DagsterInvalidDefinitionError(
                "Setting a default_value is invalid on InputDefinitions of type Nothing"
            )
        if isinstance(dagster_type, BuiltinScalarDagsterType):
            type_check = dagster_type.type_check_scalar_value(default_value)
            if not type_check.success:
                raise DagsterInvalidDefinitionError(
                    (
                        "Type check failed for the default_value of InputDefinition "
                        "{input_name} of type {dagster_type}. "
                        "Received value {value} of type {type}"
                    ).format(
                        input_name=input_name,
                        dagster_type=dagster_type.display_name,
                        value=default_value,
                        type=type(default_value),
                    ),
                )
    return default_value
[docs]class InputDefinition:
    """Defines an argument to a solid's compute function.
    Inputs may flow from previous solids' outputs, or be stubbed using config. They may optionally
    be typed using the Dagster type system.
    Args:
        name (str): Name of the input.
        dagster_type (Optional[Any]):  The type of this input. Users should provide one of the
            :ref:`built-in types <builtin>`, a dagster type explicitly constructed with
            :py:func:`as_dagster_type`, :py:func:`@usable_as_dagster_type <dagster_type`, or
            :py:func:`PythonObjectDagsterType`, or a Python type. Defaults to :py:class:`Any`.
        description (Optional[str]): Human-readable description of the input.
        default_value (Optional[Any]): The default value to use if no input is provided.
        root_manager_key (Optional[str]): (Experimental) The resource key for the
            :py:class:`RootInputManager` used for loading this input when it is not connected to an
            upstream output.
        metadata (Optional[Dict[str, Any]]): (Experimental) A dict of metadata for the input.
        asset_key (Optional[Union[AssetKey, InputContext -> AssetKey]]): (Experimental) An AssetKey
            (or function that produces an AssetKey from the InputContext) which should be associated
            with this InputDefinition. Used for tracking lineage information through Dagster.
        asset_partitions (Optional[Union[Set[str], InputContext -> Set[str]]]): (Experimental) A
            set of partitions of the given asset_key (or a function that produces this list of
            partitions from the InputContext) which should be associated with this InputDefinition.
    """
    def __init__(
        self,
        name,
        dagster_type=None,
        description=None,
        default_value=_NoValueSentinel,
        root_manager_key=None,
        metadata=None,
        asset_key=None,
        asset_partitions=None,
    ):
        self._name = check_valid_name(name)
        self._dagster_type = check.inst(resolve_dagster_type(dagster_type), DagsterType)
        self._description = check.opt_str_param(description, "description")
        self._default_value = _check_default_value(self._name, self._dagster_type, default_value)
        if root_manager_key:
            experimental_arg_warning("root_manager_key", "InputDefinition.__init__")
        self._root_manager_key = check.opt_str_param(root_manager_key, "root_manager_key")
        if metadata:
            experimental_arg_warning("metadata", "InputDefinition.__init__")
        self._metadata = check.opt_dict_param(metadata, "metadata", key_type=str)
        if asset_key:
            experimental_arg_warning("asset_key", "InputDefinition.__init__")
        self._is_asset = asset_key is not None
        if callable(asset_key):
            self._asset_key_fn = asset_key
        else:
            asset_key = check.opt_inst_param(asset_key, "asset_key", AssetKey)
            self._asset_key_fn = lambda _: asset_key
        if asset_partitions:
            experimental_arg_warning("asset_partitions", "InputDefinition.__init__")
            check.param_invariant(
                asset_key is not None,
                "asset_partitions",
                'Cannot specify "asset_partitions" argument without also specifying "asset_key"',
            )
        if callable(asset_partitions):
            self._asset_partitions_fn = asset_partitions
        else:
            asset_partitions = check.opt_set_param(asset_partitions, "asset_partitions", str)
            self._asset_partitions_fn = lambda _: asset_partitions
    @property
    def name(self):
        return self._name
    @property
    def dagster_type(self):
        return self._dagster_type
    @property
    def description(self):
        return self._description
    @property
    def has_default_value(self):
        return self._default_value is not _NoValueSentinel
    @property
    def default_value(self):
        check.invariant(self.has_default_value, "Can only fetch default_value if has_default_value")
        return self._default_value
    @property
    def root_manager_key(self):
        return self._root_manager_key
    @property
    def metadata(self):
        return self._metadata
    @property
    def is_asset(self):
        return self._is_asset
    def get_asset_key(self, context) -> Optional[AssetKey]:
        """Get the AssetKey associated with this InputDefinition for the given
        :py:class:`InputContext` (if any).
        Args:
            context (InputContext): The InputContext that this OutputDefinition is being evaluated
                in
        """
        return self._asset_key_fn(context)
    def get_asset_partitions(self, context) -> Optional[Set[str]]:
        """Get the set of partitions that this solid will read from this InputDefinition for the given
        :py:class:`InputContext` (if any).
        Args:
            context (InputContext): The InputContext that this InputDefinition is being evaluated
                in
        """
        return self._asset_partitions_fn(context)
    def mapping_to(self, solid_name, input_name, fan_in_index=None):
        """Create an input mapping to an input of a child solid.
        In a CompositeSolidDefinition, you can use this helper function to construct
        an :py:class:`InputMapping` to the input of a child solid.
        Args:
            solid_name (str): The name of the child solid to which to map this input.
            input_name (str): The name of the child solid' input to which to map this input.
            fan_in_index (Optional[int]): The index in to a fanned in input, else None
        Examples:
            .. code-block:: python
                input_mapping = InputDefinition('composite_input', Int).mapping_to(
                    'child_solid', 'int_input'
                )
        """
        check.str_param(solid_name, "solid_name")
        check.str_param(input_name, "input_name")
        check.opt_int_param(fan_in_index, "fan_in_index")
        if fan_in_index is not None:
            maps_to = FanInInputPointer(solid_name, input_name, fan_in_index)
        else:
            maps_to = InputPointer(solid_name, input_name)
        return InputMapping(self, maps_to)
class InputPointer(namedtuple("_InputPointer", "solid_name input_name")):
    def __new__(cls, solid_name, input_name):
        return super(InputPointer, cls).__new__(
            cls,
            check.str_param(solid_name, "solid_name"),
            check.str_param(input_name, "input_name"),
        )
class FanInInputPointer(namedtuple("_FanInInputPointer", "solid_name input_name fan_in_index")):
    def __new__(cls, solid_name, input_name, fan_in_index):
        return super(FanInInputPointer, cls).__new__(
            cls,
            check.str_param(solid_name, "solid_name"),
            check.str_param(input_name, "input_name"),
            check.int_param(fan_in_index, "fan_in_index"),
        )
[docs]class InputMapping(namedtuple("_InputMapping", "definition maps_to")):
    """Defines an input mapping for a composite solid.
    Args:
        definition (InputDefinition): Defines the input to the composite solid.
        solid_name (str): The name of the child solid onto which to map the input.
        input_name (str): The name of the input to the child solid onto which to map the input.
    """
    def __new__(cls, definition, maps_to):
        return super(InputMapping, cls).__new__(
            cls,
            check.inst_param(definition, "definition", InputDefinition),
            check.inst_param(maps_to, "maps_to", (InputPointer, FanInInputPointer)),
        )
    @property
    def maps_to_fan_in(self):
        return isinstance(self.maps_to, FanInInputPointer)