from abc import ABC, abstractmethod
from collections import defaultdict, namedtuple
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Dict,
FrozenSet,
List,
NamedTuple,
Optional,
Set,
Tuple,
Type,
Union,
cast,
)
from dagster import check
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.serdes import whitelist_for_serdes
from dagster.utils import frozentags
from .hook import HookDefinition
from .input import FanInInputPointer, InputDefinition, InputMapping, InputPointer
from .output import OutputDefinition
from .utils import DEFAULT_OUTPUT, struct_to_string, validate_tags
if TYPE_CHECKING:
from .composition import MappedInputPlaceholder
from .graph import GraphDefinition
from .solid import NodeDefinition
class SolidInvocation(namedtuple("Solid", "name alias tags hook_defs")):
    """Identifies an instance of a solid in a pipeline dependency structure.

    Args:
        name (str): Name of the solid of which this is an instance.
        alias (Optional[str]): Name specific to this instance of the solid. Necessary when there are
            multiple instances of the same solid.
        tags (Optional[Dict[str, Any]]): Optional tags values to extend or override those
            set on the solid definition.
        hook_defs (Optional[Set[HookDefinition]]): A set of hook definitions applied to the
            solid instance.

    Examples:

    .. code-block:: python

        pipeline = PipelineDefinition(
            solid_defs=[solid_1, solid_2],
            dependencies={
                SolidInvocation('solid_1', alias='other_name') : {
                    'input_name' : DependencyDefinition('solid_1'),
                },
                'solid_2' : {
                    'input_name': DependencyDefinition('other_name'),
                },
            }
        )

    In general, users should prefer not to construct this class directly or use the
    :py:class:`PipelineDefinition` API that requires instances of this class. Instead, use the
    :py:func:`@pipeline <pipeline>` API:

    .. code-block:: python

        @pipeline
        def pipeline():
            other_name = solid_1.alias('other_name')
            solid_2(other_name(solid_1))

    """

    def __new__(
        cls,
        name: str,
        alias: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        hook_defs: Optional[FrozenSet[HookDefinition]] = None,
    ):
        name = check.str_param(name, "name")
        alias = check.opt_str_param(alias, "alias")
        # Tags and hooks are stored in immutable containers so the tuple stays hashable.
        tags = frozentags(check.opt_dict_param(tags, "tags", value_type=str, key_type=str))
        hook_defs = frozenset(check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition))
        # super() must be given (defining class, cls); the reversed order only worked when
        # cls was exactly SolidInvocation and raised TypeError for any subclass.
        return super(SolidInvocation, cls).__new__(cls, name, alias, tags, hook_defs)
class Solid:
    """
    Solid invocation within a pipeline. Defined by its name inside the pipeline.

    On construction one SolidInputHandle is created per entry of the definition's
    ``input_dict`` and one SolidOutputHandle per entry of its ``output_dict``.

    Attributes:
        name (str):
            Name of the solid inside the pipeline. Must be unique per-pipeline.
        definition (NodeDefinition):
            Definition of the Node.
        graph_definition (GraphDefinition):
            The graph this invocation was constructed with; consulted for input mappings.
    """

    def __init__(
        self,
        name: str,
        definition: "NodeDefinition",
        graph_definition: "GraphDefinition",
        tags: Dict[str, str] = None,
        hook_defs: Optional[Set[HookDefinition]] = None,
    ):
        # Imported here rather than at module level (module level only imports them
        # under TYPE_CHECKING).
        from .graph import GraphDefinition
        from .solid import NodeDefinition

        self.name = check.str_param(name, "name")
        self.definition = check.inst_param(definition, "definition", NodeDefinition)
        self.graph_definition = check.inst_param(
            graph_definition,
            "graph_definition",
            GraphDefinition,
        )
        self._additional_tags = validate_tags(tags)
        self._hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition)

        # Handles are keyed by the input/output name taken from the definition.
        self._input_handles = {
            input_name: SolidInputHandle(self, input_def)
            for input_name, input_def in self.definition.input_dict.items()
        }
        self._output_handles = {
            output_name: SolidOutputHandle(self, output_def)
            for output_name, output_def in self.definition.output_dict.items()
        }

    def input_handles(self):
        """Return a view of every input handle of this solid."""
        return self._input_handles.values()

    def output_handles(self):
        """Return a view of every output handle of this solid."""
        return self._output_handles.values()

    def input_handle(self, name: str) -> "SolidInputHandle":
        """Return the handle for the input called ``name`` (KeyError if there is none)."""
        check.str_param(name, "name")
        return self._input_handles[name]

    def output_handle(self, name: str) -> "SolidOutputHandle":
        """Return the handle for the output called ``name`` (KeyError if there is none)."""
        check.str_param(name, "name")
        return self._output_handles[name]

    def has_input(self, name: str) -> bool:
        """Delegate to the definition's ``has_input``."""
        return self.definition.has_input(name)

    def input_def_named(self, name: str) -> InputDefinition:
        """Delegate to the definition's ``input_def_named``."""
        return self.definition.input_def_named(name)

    def has_output(self, name: str) -> bool:
        """Delegate to the definition's ``has_output``."""
        return self.definition.has_output(name)

    def output_def_named(self, name: str) -> OutputDefinition:
        """Delegate to the definition's ``output_def_named``."""
        return self.definition.output_def_named(name)

    @property
    def is_composite(self) -> bool:
        """True when the underlying definition is a GraphDefinition."""
        from .graph import GraphDefinition

        return isinstance(self.definition, GraphDefinition)

    @property
    def input_dict(self):
        """The definition's ``input_dict``."""
        return self.definition.input_dict

    @property
    def output_dict(self):
        """The definition's ``output_dict``."""
        return self.definition.output_dict

    @property
    def tags(self) -> frozentags:
        """The definition's tags updated with the tags given to this invocation."""
        return self.definition.tags.updated_with(self._additional_tags)

    def container_maps_input(self, input_name: str) -> bool:
        """Whether the containing graph has an input mapping pointing at ``input_name``."""
        pointer = InputPointer(self.name, input_name)
        mapping = self.graph_definition.input_mapping_for_pointer(pointer)
        return mapping is not None

    def container_mapped_input(self, input_name: str) -> InputMapping:
        """Return the containing graph's input mapping for ``input_name``."""
        pointer = InputPointer(self.name, input_name)
        return self.graph_definition.input_mapping_for_pointer(pointer)

    def container_maps_fan_in_input(self, input_name: str, fan_in_index: int) -> bool:
        """Whether the containing graph maps slot ``fan_in_index`` of fan-in input ``input_name``."""
        pointer = FanInInputPointer(self.name, input_name, fan_in_index)
        mapping = self.graph_definition.input_mapping_for_pointer(pointer)
        return mapping is not None

    def container_mapped_fan_in_input(self, input_name: str, fan_in_index: int) -> InputMapping:
        """Return the containing graph's mapping for slot ``fan_in_index`` of ``input_name``."""
        pointer = FanInInputPointer(self.name, input_name, fan_in_index)
        return self.graph_definition.input_mapping_for_pointer(pointer)

    @property
    def hook_defs(self) -> Set[HookDefinition]:
        """Hook definitions supplied for this invocation."""
        return self._hook_defs
@whitelist_for_serdes
class SolidHandle(namedtuple("_SolidHandle", "name parent")):
    """A solid name together with an optional parent handle.

    Handles chain through ``parent`` up to a root handle whose parent is ``None``. The
    string form is the chain's names, root first, joined by ``"."``.
    """

    def __new__(cls, name: str, parent: Optional["SolidHandle"]):
        return super(SolidHandle, cls).__new__(
            cls,
            check.str_param(name, "name"),
            check.opt_inst_param(parent, "parent", SolidHandle),
        )

    def __str__(self):
        return self.to_string()

    @property
    def path(self) -> List[str]:
        """Return a list representation of the handle.

        Inverse of SolidHandle.from_path.

        Returns:
            List[str]: The names along the parent chain, outermost (root) first.
        """
        path = []
        cur = self
        # Walk from this handle up to the root, then flip so the root comes first.
        while cur:
            path.append(cur.name)
            cur = cur.parent
        path.reverse()
        return path

    def to_string(self) -> str:
        """Return a unique string representation of the handle.

        Inverse of SolidHandle.from_string.
        """
        return self.parent.to_string() + "." + self.name if self.parent else self.name

    def is_or_descends_from(self, handle: "SolidHandle") -> bool:
        """Check if the handle is or descends from another handle.

        Args:
            handle (SolidHandle): The handle to check against.

        Returns:
            bool: True when ``handle``'s path is a prefix of (or equal to) this handle's path.
        """
        check.inst_param(handle, "handle", SolidHandle)

        # Build each path once; the ``path`` property walks the whole parent chain on
        # every access.
        own_path = self.path
        other_path = handle.path
        return own_path[: len(other_path)] == other_path

    def pop(self, ancestor: "SolidHandle") -> Optional["SolidHandle"]:
        """Return a copy of the handle with some of its ancestors pruned.

        Args:
            ancestor (SolidHandle): Handle to an ancestor of the current handle.

        Returns:
            Optional[SolidHandle]: The handle relative to ``ancestor``; ``None`` when
            ``ancestor`` has the same path as this handle (nothing is left).

        Example:

        .. code-block:: python

            handle = SolidHandle('baz', SolidHandle('bar', SolidHandle('foo', None)))
            ancestor = SolidHandle('bar', SolidHandle('foo', None))
            assert handle.pop(ancestor) == SolidHandle('baz', None)
        """
        check.inst_param(ancestor, "ancestor", SolidHandle)
        check.invariant(
            self.is_or_descends_from(ancestor),
            "Handle {handle} does not descend from {ancestor}".format(
                handle=self.to_string(), ancestor=ancestor.to_string()
            ),
        )

        return SolidHandle.from_path(self.path[len(ancestor.path) :])

    def with_ancestor(self, ancestor: "SolidHandle") -> Optional["SolidHandle"]:
        """Returns a copy of the handle with an ancestor grafted on.

        Args:
            ancestor (SolidHandle): Handle to the new ancestor. ``None`` is accepted and
                leaves the path unchanged.

        Returns:
            SolidHandle:

        Example:

        .. code-block:: python

            handle = SolidHandle('baz', SolidHandle('bar', SolidHandle('foo', None)))
            ancestor = SolidHandle('quux', None)
            assert handle.with_ancestor(ancestor) == SolidHandle(
                'baz', SolidHandle('bar', SolidHandle('foo', SolidHandle('quux', None)))
            )
        """
        check.opt_inst_param(ancestor, "ancestor", SolidHandle)

        return SolidHandle.from_path((ancestor.path if ancestor else []) + self.path)

    @staticmethod
    def from_path(path: List[str]) -> Optional["SolidHandle"]:
        """Build a handle from a list of names ordered root first.

        Inverse of SolidHandle.path. The input list is not modified.

        Returns:
            Optional[SolidHandle]: The innermost handle, or ``None`` for an empty list.
        """
        check.list_param(path, "path", of_type=str)

        cur = None
        # Iterate instead of pop(0): popping emptied the caller's list and was quadratic.
        for name in path:
            cur = SolidHandle(name=name, parent=cur)
        return cur

    @staticmethod
    def from_string(handle_str: str) -> Optional["SolidHandle"]:
        """Build a handle from its ``"."``-joined string form.

        Inverse of SolidHandle.to_string.
        """
        check.str_param(handle_str, "handle_str")

        path = handle_str.split(".")
        return SolidHandle.from_path(path)

    @classmethod
    def from_dict(cls, dict_repr: Dict[str, Any]) -> Optional["SolidHandle"]:
        """This method makes it possible to load a potentially nested SolidHandle after a
        roundtrip through json.loads(json.dumps(SolidHandle._asdict()))

        ``dict_repr`` is not modified.
        """

        check.dict_param(dict_repr, "dict_repr", key_type=str)
        check.invariant(
            "name" in dict_repr, "Dict representation of SolidHandle must have a 'name' key"
        )
        check.invariant(
            "parent" in dict_repr, "Dict representation of SolidHandle must have a 'parent' key"
        )

        parent = dict_repr["parent"]
        # A parent that came back as a [name, parent] sequence is rebuilt recursively.
        # The result is kept in a local so the caller's dict is left untouched.
        if isinstance(parent, (list, tuple)):
            parent = SolidHandle.from_dict(
                {
                    "name": parent[0],
                    "parent": parent[1],
                }
            )

        return SolidHandle(name=dict_repr["name"], parent=parent)
class SolidInputHandle(namedtuple("_SolidInputHandle", "solid input_def")):
    """Pairs a Solid with one of its InputDefinitions.

    Equality and hashing are based on the solid name and input name only.
    """

    def __new__(cls, solid: Solid, input_def: InputDefinition):
        return super(SolidInputHandle, cls).__new__(
            cls,
            check.inst_param(solid, "solid", Solid),
            check.inst_param(input_def, "input_def", InputDefinition),
        )

    def _inner_str(self) -> str:
        """Shared text used by both ``__str__`` and ``__repr__``."""
        return struct_to_string(
            "SolidInputHandle",
            solid_name=self.solid.name,
            input_name=self.input_def.name,
        )

    def __str__(self):
        return self._inner_str()

    def __repr__(self):
        return self._inner_str()

    def __hash__(self):
        return hash((self.solid.name, self.input_def.name))

    def __eq__(self, other: Any):
        # Comparing against something that is not an input handle (e.g. None) used to
        # raise AttributeError; defer to Python's default handling instead.
        if not isinstance(other, SolidInputHandle):
            return NotImplemented
        return self.solid.name == other.solid.name and self.input_def.name == other.input_def.name

    def __ne__(self, other: Any):
        # tuple defines its own __ne__, so without this override ``!=`` would compare the
        # tuple fields and could disagree with the name-based __eq__ above.
        equal = self.__eq__(other)
        return equal if equal is NotImplemented else not equal

    @property
    def solid_name(self) -> str:
        """Name of the solid this input belongs to."""
        return self.solid.name

    @property
    def input_name(self) -> str:
        """Name of the input definition."""
        return self.input_def.name
class SolidOutputHandle(namedtuple("_SolidOutputHandle", "solid output_def")):
    """Pairs a Solid with one of its OutputDefinitions.

    Equality and hashing are based on the solid name and output name only.
    """

    def __new__(cls, solid: Solid, output_def: OutputDefinition):
        return super(SolidOutputHandle, cls).__new__(
            cls,
            check.inst_param(solid, "solid", Solid),
            check.inst_param(output_def, "output_def", OutputDefinition),
        )

    def _inner_str(self) -> str:
        """Shared text used by both ``__str__`` and ``__repr__``."""
        return struct_to_string(
            "SolidOutputHandle",
            solid_name=self.solid.name,
            output_name=self.output_def.name,
        )

    def __str__(self):
        return self._inner_str()

    def __repr__(self):
        return self._inner_str()

    def __hash__(self):
        return hash((self.solid.name, self.output_def.name))

    def __eq__(self, other: Any):
        # Comparing against something that is not an output handle (e.g. None) used to
        # raise AttributeError; defer to Python's default handling instead.
        if not isinstance(other, SolidOutputHandle):
            return NotImplemented
        return self.solid.name == other.solid.name and self.output_def.name == other.output_def.name

    def __ne__(self, other: Any):
        # tuple defines its own __ne__, so without this override ``!=`` would compare the
        # tuple fields and could disagree with the name-based __eq__ above.
        equal = self.__eq__(other)
        return equal if equal is NotImplemented else not equal

    def describe(self) -> str:
        """Return ``"<solid name>:<output name>"``, as used in error messages."""
        return f"{self.solid_name}:{self.output_def.name}"

    @property
    def solid_name(self) -> str:
        """Name of the solid this output belongs to."""
        return self.solid.name

    @property
    def is_dynamic(self) -> bool:
        """The output definition's ``is_dynamic`` flag."""
        return self.output_def.is_dynamic
class DependencyType(Enum):
    """Kinds of dependency an input can have on upstream outputs."""

    # Produced for a DependencyDefinition: one input, one upstream output.
    DIRECT = "DIRECT"
    # Produced for a MultiDependencyDefinition: one input, a list of upstream outputs.
    FAN_IN = "FAN_IN"
    # Produced for a DynamicCollectDependencyDefinition.
    DYNAMIC_COLLECT = "DYNAMIC_COLLECT"
class IDependencyDefinition(ABC):  # pylint: disable=no-init
    """Abstract interface implemented by every dependency definition in this module."""

    @abstractmethod
    def get_solid_dependencies(self) -> List["DependencyDefinition"]:
        """Return the DependencyDefinition objects this definition refers to."""

    @abstractmethod
    def is_fan_in(self) -> bool:
        """The result passed to the corresponding input will be a List made from different solid outputs"""
class DependencyDefinition(
    namedtuple("_DependencyDefinition", "solid output description"), IDependencyDefinition
):
    """Represents an edge in the DAG of solid instances forming a pipeline.

    This object is used at the leaves of a dictionary structure that represents the complete
    dependency structure of a pipeline whose keys represent the dependent solid and dependent
    input, so this object only contains information about the dependee.

    Concretely, if the input named 'input' of solid_b depends on the output named 'result' of
    solid_a, this structure will look as follows:

    .. code-block:: python

        dependency_structure = {
            'solid_b': {
                'input': DependencyDefinition('solid_a', 'result')
            }
        }

    In general, users should prefer not to construct this class directly or use the
    :py:class:`PipelineDefinition` API that requires instances of this class. Instead, use the
    :py:func:`@pipeline <pipeline>` API:

    .. code-block:: python

        @pipeline
        def pipeline():
            solid_b(solid_a())


    Args:
        solid (str): The name of the solid that is depended on, that is, from which the value
            passed between the two solids originates.
        output (Optional[str]): The name of the output that is depended on. (default: "result")
        description (Optional[str]): Human-readable description of this dependency.
    """

    def __new__(cls, solid: str, output: str = DEFAULT_OUTPUT, description: Optional[str] = None):
        # Validate the fields in declaration order before building the tuple.
        solid_name = check.str_param(solid, "solid")
        output_name = check.str_param(output, "output")
        dep_description = check.opt_str_param(description, "description")
        return super(DependencyDefinition, cls).__new__(
            cls, solid_name, output_name, dep_description
        )

    def get_solid_dependencies(self) -> List["DependencyDefinition"]:
        """A single edge is its own (only) solid dependency."""
        return [self]

    def is_fan_in(self) -> bool:
        """A single edge is never a fan-in."""
        return False
class MultiDependencyDefinition(
    namedtuple("_MultiDependencyDefinition", "dependencies"), IDependencyDefinition
):
    """Represents a fan-in edge in the DAG of solid instances forming a pipeline.

    This object is used only when an input of type ``List[T]`` is assembled by fanning-in multiple
    upstream outputs of type ``T``.

    This object is used at the leaves of a dictionary structure that represents the complete
    dependency structure of a pipeline whose keys represent the dependent solid and dependent
    input, so this object only contains information about the dependee.

    Concretely, if the input named 'input' of solid_c depends on the outputs named 'result' of
    solid_a and solid_b, this structure will look as follows:

    .. code-block:: python

        dependency_structure = {
            'solid_c': {
                'input': MultiDependencyDefinition(
                    [
                        DependencyDefinition('solid_a', 'result'),
                        DependencyDefinition('solid_b', 'result')
                    ]
                )
            }
        }

    In general, users should prefer not to construct this class directly or use the
    :py:class:`PipelineDefinition` API that requires instances of this class. Instead, use the
    :py:func:`@pipeline <pipeline>` API:

    .. code-block:: python

        @pipeline
        def pipeline():
            solid_c(solid_a(), solid_b())

    Args:
        dependencies (List[Union[DependencyDefinition, Type[MappedInputPlaceholder]]]): The
            upstream dependencies fanned in to this input. Each entry is either a
            :py:class:`DependencyDefinition` or the ``MappedInputPlaceholder`` class itself.

    Raises:
        DagsterInvalidDefinitionError: If two entries reference the same solid output.
    """

    def __new__(
        cls, dependencies: List[Union[DependencyDefinition, Type["MappedInputPlaceholder"]]]
    ):
        from .composition import MappedInputPlaceholder

        deps = check.list_param(dependencies, "dependencies")
        # "<solid>:<output>" keys of the DependencyDefinitions seen so far.
        seen = set()
        for dep in deps:
            if isinstance(dep, DependencyDefinition):
                key = dep.solid + ":" + dep.output
                if key in seen:
                    raise DagsterInvalidDefinitionError(
                        'Duplicate dependencies on solid "{dep.solid}" output "{dep.output}" '
                        "used in the same MultiDependencyDefinition.".format(dep=dep)
                    )
                seen.add(key)
            elif dep is MappedInputPlaceholder:
                # The placeholder class itself (not an instance) is a valid entry.
                pass
            else:
                check.failed("Unexpected dependencies entry {}".format(dep))

        return super(MultiDependencyDefinition, cls).__new__(cls, deps)

    def get_solid_dependencies(self) -> List[DependencyDefinition]:
        """Return only the DependencyDefinition entries, skipping placeholders."""
        return [dep for dep in self.dependencies if isinstance(dep, DependencyDefinition)]

    def is_fan_in(self) -> bool:
        """A multi dependency is always a fan-in."""
        return True

    def get_dependencies_and_mappings(self) -> List:
        """Return every entry, placeholders included, in the order given."""
        return self.dependencies
class DynamicCollectDependencyDefinition(
    NamedTuple("_DynamicCollectDependencyDefinition", [("solid_name", str), ("output_name", str)]),
    IDependencyDefinition,
):
    """Dependency on one named output of one named solid, reported as a fan-in."""

    def get_solid_dependencies(self) -> List[DependencyDefinition]:
        """Return a one-element list holding the equivalent DependencyDefinition."""
        upstream = DependencyDefinition(self.solid_name, self.output_name)
        return [upstream]

    def is_fan_in(self) -> bool:
        """Always True for this dependency kind."""
        return True
# One dependency entry: the dependency type paired with either a single upstream output
# handle or a list whose items are output handles or the MappedInputPlaceholder class
# (the list form is what _create_handle_dict stores for FAN_IN).
DepTypeAndOutputHandles = Tuple[
DependencyType,
Union[SolidOutputHandle, List[Union[SolidOutputHandle, Type["MappedInputPlaceholder"]]]],
]
# Maps each solid input handle to its dependency entry.
InputToOutputHandleDict = Dict[SolidInputHandle, DepTypeAndOutputHandles]
def _create_handle_dict(
    solid_dict: Dict[str, Solid],
    dep_dict: Dict[str, Dict[str, IDependencyDefinition]],
) -> InputToOutputHandleDict:
    """Resolve name-based dependency definitions into handle-based entries.

    Args:
        solid_dict: Solid instances keyed by solid name.
        dep_dict: For each solid name, a dict from input name to dependency definition.

    Returns:
        A dict from each dependent input handle to a ``(DependencyType, handles)`` tuple,
        where ``handles`` is a list for fan-in and a single output handle otherwise.
    """
    from .composition import MappedInputPlaceholder

    check.dict_param(solid_dict, "solid_dict", key_type=str, value_type=Solid)
    check.two_dim_dict_param(dep_dict, "dep_dict", value_type=IDependencyDefinition)

    def _upstream_handle(upstream_solid: str, upstream_output: str) -> SolidOutputHandle:
        # Look up the named output handle on the named upstream solid.
        return solid_dict[upstream_solid].output_handle(upstream_output)

    resolved: InputToOutputHandleDict = {}

    for solid_name, input_dict in dep_dict.items():
        from_solid = solid_dict[solid_name]
        for input_name, dep_def in input_dict.items():
            if isinstance(dep_def, MultiDependencyDefinition):
                fan_in_handles: List[Union[SolidOutputHandle, Type[MappedInputPlaceholder]]] = []
                for inner_dep in dep_def.get_dependencies_and_mappings():
                    if isinstance(inner_dep, DependencyDefinition):
                        fan_in_handles.append(_upstream_handle(inner_dep.solid, inner_dep.output))
                    elif inner_dep is MappedInputPlaceholder:
                        # Placeholders are carried through unchanged.
                        fan_in_handles.append(inner_dep)
                    else:
                        check.failed(
                            "Unexpected MultiDependencyDefinition dependencies type {}".format(
                                inner_dep
                            )
                        )
                entry = (DependencyType.FAN_IN, fan_in_handles)
            elif isinstance(dep_def, DependencyDefinition):
                entry = (
                    DependencyType.DIRECT,
                    _upstream_handle(dep_def.solid, dep_def.output),
                )
            elif isinstance(dep_def, DynamicCollectDependencyDefinition):
                entry = (
                    DependencyType.DYNAMIC_COLLECT,
                    _upstream_handle(dep_def.solid_name, dep_def.output_name),
                )
            else:
                check.failed(f"Unknown dependency type {dep_def}")

            # The upstream side is resolved first, then the dependent input handle.
            resolved[from_solid.input_handle(input_name)] = entry

    return resolved
class DependencyStructure:
    """Indexed view over the input -> upstream output dependencies between solids.

    Built from a dict mapping each dependent SolidInputHandle to a
    ``(DependencyType, output handle or list)`` entry. Construction validates the
    dynamic-output rules and raises DagsterInvalidDefinitionError when they are violated.
    """

    @staticmethod
    def from_definitions(solids: Dict[str, Solid], dep_dict: Dict[str, Any]):
        """Build a DependencyStructure from name-based dependency definitions.

        Args:
            solids: Solid instances keyed by name.
            dep_dict: For each solid name, a dict from input name to dependency definition.
        """
        return DependencyStructure(list(dep_dict.keys()), _create_handle_dict(solids, dep_dict))

    def __init__(self, solid_names: List[str], handle_dict: InputToOutputHandleDict):
        self._solid_names = solid_names
        self._handle_dict = handle_dict

        # Building up a couple indexes here so that one can look up all the upstream output handles
        # or downstream input handles in O(1). Without this, this can become O(N^2) where N is solid
        # count during the GraphQL query in particular

        # solid_name => input_handle => list[output_handle]
        self._solid_input_index: dict = defaultdict(dict)

        # solid_name => output_handle => list[input_handle]
        self._solid_output_index: dict = defaultdict(lambda: defaultdict(list))

        # solid_name => dynamic output_handle that this solid will dupe for
        self._dynamic_fan_out_index: dict = {}

        # solid_name => set of dynamic output_handle this collects over
        self._collect_index: Dict[str, set] = defaultdict(set)

        for input_handle, (dep_type, output_handle_or_list) in self._handle_dict.items():
            if dep_type == DependencyType.FAN_IN:
                output_handle_list = []
                for handle in output_handle_or_list:
                    # Anything that is not an output handle (i.e. a placeholder) is not indexed.
                    if not isinstance(handle, SolidOutputHandle):
                        continue

                    if handle.is_dynamic:
                        raise DagsterInvalidDefinitionError(
                            "Currently, items in a fan-in dependency cannot be downstream of dynamic outputs. "
                            f'Problematic dependency on dynamic output "{handle.describe()}".'
                        )
                    if self._dynamic_fan_out_index.get(handle.solid_name):
                        raise DagsterInvalidDefinitionError(
                            "Currently, items in a fan-in dependency cannot be downstream of dynamic outputs. "
                            f'Problematic dependency on output "{handle.describe()}", downstream of '
                            f'"{self._dynamic_fan_out_index[handle.solid_name].describe()}".'
                        )

                    output_handle_list.append(handle)
            elif dep_type == DependencyType.DIRECT:
                output_handle = cast(SolidOutputHandle, output_handle_or_list)

                if output_handle.is_dynamic:
                    self._validate_and_set_fan_out(input_handle, output_handle)

                # The upstream solid is itself recorded as fanned out, so this solid is
                # recorded against the same dynamic output.
                if self._dynamic_fan_out_index.get(output_handle.solid_name):
                    self._validate_and_set_fan_out(
                        input_handle, self._dynamic_fan_out_index[output_handle.solid_name]
                    )

                output_handle_list = [output_handle]
            elif dep_type == DependencyType.DYNAMIC_COLLECT:
                output_handle = cast(SolidOutputHandle, output_handle_or_list)

                if output_handle.is_dynamic:
                    self._validate_and_set_collect(input_handle, output_handle)

                elif self._dynamic_fan_out_index.get(output_handle.solid_name):
                    self._validate_and_set_collect(
                        input_handle,
                        self._dynamic_fan_out_index[output_handle.solid_name],
                    )
                else:
                    check.failed("Unexpected dynamic fan in dep created")

                output_handle_list = [output_handle]
            else:
                check.failed(f"Unexpected dep type {dep_type}")

            self._solid_input_index[input_handle.solid.name][input_handle] = output_handle_list
            for output_handle in output_handle_list:
                self._solid_output_index[output_handle.solid.name][output_handle].append(
                    input_handle
                )

    def _validate_and_set_fan_out(
        self, input_handle: SolidInputHandle, output_handle: SolidOutputHandle
    ) -> None:
        """Helper function for populating _dynamic_fan_out_index

        Raises:
            DagsterInvalidDefinitionError: If the input does not support a dynamic output
                dependency, if the solid already collects over a dynamic output, or if the
                solid is already recorded against a different dynamic output.
        """
        if not input_handle.solid.definition.input_supports_dynamic_output_dep(
            input_handle.input_name
        ):
            raise DagsterInvalidDefinitionError(
                f'Solid "{input_handle.solid_name}" cannot be downstream of dynamic output '
                f'"{output_handle.describe()}" since input "{input_handle.input_name}" maps to a solid '
                "that is already downstream of another dynamic output. Solids cannot be downstream of more "
                "than one dynamic output"
            )

        if self._collect_index.get(input_handle.solid_name):
            raise DagsterInvalidDefinitionError(
                f'Solid "{input_handle.solid_name}" cannot be both downstream of dynamic output '
                f"{output_handle.describe()} and collect over dynamic output "
                f"{list(self._collect_index[input_handle.solid_name])[0].describe()}."
            )

        # First dynamic output seen for this solid: record it and stop.
        if self._dynamic_fan_out_index.get(input_handle.solid_name) is None:
            self._dynamic_fan_out_index[input_handle.solid_name] = output_handle
            return

        if self._dynamic_fan_out_index[input_handle.solid_name] != output_handle:
            raise DagsterInvalidDefinitionError(
                f'Solid "{input_handle.solid_name}" cannot be downstream of more than one dynamic output. '
                f'It is downstream of both "{output_handle.describe()}" and '
                f'"{self._dynamic_fan_out_index[input_handle.solid_name].describe()}"'
            )

    def _validate_and_set_collect(
        self,
        input_handle: SolidInputHandle,
        output_handle: SolidOutputHandle,
    ):
        """Helper function for populating _collect_index

        Raises:
            DagsterInvalidDefinitionError: If the collecting solid is already recorded as
                downstream of a dynamic output, or if the solid owning ``output_handle`` is.
        """
        if self._dynamic_fan_out_index.get(input_handle.solid_name):
            raise DagsterInvalidDefinitionError(
                f'Solid "{input_handle.solid_name}" cannot both collect over dynamic output '
                f"{output_handle.describe()} and be downstream of the dynamic output "
                f"{self._dynamic_fan_out_index[input_handle.solid_name].describe()}."
            )

        self._collect_index[input_handle.solid_name].add(output_handle)

        # if the output is already fanned out
        if self._dynamic_fan_out_index.get(output_handle.solid_name):
            raise DagsterInvalidDefinitionError(
                f'Solid "{input_handle.solid_name}" cannot be downstream of more than one dynamic output. '
                f'It is downstream of both "{output_handle.describe()}" and '
                f'"{self._dynamic_fan_out_index[output_handle.solid_name].describe()}"'
            )

    def _has_dep_of_type(
        self, solid_input_handle: SolidInputHandle, dep_type: DependencyType
    ) -> bool:
        """Whether the input has a recorded dependency of exactly ``dep_type``."""
        check.inst_param(solid_input_handle, "solid_input_handle", SolidInputHandle)
        entry = self._handle_dict.get(solid_input_handle)
        return entry is not None and entry[0] == dep_type

    def all_upstream_outputs_from_solid(self, solid_name: str) -> List[SolidOutputHandle]:
        """Return every output handle feeding any input of the named solid, flattened."""
        check.str_param(solid_name, "solid_name")

        # flatten out all outputs that feed into the inputs of this solid
        return [
            output_handle
            for output_handle_list in self._solid_input_index[solid_name].values()
            for output_handle in output_handle_list
        ]

    def input_to_upstream_outputs_for_solid(self, solid_name: str) -> Any:
        """
        Returns a Dict[SolidInputHandle, List[SolidOutputHandle]] that encodes
        where all the inputs are sourced from upstream. Usually the
        List[SolidOutputHandle] will be a list of one, except for the
        multi-dependency case.
        """
        check.str_param(solid_name, "solid_name")
        return self._solid_input_index[solid_name]

    def output_to_downstream_inputs_for_solid(self, solid_name: str) -> Any:
        """
        Returns a Dict[SolidOutputHandle, List[SolidInputHandle]] that
        represents all the downstream inputs for each output in the
        dictionary
        """
        check.str_param(solid_name, "solid_name")
        return self._solid_output_index[solid_name]

    def has_direct_dep(self, solid_input_handle: SolidInputHandle) -> bool:
        """Whether the input has a DIRECT dependency."""
        return self._has_dep_of_type(solid_input_handle, DependencyType.DIRECT)

    def get_direct_dep(self, solid_input_handle: SolidInputHandle) -> SolidOutputHandle:
        """Return the single upstream output of a DIRECT dependency."""
        check.inst_param(solid_input_handle, "solid_input_handle", SolidInputHandle)
        dep_type, dep = self._handle_dict[solid_input_handle]
        check.invariant(
            dep_type == DependencyType.DIRECT,
            f"Cannot call get_direct_dep when dep is not singular, got {dep_type}",
        )
        return cast(SolidOutputHandle, dep)

    def has_fan_in_deps(self, solid_input_handle: SolidInputHandle) -> bool:
        """Whether the input has a FAN_IN dependency."""
        return self._has_dep_of_type(solid_input_handle, DependencyType.FAN_IN)

    def get_fan_in_deps(
        self, solid_input_handle: SolidInputHandle
    ) -> List[Union[SolidOutputHandle, Type["MappedInputPlaceholder"]]]:
        """Return the stored fan-in list, placeholders included."""
        check.inst_param(solid_input_handle, "solid_input_handle", SolidInputHandle)
        dep_type, deps = self._handle_dict[solid_input_handle]
        check.invariant(
            dep_type == DependencyType.FAN_IN,
            f"Cannot call get_fan_in_deps when dep is not fan in, got {dep_type}",
        )
        return cast(List[Union[SolidOutputHandle, Type["MappedInputPlaceholder"]]], deps)

    def has_dynamic_fan_in_dep(self, solid_input_handle: SolidInputHandle) -> bool:
        """Whether the input has a DYNAMIC_COLLECT dependency."""
        return self._has_dep_of_type(solid_input_handle, DependencyType.DYNAMIC_COLLECT)

    def get_dynamic_fan_in_dep(self, solid_input_handle: SolidInputHandle) -> SolidOutputHandle:
        """Return the upstream output of a DYNAMIC_COLLECT dependency."""
        check.inst_param(solid_input_handle, "solid_input_handle", SolidInputHandle)
        dep_type, dep = self._handle_dict[solid_input_handle]
        check.invariant(
            dep_type == DependencyType.DYNAMIC_COLLECT,
            f"Cannot call get_dynamic_fan_in_dep when dep is not dynamic collect, got {dep_type}",
        )
        return cast(SolidOutputHandle, dep)

    def has_deps(self, solid_input_handle: SolidInputHandle) -> bool:
        """Whether the input has any recorded dependency."""
        check.inst_param(solid_input_handle, "solid_input_handle", SolidInputHandle)
        return solid_input_handle in self._handle_dict

    def get_deps_list(self, solid_input_handle: SolidInputHandle) -> List[SolidOutputHandle]:
        """Return the upstream output handles of the input as a list, whatever the type.

        For FAN_IN, entries that are not output handles (placeholders) are left out.
        """
        check.inst_param(solid_input_handle, "solid_input_handle", SolidInputHandle)
        check.invariant(self.has_deps(solid_input_handle))
        dep_type, handle_or_list = self._handle_dict[solid_input_handle]
        if dep_type == DependencyType.DIRECT:
            return [cast(SolidOutputHandle, handle_or_list)]
        elif dep_type == DependencyType.DYNAMIC_COLLECT:
            return [cast(SolidOutputHandle, handle_or_list)]
        elif dep_type == DependencyType.FAN_IN:
            return [handle for handle in handle_or_list if isinstance(handle, SolidOutputHandle)]
        else:
            check.failed(f"Unexpected dep type {dep_type}")

    def input_handles(self) -> List[SolidInputHandle]:
        """Return every input handle that has a recorded dependency."""
        return list(self._handle_dict.keys())

    def get_upstream_dynamic_handle_for_solid(self, solid_name: str) -> Any:
        """Return the dynamic output handle recorded for the solid, or None."""
        return self._dynamic_fan_out_index.get(solid_name)

    def get_dependency_type(self, solid_input_handle: SolidInputHandle) -> Optional[DependencyType]:
        """Return the dependency type recorded for the input, or None when it has none."""
        result = self._handle_dict.get(solid_input_handle)
        if result is None:
            return None
        dep_type, _ = result
        return dep_type

    def is_dynamic_mapped(self, solid_name: str) -> bool:
        """Whether the solid is recorded as downstream of a dynamic output."""
        return solid_name in self._dynamic_fan_out_index