from collections import namedtuple
from typing import Optional, Set
from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.types.dagster_type import resolve_dagster_type
from dagster.utils.backcompat import experimental_arg_warning
from .utils import DEFAULT_OUTPUT, check_valid_name
[docs]class OutputDefinition:
    """Defines an output from a solid's compute function.
    Solids can have multiple outputs, in which case outputs cannot be anonymous.
    Many solids have only one output, in which case the user can provide a single output definition
    that will be given the default name, "result".
    Output definitions may be typed using the Dagster type system.
    Args:
        dagster_type (Optional[Any]): The type of this output. Users should provide one of the
            :ref:`built-in types <builtin>`, a dagster type explicitly constructed with
            :py:func:`as_dagster_type`, :py:func:`@usable_as_dagster_type <dagster_type`, or
            :py:func:`PythonObjectDagsterType`, or a Python type. Defaults to :py:class:`Any`.
        name (Optional[str]): Name of the output. (default: "result")
        description (Optional[str]): Human-readable description of the output.
        is_required (Optional[bool]): Whether the presence of this field is required. (default: True)
        io_manager_key (Optional[str]): The resource key of the output manager used for this output.
            (default: "io_manager").
        metadata (Optional[Dict[str, Any]]): (Experimental) A dict of the metadata for the output.
            For example, users can provide a file path if the data object will be stored in a
            filesystem, or provide information of a database table when it is going to load the data
            into the table.
        asset_key (Optional[Union[AssetKey, OutputContext -> AssetKey]]): (Experimental) An AssetKey
            (or function that produces an AssetKey from the OutputContext) which should be associated
            with this OutputDefinition. Used for tracking lineage information through Dagster.
        asset_partitions (Optional[Union[Set[str], OutputContext -> Set[str]]]): (Experimental) A
            set of partitions of the given asset_key (or a function that produces this list of
            partitions from the OutputContext) which should be associated with this OutputDefinition.
    """
    def __init__(
        self,
        dagster_type=None,
        name=None,
        description=None,
        is_required=None,
        io_manager_key=None,
        metadata=None,
        asset_key=None,
        asset_partitions=None,
    ):
        self._name = check_valid_name(check.opt_str_param(name, "name", DEFAULT_OUTPUT))
        self._dagster_type = resolve_dagster_type(dagster_type)
        self._description = check.opt_str_param(description, "description")
        self._is_required = check.opt_bool_param(is_required, "is_required", default=True)
        self._manager_key = check.opt_str_param(
            io_manager_key, "io_manager_key", default="io_manager"
        )
        if metadata:
            experimental_arg_warning("metadata", "OutputDefinition.__init__")
        self._metadata = metadata
        if asset_key:
            experimental_arg_warning("asset_key", "OutputDefinition.__init__")
        self._is_asset = asset_key is not None
        if callable(asset_key):
            self._asset_key_fn = asset_key
        else:
            asset_key = check.opt_inst_param(asset_key, "asset_key", AssetKey)
            self._asset_key_fn = lambda _: asset_key
        if asset_partitions:
            experimental_arg_warning("asset_partitions", "OutputDefinition.__init__")
            check.param_invariant(
                asset_key is not None,
                "asset_partitions",
                'Cannot specify "asset_partitions" argument without also specifying "asset_key"',
            )
        if callable(asset_partitions):
            self._asset_partitions_fn = asset_partitions
        else:
            asset_partitions = check.opt_set_param(asset_partitions, "asset_partitions", str)
            self._asset_partitions_fn = lambda _: asset_partitions
    @property
    def name(self):
        return self._name
    @property
    def dagster_type(self):
        return self._dagster_type
    @property
    def description(self):
        return self._description
    @property
    def optional(self):
        return not self._is_required
    @property
    def is_required(self):
        return self._is_required
    @property
    def io_manager_key(self):
        return self._manager_key
    @property
    def metadata(self):
        return self._metadata
    @property
    def is_dynamic(self):
        return False
    @property
    def is_asset(self):
        return self._is_asset
    def get_asset_key(self, context) -> Optional[AssetKey]:
        """Get the AssetKey associated with this OutputDefinition for the given
        :py:class:`OutputContext` (if any).
        Args:
            context (OutputContext): The OutputContext that this OutputDefinition is being evaluated
            in
        """
        return self._asset_key_fn(context)
    def get_asset_partitions(self, context) -> Optional[Set[str]]:
        """Get the set of partitions associated with this OutputDefinition for the given
        :py:class:`OutputContext` (if any).
        Args:
            context (OutputContext): The OutputContext that this OutputDefinition is being evaluated
            in
        """
        return self._asset_partitions_fn(context)
    def mapping_from(self, solid_name, output_name=None):
        """Create an output mapping from an output of a child solid.
        In a CompositeSolidDefinition, you can use this helper function to construct
        an :py:class:`OutputMapping` from the output of a child solid.
        Args:
            solid_name (str): The name of the child solid from which to map this output.
            input_name (str): The name of the child solid's output from which to map this output.
        Examples:
            .. code-block:: python
                output_mapping = OutputDefinition(Int).mapping_from('child_solid')
        """
        return OutputMapping(self, OutputPointer(solid_name, output_name)) 
[docs]class DynamicOutputDefinition(OutputDefinition):
    """
    (Experimental) Variant of :py:class:`OutputDefinition <dagster.OutputDefinition>` for an
    output that will dynamically alter the graph at runtime.
    When using in a composition function such as :py:func:`@pipeline <dagster.pipeline>`,
    dynamic outputs must be used with either
    * ``map`` - clone downstream solids for each separate :py:class:`DynamicOutput`
    * ``collect`` - gather across all :py:class:`DynamicOutput` in to a list
    Uses the same constructor as :py:class:`OutputDefinition <dagster.OutputDefinition>`
        .. code-block:: python
            @solid(
                config_schema={
                    "path": Field(str, default_value=file_relative_path(__file__, "sample"))
                },
                output_defs=[DynamicOutputDefinition(str)],
            )
            def files_in_directory(context):
                path = context.solid_config["path"]
                dirname, _, filenames = next(os.walk(path))
                for file in filenames:
                    yield DynamicOutput(os.path.join(dirname, file), mapping_key=_clean(file))
            @pipeline
            def process_directory():
                files = files_in_directory()
                # use map to invoke a solid on each dynamic output
                file_results = files.map(process_file)
                # use collect to gather the results in to a list
                summarize_directory(file_results.collect())
    """
    @property
    def is_dynamic(self):
        return True 
class OutputPointer(namedtuple("_OutputPointer", "solid_name output_name")):
    def __new__(cls, solid_name, output_name=None):
        return super(OutputPointer, cls).__new__(
            cls,
            check.str_param(solid_name, "solid_name"),
            check.opt_str_param(output_name, "output_name", DEFAULT_OUTPUT),
        )
[docs]class OutputMapping(namedtuple("_OutputMapping", "definition maps_from")):
    """Defines an output mapping for a composite solid.
    Args:
        definition (OutputDefinition): Defines the output of the composite solid.
        solid_name (str): The name of the child solid from which to map the output.
        output_name (str): The name of the child solid's output from which to map the output.
    """
    def __new__(cls, definition, maps_from):
        return super(OutputMapping, cls).__new__(
            cls,
            check.inst_param(definition, "definition", OutputDefinition),
            check.inst_param(maps_from, "maps_from", OutputPointer),
        )