"""
This module contains the execution context objects that are internal to the system.
Not every property on these objects should be exposed to every Dagster user, so a
separate layer of objects in the user_context module encodes the explicit public
API.
"""
from abc import ABC, abstractmethod, abstractproperty
from collections import namedtuple
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Set
from dagster import check
from dagster.core.definitions.hook import HookDefinition
from dagster.core.definitions.mode import ModeDefinition
from dagster.core.definitions.pipeline import PipelineDefinition
from dagster.core.definitions.pipeline_base import IPipeline
from dagster.core.definitions.reconstructable import ReconstructablePipeline
from dagster.core.definitions.resource import ScopedResourcesBuilder
from dagster.core.definitions.solid import SolidDefinition
from dagster.core.definitions.step_launcher import StepLauncher
from dagster.core.errors import DagsterInvalidPropertyError, DagsterInvariantViolationError
from dagster.core.execution.plan.outputs import StepOutputHandle
from dagster.core.execution.plan.step import ExecutionStep
from dagster.core.execution.plan.utils import build_resources_for_manager
from dagster.core.execution.retries import RetryMode
from dagster.core.executor.base import Executor
from dagster.core.log_manager import DagsterLogManager
from dagster.core.storage.io_manager import IOManager
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.core.storage.tags import MEMOIZED_RUN_TAG
from dagster.core.system_config.objects import EnvironmentConfig
from dagster.core.types.dagster_type import DagsterType, resolve_dagster_type
if TYPE_CHECKING:
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.dependency import Solid, SolidHandle
from dagster.core.storage.intermediate_storage import IntermediateStorage
from dagster.core.instance import DagsterInstance
from dagster.core.execution.plan.plan import ExecutionPlan
from dagster.core.definitions.resource import Resources
class PipelineExecutionContext(ABC):
"""
Context with data that can be used during pipeline or step execution, in both host mode
(where the process can't load any user code and has to rely on the ExecutionPlan)
    and user mode (where the process can load any user code).
"""
@abstractproperty
def pipeline_run(self) -> PipelineRun:
pass
@abstractproperty
def run_id(self) -> str:
return self.pipeline_run.run_id
@abstractproperty
def run_config(self) -> dict:
return self.pipeline_run.run_config
@abstractproperty
def pipeline_name(self) -> str:
pass
@abstractproperty
def instance(self) -> "DagsterInstance":
pass
@abstractproperty
def raise_on_error(self) -> bool:
pass
@abstractproperty
def retry_mode(self) -> RetryMode:
pass
@abstractproperty
def execution_plan(self):
pass
@abstractproperty
def log(self) -> DagsterLogManager:
pass
@abstractproperty
def logging_tags(self) -> Dict[str, str]:
pass
@abstractmethod
def has_tag(self, key: str) -> bool:
pass
@abstractmethod
def get_tag(self, key: str) -> Optional[str]:
pass
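# A minimal sketch (not part of the dagster API) of how code holding any concrete
# PipelineExecutionContext can consume the uniform tag API, whether the context was
# built in host mode or user mode. The "dagster/priority" tag key is an assumption
# used purely for illustration.
def _example_read_priority_tag(context: PipelineExecutionContext) -> int:
    tag_value = context.get_tag("dagster/priority")
    # get_tag returns None when the tag is absent, so default to priority 0
    return int(tag_value) if tag_value is not None else 0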
class RunWorkerExecutionContext(PipelineExecutionContext):
"""
    Context with data that can be used in the run worker process, which is responsible
    for orchestrating individual steps. It may not be able to load user code.
"""
@abstractproperty
def reconstructable_pipeline(self) -> ReconstructablePipeline:
pass
@abstractproperty
def executor(self) -> Executor:
pass
class BaseStepExecutionContext(PipelineExecutionContext):
"""
Context with data that can be used during a single step's execution.
"""
@abstractproperty
def step(self) -> ExecutionStep:
pass
@property
def solid_handle(self) -> "SolidHandle":
return self.step.solid_handle
class HostModeExecutionContextData(
NamedTuple(
"_HostModeExecutionContextData",
[
("pipeline_run", PipelineRun),
("recon_pipeline", ReconstructablePipeline),
("execution_plan", "ExecutionPlan"),
("instance", "DagsterInstance"),
("raise_on_error", bool),
("retry_mode", RetryMode),
],
)
):
"""
HostModeExecutionContextData is the data that remains constant throughout the entire
execution of a pipeline in host mode.
"""
class HostModeExecutionContext:
"""
Context with data that can be used during pipeline or step execution, in host mode
(where the process can't load any user code and has to rely on the ExecutionPlan).
"""
__slots__ = ["_execution_context_data", "_log_manager"]
def __init__(
self,
execution_context_data: HostModeExecutionContextData,
log_manager: DagsterLogManager,
):
self._execution_context_data = check.inst_param(
execution_context_data, "execution_context_data", HostModeExecutionContextData
)
self._log_manager = check.inst_param(log_manager, "log_manager", DagsterLogManager)
@property
def pipeline_run(self) -> PipelineRun:
return self._execution_context_data.pipeline_run
@property
def run_id(self) -> str:
return self.pipeline_run.run_id
@property
def run_config(self) -> dict:
return self.pipeline_run.run_config
@property
def pipeline_name(self) -> str:
return self.pipeline_run.pipeline_name
@property
def instance(self) -> "DagsterInstance":
return self._execution_context_data.instance
@property
def raise_on_error(self) -> bool:
return self._execution_context_data.raise_on_error
@property
def execution_plan(self) -> "ExecutionPlan":
return self._execution_context_data.execution_plan
@property
def log(self) -> DagsterLogManager:
return self._log_manager
@property
def logging_tags(self) -> Dict[str, str]:
return self.log.logging_tags
def has_tag(self, key: str) -> bool:
check.str_param(key, "key")
return key in self.logging_tags
def get_tag(self, key: str) -> Optional[str]:
check.str_param(key, "key")
return self.logging_tags.get(key)
def for_step(self, step: ExecutionStep) -> "HostModeStepExecutionContext":
return HostModeStepExecutionContext(
execution_context_data=self._execution_context_data,
log_manager=self._log_manager.with_tags(**step.logging_tags),
step=step,
)
@property
def retry_mode(self) -> RetryMode:
return self._execution_context_data.retry_mode
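# A minimal sketch showing how the run-level host mode context fans out per-step
# contexts: for_step re-wraps the same immutable HostModeExecutionContextData and only
# swaps in a log manager carrying step-scoped tags. This assumes `ExecutionPlan.steps`
# yields the plan's ExecutionStep objects, which may differ across dagster versions.
def _example_host_mode_step_contexts(pipeline_context: HostModeExecutionContext):
    return [
        pipeline_context.for_step(step) for step in pipeline_context.execution_plan.steps
    ]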
class HostModeRunWorkerExecutionContext(HostModeExecutionContext, RunWorkerExecutionContext):
"""
Context with data that can be used in the run worker in host mode (which can orchestrate
steps using the ExecutionPlan but cannot load any user code).
"""
__slots__ = ["_executor"]
def __init__(
self,
execution_context_data: HostModeExecutionContextData,
log_manager: DagsterLogManager,
executor: Executor,
):
super(HostModeRunWorkerExecutionContext, self).__init__(
execution_context_data=execution_context_data,
log_manager=log_manager,
)
self._executor = check.inst_param(executor, "executor", Executor)
@property
def reconstructable_pipeline(self) -> ReconstructablePipeline:
return self._execution_context_data.recon_pipeline
@property
def executor(self) -> Executor:
return self._executor
class HostModeStepExecutionContext(HostModeExecutionContext, BaseStepExecutionContext):
"""
    Context for logging events about a single step in host mode (i.e. from within a
    host-mode run worker).
"""
__slots__ = ["_step"]
def __init__(
self,
execution_context_data: HostModeExecutionContextData,
log_manager: DagsterLogManager,
step: ExecutionStep,
):
self._step = check.inst_param(step, "step", ExecutionStep)
super(HostModeStepExecutionContext, self).__init__(
execution_context_data,
log_manager,
)
@property
def step(self) -> ExecutionStep:
return self._step
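# A minimal sketch of what the host-mode step context is for: emitting step-scoped log
# events from the run worker without loading user code. The log manager returned by
# for_step already carries the step's logging tags.
def _example_log_step_in_host_mode(step_context: HostModeStepExecutionContext) -> None:
    step_context.log.info(
        "Orchestrating step {step_key} in host mode".format(step_key=step_context.step.key)
    )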
class SystemExecutionContextData(
namedtuple(
"_SystemExecutionContextData",
(
"pipeline_run scoped_resources_builder environment_config pipeline "
"mode_def intermediate_storage_def instance intermediate_storage "
"raise_on_error retry_mode execution_plan"
),
)
):
"""
SystemExecutionContextData is the data that remains constant throughout the entire
execution of a pipeline or plan.
"""
def __new__(
cls,
pipeline_run: PipelineRun,
scoped_resources_builder: ScopedResourcesBuilder,
environment_config: EnvironmentConfig,
pipeline: IPipeline,
mode_def: ModeDefinition,
intermediate_storage_def: Optional["IntermediateStorageDefinition"],
instance: "DagsterInstance",
intermediate_storage: "IntermediateStorage",
raise_on_error: bool,
retry_mode: RetryMode,
execution_plan: "ExecutionPlan",
):
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.storage.intermediate_storage import IntermediateStorage
from dagster.core.instance import DagsterInstance
from dagster.core.execution.plan.plan import ExecutionPlan
return super(SystemExecutionContextData, cls).__new__(
cls,
pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun),
scoped_resources_builder=check.inst_param(
scoped_resources_builder, "scoped_resources_builder", ScopedResourcesBuilder
),
environment_config=check.inst_param(
environment_config, "environment_config", EnvironmentConfig
),
pipeline=check.inst_param(pipeline, "pipeline", IPipeline),
mode_def=check.inst_param(mode_def, "mode_def", ModeDefinition),
intermediate_storage_def=check.opt_inst_param(
intermediate_storage_def, "intermediate_storage_def", IntermediateStorageDefinition
),
instance=check.inst_param(instance, "instance", DagsterInstance),
intermediate_storage=check.inst_param(
intermediate_storage, "intermediate_storage", IntermediateStorage
),
raise_on_error=check.bool_param(raise_on_error, "raise_on_error"),
retry_mode=check.inst_param(retry_mode, "retry_mode", RetryMode),
execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
)
@property
def run_id(self) -> str:
return self.pipeline_run.run_id
@property
def run_config(self) -> dict:
return self.environment_config.original_config_dict
@property
def pipeline_name(self) -> str:
return self.pipeline_run.pipeline_name
class SystemExecutionContext:
__slots__ = ["_execution_context_data", "_log_manager", "_output_capture"]
def __init__(
self,
execution_context_data: SystemExecutionContextData,
log_manager: DagsterLogManager,
output_capture: Optional[Dict[StepOutputHandle, Any]] = None,
):
self._execution_context_data = check.inst_param(
execution_context_data, "execution_context_data", SystemExecutionContextData
)
self._log_manager = check.inst_param(log_manager, "log_manager", DagsterLogManager)
self._output_capture = output_capture
@property
def pipeline_run(self) -> PipelineRun:
return self._execution_context_data.pipeline_run
@property
def scoped_resources_builder(self) -> ScopedResourcesBuilder:
return self._execution_context_data.scoped_resources_builder
@property
def run_id(self) -> str:
return self._execution_context_data.run_id
@property
def run_config(self) -> dict:
return self._execution_context_data.run_config
@property
def environment_config(self) -> EnvironmentConfig:
return self._execution_context_data.environment_config
@property
def pipeline_name(self) -> str:
return self._execution_context_data.pipeline_name
@property
def mode_def(self) -> ModeDefinition:
return self._execution_context_data.mode_def
@property
def intermediate_storage_def(self) -> "IntermediateStorageDefinition":
return self._execution_context_data.intermediate_storage_def
@property
def instance(self) -> "DagsterInstance":
return self._execution_context_data.instance
@property
def intermediate_storage(self):
return self._execution_context_data.intermediate_storage
@property
def file_manager(self) -> None:
raise DagsterInvalidPropertyError(
"You have attempted to access the file manager which has been moved to resources in 0.10.0. "
"Please access it via `context.resources.file_manager` instead."
)
@property
def raise_on_error(self) -> bool:
return self._execution_context_data.raise_on_error
@property
def retry_mode(self) -> RetryMode:
return self._execution_context_data.retry_mode
@property
def log(self) -> DagsterLogManager:
return self._log_manager
@property
def logging_tags(self) -> Dict[str, str]:
return self._log_manager.logging_tags
@property
def execution_plan(self):
return self._execution_context_data.execution_plan
@property
def output_capture(self) -> Optional[Dict[StepOutputHandle, Any]]:
return self._output_capture
def has_tag(self, key: str) -> bool:
check.str_param(key, "key")
return key in self.logging_tags
def get_tag(self, key: str) -> Optional[str]:
check.str_param(key, "key")
return self.logging_tags.get(key)
def for_step(self, step: ExecutionStep) -> "SystemStepExecutionContext":
check.inst_param(step, "step", ExecutionStep)
return SystemStepExecutionContext(
self._execution_context_data,
self._log_manager.with_tags(**step.logging_tags),
step,
self.output_capture,
)
def for_type(self, dagster_type: DagsterType) -> "TypeCheckContext":
return TypeCheckContext(self._execution_context_data, self.log, dagster_type)
class SystemPipelineExecutionContext(SystemExecutionContext, RunWorkerExecutionContext):
__slots__ = ["_executor"]
def __init__(
self,
execution_context_data: SystemExecutionContextData,
log_manager: DagsterLogManager,
executor: Executor,
output_capture: Optional[Dict[StepOutputHandle, Any]] = None,
):
super(SystemPipelineExecutionContext, self).__init__(
execution_context_data, log_manager, output_capture=output_capture
)
self._executor = check.inst_param(executor, "executor", Executor)
@property
def reconstructable_pipeline(self) -> ReconstructablePipeline:
if not isinstance(self._execution_context_data.pipeline, ReconstructablePipeline):
raise DagsterInvariantViolationError(
"reconstructable_pipeline property must be a ReconstructablePipeline"
)
return self._execution_context_data.pipeline
@property
def executor(self) -> Executor:
return self._executor
class SystemStepExecutionContext(SystemExecutionContext, BaseStepExecutionContext):
__slots__ = ["_step", "_resources", "_required_resource_keys", "_step_launcher"]
def __init__(
self,
execution_context_data: SystemExecutionContextData,
log_manager: DagsterLogManager,
step: ExecutionStep,
output_capture: Optional[Dict[StepOutputHandle, Any]] = None,
):
from dagster.core.execution.resources_init import get_required_resource_keys_for_step
self._step = check.inst_param(step, "step", ExecutionStep)
super(SystemStepExecutionContext, self).__init__(execution_context_data, log_manager)
self._required_resource_keys = get_required_resource_keys_for_step(
execution_context_data.pipeline.get_definition(),
step,
execution_context_data.execution_plan,
execution_context_data.environment_config,
execution_context_data.intermediate_storage_def,
)
self._resources = self._execution_context_data.scoped_resources_builder.build(
self._required_resource_keys
)
step_launcher_resources = [
resource for resource in self._resources if isinstance(resource, StepLauncher)
]
self._step_launcher: Optional[StepLauncher] = None
if len(step_launcher_resources) > 1:
raise DagsterInvariantViolationError(
"Multiple required resources for solid {solid_name} have inherit StepLauncher"
"There should be at most one step launcher resource per solid.".format(
solid_name=step.solid_handle.name
)
)
elif len(step_launcher_resources) == 1:
self._step_launcher = step_launcher_resources[0]
self._log_manager = log_manager
self._output_capture = output_capture
def for_compute(self) -> "SystemComputeExecutionContext":
return SystemComputeExecutionContext(self._execution_context_data, self.log, self.step)
@property
def step(self) -> ExecutionStep:
return self._step
@property
def step_launcher(self) -> Optional[StepLauncher]:
return self._step_launcher
@property
def solid_def(self) -> SolidDefinition:
return self.solid.definition
@property
def pipeline(self) -> IPipeline:
return self._execution_context_data.pipeline
@property
def pipeline_def(self) -> PipelineDefinition:
return self._execution_context_data.pipeline.get_definition()
@property
def solid(self) -> "Solid":
return self.pipeline_def.get_solid(self._step.solid_handle)
@property
def resources(self) -> NamedTuple:
return self._resources
@property
def required_resource_keys(self) -> Set[str]:
return self._required_resource_keys
@property
def log(self) -> DagsterLogManager:
return self._log_manager
def for_hook(self, hook_def: HookDefinition) -> "HookContext":
return HookContext(self._execution_context_data, self.log, hook_def, self.step)
def _get_source_run_id(self, step_output_handle: StepOutputHandle) -> str:
# determine if the step is skipped
if (
# this is re-execution
self.pipeline_run.parent_run_id
# we are not re-executing the entire pipeline
and self.pipeline_run.step_keys_to_execute is not None
# this step is not being executed
and step_output_handle.step_key not in self.pipeline_run.step_keys_to_execute
):
return self.pipeline_run.parent_run_id
else:
return self.pipeline_run.run_id
def get_output_context(self, step_output_handle) -> "OutputContext":
return get_output_context(
self.execution_plan,
self.pipeline_def,
self.environment_config,
step_output_handle,
self._get_source_run_id(step_output_handle),
log_manager=self._log_manager,
step_context=self,
)
def for_input_manager(
self,
name: str,
config: dict,
metadata: Any,
dagster_type: DagsterType,
source_handle: Optional[StepOutputHandle] = None,
resource_config: Any = None,
resources: Optional["Resources"] = None,
) -> "InputContext":
return InputContext(
pipeline_name=self.pipeline_def.name,
name=name,
solid_def=self.solid_def,
config=config,
metadata=metadata,
upstream_output=self.get_output_context(source_handle) if source_handle else None,
dagster_type=dagster_type,
log_manager=self._log_manager,
step_context=self,
resource_config=resource_config,
resources=resources,
)
def using_default_intermediate_storage(self) -> bool:
from dagster.core.storage.system_storage import mem_intermediate_storage
# pylint: disable=comparison-with-callable
return (
self.intermediate_storage_def is None
or self.intermediate_storage_def == mem_intermediate_storage
)
def get_io_manager(self, step_output_handle) -> IOManager:
step_output = self.execution_plan.get_step_output(step_output_handle)
io_manager_key = (
self.pipeline_def.get_solid(step_output.solid_handle)
.output_def_named(step_output.name)
.io_manager_key
)
# backcompat: if intermediate storage is specified and the user hasn't overridden
# io_manager_key on the output, use the intermediate storage.
if io_manager_key == "io_manager" and not self.using_default_intermediate_storage():
from dagster.core.storage.intermediate_storage import IntermediateStorageAdapter
output_manager = IntermediateStorageAdapter(self.intermediate_storage)
else:
output_manager = getattr(self.resources, io_manager_key)
return check.inst(output_manager, IOManager)
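# A pure-Python restatement of the _get_source_run_id branch above, for illustration:
# during re-execution (parent_run_id is set) with an explicit step subset, outputs of
# steps that are *not* being re-executed are read from the parent run; everything else
# is read from the current run. All arguments here are plain values, not dagster objects.
def _example_source_run_id(
    run_id: str,
    parent_run_id: Optional[str],
    step_keys_to_execute: Optional[List[str]],
    step_key: str,
) -> str:
    if (
        parent_run_id
        and step_keys_to_execute is not None
        and step_key not in step_keys_to_execute
    ):
        return parent_run_id
    return run_id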
class SystemComputeExecutionContext(SystemStepExecutionContext):
@property
def solid_config(self) -> Any:
solid_config = self.environment_config.solids.get(str(self.solid_handle))
return solid_config.config if solid_config else None
class TypeCheckContext(SystemExecutionContext):
"""The ``context`` object available to a type check function on a DagsterType.
Attributes:
log (DagsterLogManager): Centralized log dispatch from user code.
resources (Any): An object whose attributes contain the resources available to this solid.
run_id (str): The id of this pipeline run.
"""
def __init__(
self,
execution_context_data: SystemExecutionContextData,
log_manager: DagsterLogManager,
dagster_type: DagsterType,
):
super(TypeCheckContext, self).__init__(execution_context_data, log_manager)
self._resources = self._execution_context_data.scoped_resources_builder.build(
dagster_type.required_resource_keys
)
self._log_manager = log_manager
@property
def resources(self) -> NamedTuple:
return self._resources
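# A minimal sketch of a type check function that receives a TypeCheckContext, e.g. when
# wired up via DagsterType(type_check_fn=...). The function body and log message are
# illustrative assumptions, not dagster internals.
def _example_positive_int_type_check(context: TypeCheckContext, value: Any) -> bool:
    context.log.info("type-checking a value in run {run_id}".format(run_id=context.run_id))
    return isinstance(value, int) and value > 0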
class HookContext(SystemExecutionContext):
    """The ``context`` object available to a hook function on a DagsterEvent.
Attributes:
log (DagsterLogManager): Centralized log dispatch from user code.
hook_def (HookDefinition): The hook that the context object belongs to.
step (ExecutionStep): The compute step associated with the hook.
solid (Solid): The solid instance associated with the hook.
resources (NamedTuple): Resources available in the hook context.
solid_config (Any): The parsed config specific to this solid.
"""
def __init__(
self,
execution_context_data: SystemExecutionContextData,
log_manager: DagsterLogManager,
hook_def: HookDefinition,
step: ExecutionStep,
):
super(HookContext, self).__init__(execution_context_data, log_manager)
self._log_manager = log_manager
self._hook_def = check.inst_param(hook_def, "hook_def", HookDefinition)
self._step = check.inst_param(step, "step", ExecutionStep)
self._required_resource_keys = hook_def.required_resource_keys
self._resources = self._execution_context_data.scoped_resources_builder.build(
self._required_resource_keys
)
@property
def hook_def(self) -> HookDefinition:
return self._hook_def
@property
def step(self) -> ExecutionStep:
return self._step
@property
def pipeline_def(self) -> PipelineDefinition:
return self._execution_context_data.pipeline.get_definition()
@property
def solid(self) -> "Solid":
return self.pipeline_def.get_solid(self._step.solid_handle)
@property
def resources(self) -> NamedTuple:
return self._resources
@property
def required_resource_keys(self) -> Set[str]:
return self._required_resource_keys
@property
def solid_config(self) -> Any:
solid_config = self.environment_config.solids.get(str(self._step.solid_handle))
return solid_config.config if solid_config else None
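# A minimal sketch of how a HookContext reaches user code. The hook body below is an
# illustrative assumption; the import is local because the top-level `dagster` package
# cannot be imported from this internal module without creating a cycle.
def _example_success_hook():
    from dagster import success_hook

    @success_hook(required_resource_keys=set())
    def notify_on_success(context):
        # `context` here is a HookContext as defined above
        context.log.info("solid {name} succeeded".format(name=context.solid.name))

    return notify_on_success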
class OutputContext(
namedtuple(
"_OutputContext",
"step_key name pipeline_name run_id metadata mapping_key config solid_def dagster_type log version step_context resource_config resources",
)
):
"""
The context object that is available to the `handle_output` method of an :py:class:`IOManager`.
Attributes:
step_key (str): The step_key for the compute step that produced the output.
        name (str): The name of the output.
pipeline_name (str): The name of the pipeline definition.
run_id (Optional[str]): The id of the run that produced the output.
metadata (Optional[Dict[str, Any]]): A dict of the metadata that is assigned to the
OutputDefinition that produced the output.
mapping_key (Optional[str]): The key that identifies a unique mapped output. None for regular outputs.
config (Optional[Any]): The configuration for the output.
solid_def (Optional[SolidDefinition]): The definition of the solid that produced the output.
dagster_type (Optional[DagsterType]): The type of this output.
log (Optional[DagsterLogManager]): The log manager to use for this output.
version (Optional[str]): (Experimental) The version of the output.
resources (Optional[ScopedResources]): The resources required by the output manager, specified by the
`required_resource_keys` parameter.
"""
def __new__(
cls,
step_key: str,
name: str,
pipeline_name: Optional[str] = None,
run_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
mapping_key: Optional[str] = None,
config: Any = None,
solid_def: Optional[SolidDefinition] = None,
dagster_type: Optional[DagsterType] = None,
log_manager: Optional[DagsterLogManager] = None,
version: Optional[str] = None,
# This is used internally by the intermediate storage adapter, we don't usually expect users to mock this.
step_context: Optional[SystemStepExecutionContext] = None,
resource_config: Optional[Any] = None,
resources: Optional["Resources"] = None,
):
return super(OutputContext, cls).__new__(
cls,
step_key=check.str_param(step_key, "step_key"),
name=check.str_param(name, "name"),
pipeline_name=check.opt_str_param(pipeline_name, "pipeline_name"),
run_id=check.opt_str_param(run_id, "run_id"),
metadata=check.opt_dict_param(metadata, "metadata"),
mapping_key=check.opt_str_param(mapping_key, "mapping_key"),
config=config,
solid_def=check.opt_inst_param(solid_def, "solid_def", SolidDefinition),
dagster_type=check.inst_param(
resolve_dagster_type(dagster_type), "dagster_type", DagsterType
), # this allows the user to mock the context with unresolved dagster type
log=check.opt_inst_param(log_manager, "log_manager", DagsterLogManager),
version=check.opt_str_param(version, "version"),
step_context=check.opt_inst_param(
step_context, "step_context", SystemStepExecutionContext
),
resource_config=resource_config,
resources=resources,
)
    def get_run_scoped_output_identifier(self) -> List[str]:
"""Utility method to get a collection of identifiers that as a whole represent a unique
step output.
The unique identifier collection consists of
- ``run_id``: the id of the run which generates the output.
Note: This method also handles the re-execution memoization logic. If the step that
generates the output is skipped in the re-execution, the ``run_id`` will be the id
of its parent run.
- ``step_key``: the key for a compute step.
- ``name``: the name of the output. (default: 'result').
Returns:
List[str, ...]: A list of identifiers, i.e. run id, step key, and output name
"""
if self.mapping_key:
return [self.run_id, self.step_key, self.name, self.mapping_key]
return [self.run_id, self.step_key, self.name]
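# A minimal sketch of how the identifier list above maps onto storage: joining the
# run-scoped identifier onto a base directory yields a unique filesystem path of the
# form <base_dir>/<run_id>/<step_key>/<output_name>. `base_dir` is an assumed parameter.
def _example_output_path(context: OutputContext, base_dir: str) -> str:
    import os

    return os.path.join(base_dir, *context.get_run_scoped_output_identifier())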
class InputContext(
namedtuple(
"_InputContext",
"name pipeline_name solid_def config metadata upstream_output dagster_type log step_context resource_config resources",
)
):
"""
The ``context`` object available to the load_input method of :py:class:`RootInputManager`.
Attributes:
name (Optional[str]): The name of the input that we're loading.
pipeline_name (Optional[str]): The name of the pipeline.
solid_def (Optional[SolidDefinition]): The definition of the solid that's loading the input.
config (Optional[Any]): The config attached to the input that we're loading.
metadata (Optional[Dict[str, Any]]): A dict of metadata that is assigned to the
InputDefinition that we're loading for.
upstream_output (Optional[OutputContext]): Info about the output that produced the object
we're loading.
dagster_type (Optional[DagsterType]): The type of this input.
log (Optional[DagsterLogManager]): The log manager to use for this input.
resource_config (Optional[Dict[str, Any]]): The config associated with the resource that
initializes the RootInputManager.
resources (ScopedResources): The resources required by the resource that initializes the
input manager. If using the :py:func:`@root_input_manager` decorator, these resources
correspond to those requested with the `required_resource_keys` parameter.
"""
def __new__(
cls,
pipeline_name: Optional[str] = None,
        # This will be None when calling SolidExecutionResult.output_value
name: Optional[str] = None,
solid_def: Optional[SolidDefinition] = None,
config: Any = None,
metadata: Optional[Dict[str, Any]] = None,
upstream_output: Optional[OutputContext] = None,
dagster_type: Optional[DagsterType] = None,
log_manager: Optional[DagsterLogManager] = None,
# This is used internally by the intermediate storage adapter, we don't expect users to mock this.
step_context: Optional[SystemStepExecutionContext] = None,
resource_config: Any = None,
resources: Optional["Resources"] = None,
):
return super(InputContext, cls).__new__(
cls,
name=check.opt_str_param(name, "name"),
pipeline_name=check.opt_str_param(pipeline_name, "pipeline_name"),
solid_def=check.opt_inst_param(solid_def, "solid_def", SolidDefinition),
config=config,
metadata=metadata,
upstream_output=check.opt_inst_param(upstream_output, "upstream_output", OutputContext),
dagster_type=check.inst_param(
resolve_dagster_type(dagster_type), "dagster_type", DagsterType
), # this allows the user to mock the context with unresolved dagster type
log=check.opt_inst_param(log_manager, "log_manager", DagsterLogManager),
step_context=check.opt_inst_param(
step_context, "step_context", SystemStepExecutionContext
),
resource_config=resource_config,
resources=resources,
)
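# A minimal sketch of an IOManager that exercises both context objects defined above:
# handle_output receives an OutputContext, and load_input receives an InputContext whose
# `upstream_output` points at the producing output's context. Pickling to a local
# directory (and the "/tmp/dagster" default) are illustrative assumptions, not the
# behavior of any built-in manager.
class _ExamplePickleIOManager(IOManager):
    def __init__(self, base_dir: str = "/tmp/dagster"):
        self._base_dir = base_dir

    def _path(self, identifier: List[str]) -> str:
        import os

        return os.path.join(self._base_dir, *identifier)

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        import os
        import pickle

        path = self._path(context.get_run_scoped_output_identifier())
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context: InputContext) -> Any:
        import pickle

        # read from wherever the upstream output was written
        path = self._path(context.upstream_output.get_run_scoped_output_identifier())
        with open(path, "rb") as f:
            return pickle.load(f)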
def _step_output_version(
pipeline_def: PipelineDefinition,
execution_plan: "ExecutionPlan",
environment_config: "EnvironmentConfig",
step_output_handle: StepOutputHandle,
) -> Optional[str]:
from dagster.core.execution.resolve_versions import resolve_step_output_versions
step_output_versions = resolve_step_output_versions(
pipeline_def, execution_plan, environment_config
)
return (
step_output_versions[step_output_handle]
if step_output_handle in step_output_versions
else None
)
def get_output_context(
execution_plan: "ExecutionPlan",
pipeline_def: PipelineDefinition,
environment_config: EnvironmentConfig,
step_output_handle: StepOutputHandle,
run_id: Optional[str] = None,
log_manager: Optional[DagsterLogManager] = None,
step_context: Optional[SystemStepExecutionContext] = None,
) -> OutputContext:
"""
Args:
run_id (str): The run ID of the run that produced the output, not necessarily the run that
the context will be used in.
"""
from dagster.core.execution.plan.plan import ExecutionPlan
check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
check.inst_param(environment_config, "environment_config", EnvironmentConfig)
check.inst_param(step_output_handle, "step_output_handle", StepOutputHandle)
check.opt_str_param(run_id, "run_id")
step = execution_plan.get_step_by_key(step_output_handle.step_key)
# get config
solid_config = environment_config.solids.get(step.solid_handle.to_string())
outputs_config = solid_config.outputs
if outputs_config:
output_config = outputs_config.get_output_manager_config(step_output_handle.output_name)
else:
output_config = None
step_output = execution_plan.get_step_output(step_output_handle)
output_def = pipeline_def.get_solid(step_output.solid_handle).output_def_named(step_output.name)
io_manager_key = output_def.io_manager_key
resource_config = environment_config.resources[io_manager_key].config
resources = build_resources_for_manager(io_manager_key, step_context) if step_context else None
return OutputContext(
step_key=step_output_handle.step_key,
name=step_output_handle.output_name,
pipeline_name=pipeline_def.name,
run_id=run_id,
metadata=output_def.metadata,
mapping_key=step_output_handle.mapping_key,
config=output_config,
solid_def=pipeline_def.get_solid(step.solid_handle).definition,
dagster_type=output_def.dagster_type,
log_manager=log_manager,
version=(
_step_output_version(
pipeline_def, execution_plan, environment_config, step_output_handle
)
if MEMOIZED_RUN_TAG in pipeline_def.tags
else None
),
step_context=step_context,
resource_config=resource_config,
resources=resources,
)