DagsterDocs

Source code for dagster.core.events

"""Structured representations of system events."""
import logging
import os
from collections import namedtuple
from enum import Enum

from dagster import check
from dagster.core.definitions import (
    AssetKey,
    AssetMaterialization,
    EventMetadataEntry,
    ExpectationResult,
    Materialization,
    SolidHandle,
)
from dagster.core.definitions.events import AssetLineageInfo, ObjectStoreOperationType
from dagster.core.execution.context.system import (
    BaseStepExecutionContext,
    HookContext,
    PipelineExecutionContext,
    SystemExecutionContext,
)
from dagster.core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle
from dagster.core.execution.plan.outputs import StepOutputData
from dagster.core.log_manager import DagsterLogManager
from dagster.serdes import register_serdes_tuple_fallbacks, whitelist_for_serdes
from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
from dagster.utils.timing import format_duration


class DagsterEventType(Enum):
    """The types of events that may be yielded by solid and pipeline execution."""

    # Step lifecycle events.
    STEP_OUTPUT = "STEP_OUTPUT"
    STEP_INPUT = "STEP_INPUT"
    STEP_FAILURE = "STEP_FAILURE"
    STEP_START = "STEP_START"
    STEP_SUCCESS = "STEP_SUCCESS"
    STEP_SKIPPED = "STEP_SKIPPED"
    STEP_UP_FOR_RETRY = "STEP_UP_FOR_RETRY"  # "failed" but want to retry
    STEP_RESTARTED = "STEP_RESTARTED"

    # Events reported from inside a step's computation.
    ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION"
    STEP_EXPECTATION_RESULT = "STEP_EXPECTATION_RESULT"

    # Pipeline (run) lifecycle events.
    PIPELINE_INIT_FAILURE = "PIPELINE_INIT_FAILURE"
    PIPELINE_ENQUEUED = "PIPELINE_ENQUEUED"
    PIPELINE_DEQUEUED = "PIPELINE_DEQUEUED"
    PIPELINE_STARTING = "PIPELINE_STARTING"  # Launch is happening, execution hasn't started yet
    PIPELINE_START = "PIPELINE_START"  # Execution has started
    PIPELINE_SUCCESS = "PIPELINE_SUCCESS"
    PIPELINE_FAILURE = "PIPELINE_FAILURE"
    PIPELINE_CANCELING = "PIPELINE_CANCELING"
    PIPELINE_CANCELED = "PIPELINE_CANCELED"

    # Storage / IO events.
    OBJECT_STORE_OPERATION = "OBJECT_STORE_OPERATION"
    # NOTE(review): ASSET_STORE_OPERATION is not a member of STEP_EVENTS in this
    # module — confirm whether that is intentional (possibly a legacy value).
    ASSET_STORE_OPERATION = "ASSET_STORE_OPERATION"
    LOADED_INPUT = "LOADED_INPUT"
    HANDLED_OUTPUT = "HANDLED_OUTPUT"

    ENGINE_EVENT = "ENGINE_EVENT"

    # Hook events.
    HOOK_COMPLETED = "HOOK_COMPLETED"
    HOOK_ERRORED = "HOOK_ERRORED"
    HOOK_SKIPPED = "HOOK_SKIPPED"
# Event types emitted while a single step executes (see DagsterEvent.is_step_event).
STEP_EVENTS = {
    DagsterEventType.STEP_INPUT,
    DagsterEventType.STEP_START,
    DagsterEventType.STEP_OUTPUT,
    DagsterEventType.STEP_FAILURE,
    DagsterEventType.STEP_SUCCESS,
    DagsterEventType.STEP_SKIPPED,
    DagsterEventType.ASSET_MATERIALIZATION,
    DagsterEventType.STEP_EXPECTATION_RESULT,
    DagsterEventType.OBJECT_STORE_OPERATION,
    DagsterEventType.HANDLED_OUTPUT,
    DagsterEventType.LOADED_INPUT,
    DagsterEventType.STEP_RESTARTED,
    DagsterEventType.STEP_UP_FOR_RETRY,
}

# Event types the log_* helpers below route to ERROR level instead of DEBUG.
FAILURE_EVENTS = {
    DagsterEventType.PIPELINE_INIT_FAILURE,
    DagsterEventType.PIPELINE_FAILURE,
    DagsterEventType.STEP_FAILURE,
    DagsterEventType.PIPELINE_CANCELED,
}

# Event types describing the run as a whole (see DagsterEvent.is_pipeline_event).
PIPELINE_EVENTS = {
    DagsterEventType.PIPELINE_ENQUEUED,
    DagsterEventType.PIPELINE_DEQUEUED,
    DagsterEventType.PIPELINE_STARTING,
    DagsterEventType.PIPELINE_START,
    DagsterEventType.PIPELINE_SUCCESS,
    DagsterEventType.PIPELINE_INIT_FAILURE,
    DagsterEventType.PIPELINE_FAILURE,
    DagsterEventType.PIPELINE_CANCELING,
    DagsterEventType.PIPELINE_CANCELED,
}

# Event types produced by hook execution (see DagsterEvent.is_hook_event).
HOOK_EVENTS = {
    DagsterEventType.HOOK_COMPLETED,
    DagsterEventType.HOOK_ERRORED,
    DagsterEventType.HOOK_SKIPPED,
}


def _assert_type(method, expected_type, actual_type):
    """Fail a check invariant unless ``actual_type`` equals ``expected_type``.

    ``method`` is only used to build the error message.
    """
    template = "{method} only callable when event_type is {expected_type}, called on {actual_type}"
    error_message = template.format(
        method=method, expected_type=expected_type, actual_type=actual_type
    )
    check.invariant(expected_type == actual_type, error_message)


def _validate_event_specific_data(event_type, event_specific_data):
    """Type-check ``event_specific_data`` against the payload class for ``event_type``.

    Only the event types listed below carry a checked payload; any other event type
    passes its data through unchecked. The data is returned unchanged so the call can
    be used inline when constructing an event.
    """
    # Imported locally, as in the original, rather than at module level.
    from dagster.core.execution.plan.objects import StepFailureData, StepSuccessData
    from dagster.core.execution.plan.inputs import StepInputData

    # Ordered (event type, payload class) pairs; the first pair whose event type
    # compares equal decides which class the payload must be an instance of.
    payload_classes = (
        (DagsterEventType.STEP_OUTPUT, StepOutputData),
        (DagsterEventType.STEP_FAILURE, StepFailureData),
        (DagsterEventType.STEP_SUCCESS, StepSuccessData),
        (DagsterEventType.ASSET_MATERIALIZATION, StepMaterializationData),
        (DagsterEventType.STEP_EXPECTATION_RESULT, StepExpectationResultData),
        (DagsterEventType.STEP_INPUT, StepInputData),
        (DagsterEventType.ENGINE_EVENT, EngineEventData),
        (DagsterEventType.HOOK_ERRORED, HookErroredData),
    )
    for candidate_type, payload_class in payload_classes:
        if event_type == candidate_type:
            check.inst_param(event_specific_data, "event_specific_data", payload_class)
            break

    return event_specific_data


def log_step_event(step_context, event):
    """Log ``event`` through ``step_context.log``.

    Events whose type is in FAILURE_EVENTS go to ``error``; all others go to ``debug``.
    When the event has no message, a "<type> for step <key>" fallback is used.
    """
    check.inst_param(step_context, "step_context", BaseStepExecutionContext)
    check.inst_param(event, "event", DagsterEvent)

    event_type = DagsterEventType(event.event_type_value)
    if event_type in FAILURE_EVENTS:
        emit = step_context.log.error
    else:
        emit = step_context.log.debug

    # The fallback text is only formatted when the event carries no message.
    text = event.message or "{event_type} for step {step_key}".format(
        event_type=event_type, step_key=step_context.step.key
    )
    emit(text, dagster_event=event, pipeline_name=step_context.pipeline_name)


def log_pipeline_event(pipeline_context, event, step_key):
    """Log ``event`` through ``pipeline_context.log``, tagged with ``step_key``.

    Events whose type is in FAILURE_EVENTS go to ``error``; all others go to ``debug``.
    When the event has no message, a "<type> for pipeline <name>" fallback is used.
    """
    event_type = DagsterEventType(event.event_type_value)
    if event_type in FAILURE_EVENTS:
        emit = pipeline_context.log.error
    else:
        emit = pipeline_context.log.debug

    # The fallback text is only formatted when the event carries no message.
    text = event.message or "{event_type} for pipeline {pipeline_name}".format(
        event_type=event_type, pipeline_name=pipeline_context.pipeline_name
    )
    emit(
        text,
        dagster_event=event,
        pipeline_name=pipeline_context.pipeline_name,
        step_key=step_key,
    )


def log_resource_event(log_manager, pipeline_name, event):
    """Log an event whose payload is EngineEventData through ``log_manager``.

    The event goes to ``error`` when its payload has an ``error`` set, otherwise to ``debug``.
    """
    check.inst_param(log_manager, "log_manager", DagsterLogManager)
    check.inst_param(event, "event", DagsterEvent)
    check.inst(event.event_specific_data, EngineEventData)

    if event.event_specific_data.error:
        emit = log_manager.error
    else:
        emit = log_manager.debug

    emit(event.message, dagster_event=event, pipeline_name=pipeline_name, step_key=event.step_key)
@whitelist_for_serdes
class DagsterEvent(
    namedtuple(
        "_DagsterEvent",
        "event_type_value pipeline_name step_handle solid_handle step_kind_value "
        "logging_tags event_specific_data message pid step_key",
    )
):
    """Events yielded by solid and pipeline execution.

    Users should not instantiate this class.

    Attributes:
        event_type_value (str): Value for a DagsterEventType.
        pipeline_name (str)
        step_handle (Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]])
        solid_handle (SolidHandle)
        step_kind_value (str): Value for a StepKind.
        logging_tags (Dict[str, str])
        event_specific_data (Any): Type must correspond to event_type_value.
        message (str)
        pid (int)
        step_key (Optional[str]): DEPRECATED
    """

    @staticmethod
    def from_step(event_type, step_context, event_specific_data=None, message=None):
        """Build an event scoped to the step in ``step_context``, log it, and return it.

        Handle, solid handle, step kind and logging tags all come from ``step_context``;
        the event is logged via ``log_step_event``.
        """
        check.inst_param(step_context, "step_context", BaseStepExecutionContext)

        event = DagsterEvent(
            event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
            pipeline_name=step_context.pipeline_name,
            step_handle=step_context.step.handle,
            solid_handle=step_context.step.solid_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.logging_tags,
            event_specific_data=_validate_event_specific_data(event_type, event_specific_data),
            message=check.opt_str_param(message, "message"),
            pid=os.getpid(),
        )

        log_step_event(step_context, event)

        return event

    @staticmethod
    def from_pipeline(
        event_type, pipeline_context, message=None, event_specific_data=None, step_handle=None
    ):
        """Build a pipeline-scoped event, log it, and return it.

        ``step_handle`` optionally ties the event to a step. Its key is forwarded to
        ``log_pipeline_event``.
        """
        check.inst_param(pipeline_context, "pipeline_context", PipelineExecutionContext)
        check.opt_inst_param(
            step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle)
        )
        pipeline_name = pipeline_context.pipeline_name

        event = DagsterEvent(
            event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            message=check.opt_str_param(message, "message"),
            event_specific_data=_validate_event_specific_data(event_type, event_specific_data),
            step_handle=step_handle,
            pid=os.getpid(),
        )
        step_key = step_handle.to_key() if step_handle else None
        log_pipeline_event(pipeline_context, event, step_key)

        return event

    @staticmethod
    def from_resource(
        pipeline_name, execution_plan, log_manager, message=None, event_specific_data=None
    ):
        """Build an ENGINE_EVENT (used by the resource_* factories), log it, and return it."""
        from dagster.core.execution.plan.plan import ExecutionPlan

        check.inst_param(execution_plan, "execution_plan", ExecutionPlan)

        event = DagsterEvent(
            DagsterEventType.ENGINE_EVENT.value,
            pipeline_name=pipeline_name,
            message=check.opt_str_param(message, "message"),
            event_specific_data=_validate_event_specific_data(
                DagsterEventType.ENGINE_EVENT, event_specific_data
            ),
            step_handle=execution_plan.step_handle_for_single_step_plans(),
            pid=os.getpid(),
        )
        log_resource_event(log_manager, pipeline_name, event)
        return event

    def __new__(
        cls,
        event_type_value,
        pipeline_name,
        step_handle=None,
        solid_handle=None,
        step_kind_value=None,
        logging_tags=None,
        event_specific_data=None,
        message=None,
        pid=None,
        # legacy
        step_key=None,
    ):
        """Construct and type-check the event tuple.

        First applies ``_handle_back_compat`` to the type/payload pair. Then derives
        ``step_handle`` and ``step_key`` for events that only carry the legacy fields.
        """
        event_type_value, event_specific_data = _handle_back_compat(
            event_type_value, event_specific_data
        )

        # old events may contain solid_handle but not step_handle
        if solid_handle is not None and step_handle is None:
            step_handle = StepHandle(solid_handle)

        # Legacy events may have step_key set directly, preserve those to stay in sync
        # with legacy execution plan snapshots.
        if step_handle is not None and step_key is None:
            step_key = step_handle.to_key()

        return super(DagsterEvent, cls).__new__(
            cls,
            check.str_param(event_type_value, "event_type_value"),
            check.str_param(pipeline_name, "pipeline_name"),
            check.opt_inst_param(
                step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle)
            ),
            check.opt_inst_param(solid_handle, "solid_handle", SolidHandle),
            check.opt_str_param(step_kind_value, "step_kind_value"),
            check.opt_dict_param(logging_tags, "logging_tags"),
            _validate_event_specific_data(DagsterEventType(event_type_value), event_specific_data),
            check.opt_str_param(message, "message"),
            check.opt_int_param(pid, "pid"),
            check.opt_str_param(step_key, "step_key"),
        )

    @property
    def solid_name(self):
        return self.solid_handle.name

    @property
    def event_type(self):
        """DagsterEventType: The type of this event."""
        return DagsterEventType(self.event_type_value)

    # ---- Event-type predicates -------------------------------------------------

    @property
    def is_step_event(self):
        return self.event_type in STEP_EVENTS

    @property
    def is_hook_event(self):
        return self.event_type in HOOK_EVENTS

    @property
    def step_kind(self):
        from dagster.core.execution.plan.step import StepKind

        return StepKind(self.step_kind_value)

    @property
    def is_step_success(self):
        return self.event_type == DagsterEventType.STEP_SUCCESS

    @property
    def is_successful_output(self):
        return self.event_type == DagsterEventType.STEP_OUTPUT

    @property
    def is_step_start(self):
        return self.event_type == DagsterEventType.STEP_START

    @property
    def is_step_failure(self):
        return self.event_type == DagsterEventType.STEP_FAILURE

    @property
    def is_step_skipped(self):
        return self.event_type == DagsterEventType.STEP_SKIPPED

    @property
    def is_step_up_for_retry(self):
        return self.event_type == DagsterEventType.STEP_UP_FOR_RETRY

    @property
    def is_step_restarted(self):
        return self.event_type == DagsterEventType.STEP_RESTARTED

    @property
    def is_pipeline_success(self):
        return self.event_type == DagsterEventType.PIPELINE_SUCCESS

    @property
    def is_pipeline_failure(self):
        return self.event_type == DagsterEventType.PIPELINE_FAILURE

    @property
    def is_pipeline_init_failure(self):
        return self.event_type == DagsterEventType.PIPELINE_INIT_FAILURE

    @property
    def is_failure(self):
        return self.event_type in FAILURE_EVENTS

    @property
    def is_pipeline_event(self):
        return self.event_type in PIPELINE_EVENTS

    @property
    def is_engine_event(self):
        return self.event_type == DagsterEventType.ENGINE_EVENT

    @property
    def is_handled_output(self):
        return self.event_type == DagsterEventType.HANDLED_OUTPUT

    @property
    def is_loaded_input(self):
        return self.event_type == DagsterEventType.LOADED_INPUT

    @property
    def is_step_materialization(self):
        return self.event_type == DagsterEventType.ASSET_MATERIALIZATION

    @property
    def asset_key(self):
        """The materialization's asset key, or None for non-materialization events."""
        if self.event_type != DagsterEventType.ASSET_MATERIALIZATION:
            return None
        return self.step_materialization_data.materialization.asset_key

    @property
    def partition(self):
        """The materialization's partition, or None for non-materialization events."""
        if self.event_type != DagsterEventType.ASSET_MATERIALIZATION:
            return None
        return self.step_materialization_data.materialization.partition

    # ---- Typed payload accessors -----------------------------------------------
    # Each accessor fails a check invariant (via _assert_type) when called on an
    # event of a different type, then returns event_specific_data unchanged.

    @property
    def step_input_data(self):
        _assert_type("step_input_data", DagsterEventType.STEP_INPUT, self.event_type)
        return self.event_specific_data

    @property
    def step_output_data(self):
        _assert_type("step_output_data", DagsterEventType.STEP_OUTPUT, self.event_type)
        return self.event_specific_data

    @property
    def step_success_data(self):
        _assert_type("step_success_data", DagsterEventType.STEP_SUCCESS, self.event_type)
        return self.event_specific_data

    @property
    def step_failure_data(self):
        _assert_type("step_failure_data", DagsterEventType.STEP_FAILURE, self.event_type)
        return self.event_specific_data

    @property
    def step_retry_data(self):
        _assert_type("step_retry_data", DagsterEventType.STEP_UP_FOR_RETRY, self.event_type)
        return self.event_specific_data

    @property
    def step_materialization_data(self):
        _assert_type(
            "step_materialization_data", DagsterEventType.ASSET_MATERIALIZATION, self.event_type
        )
        return self.event_specific_data

    @property
    def step_expectation_result_data(self):
        _assert_type(
            "step_expectation_result_data",
            DagsterEventType.STEP_EXPECTATION_RESULT,
            self.event_type,
        )
        return self.event_specific_data

    @property
    def pipeline_init_failure_data(self):
        _assert_type(
            "pipeline_init_failure_data", DagsterEventType.PIPELINE_INIT_FAILURE, self.event_type
        )
        return self.event_specific_data

    @property
    def pipeline_failure_data(self):
        _assert_type("pipeline_failure_data", DagsterEventType.PIPELINE_FAILURE, self.event_type)
        return self.event_specific_data

    @property
    def engine_event_data(self):
        _assert_type("engine_event_data", DagsterEventType.ENGINE_EVENT, self.event_type)
        return self.event_specific_data

    @property
    def hook_completed_data(self):
        _assert_type("hook_completed_data", DagsterEventType.HOOK_COMPLETED, self.event_type)
        return self.event_specific_data

    @property
    def hook_errored_data(self):
        _assert_type("hook_errored_data", DagsterEventType.HOOK_ERRORED, self.event_type)
        return self.event_specific_data

    @property
    def hook_skipped_data(self):
        _assert_type("hook_skipped_data", DagsterEventType.HOOK_SKIPPED, self.event_type)
        return self.event_specific_data

    # ---- Step event factories (all routed through from_step) -------------------

    @staticmethod
    def step_output_event(step_context, step_output_data):
        check.inst_param(step_output_data, "step_output_data", StepOutputData)

        output_def = step_context.solid.output_def_named(
            step_output_data.step_output_handle.output_name
        )

        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_OUTPUT,
            step_context=step_context,
            event_specific_data=step_output_data,
            message='Yielded output "{output_name}"{mapping_clause} of type "{output_type}".{type_check_clause}'.format(
                output_name=step_output_data.step_output_handle.output_name,
                output_type=output_def.dagster_type.display_name,
                type_check_clause=(
                    " Warning! Type check failed."
                    if not step_output_data.type_check_data.success
                    else " (Type check passed)."
                )
                if step_output_data.type_check_data
                else " (No type check).",
                mapping_clause=f' mapping key "{step_output_data.step_output_handle.mapping_key}"'
                if step_output_data.step_output_handle.mapping_key
                else "",
            ),
        )

    @staticmethod
    def step_failure_event(step_context, step_failure_data):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_FAILURE,
            step_context=step_context,
            event_specific_data=step_failure_data,
            message='Execution of step "{step_key}" failed.'.format(step_key=step_context.step.key),
        )

    @staticmethod
    def step_retry_event(step_context, step_retry_data):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_UP_FOR_RETRY,
            step_context=step_context,
            event_specific_data=step_retry_data,
            message='Execution of step "{step_key}" failed and has requested a retry{wait_str}.'.format(
                step_key=step_context.step.key,
                wait_str=" in {n} seconds".format(n=step_retry_data.seconds_to_wait)
                if step_retry_data.seconds_to_wait
                else "",
            ),
        )

    @staticmethod
    def step_input_event(step_context, step_input_data):
        step_input = step_context.step.step_input_named(step_input_data.input_name)
        input_def = step_input.source.get_input_def(step_context.pipeline_def)

        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_INPUT,
            step_context=step_context,
            event_specific_data=step_input_data,
            message='Got input "{input_name}" of type "{input_type}".{type_check_clause}'.format(
                input_name=step_input_data.input_name,
                input_type=input_def.dagster_type.display_name,
                type_check_clause=(
                    " Warning! Type check failed."
                    if not step_input_data.type_check_data.success
                    else " (Type check passed)."
                )
                if step_input_data.type_check_data
                else " (No type check).",
            ),
        )

    @staticmethod
    def step_start_event(step_context):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_START,
            step_context=step_context,
            message='Started execution of step "{step_key}".'.format(
                step_key=step_context.step.key
            ),
        )

    @staticmethod
    def step_restarted_event(step_context, previous_attempts):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_RESTARTED,
            step_context=step_context,
            message='Started re-execution (attempt # {n}) of step "{step_key}".'.format(
                step_key=step_context.step.key, n=previous_attempts + 1
            ),
        )

    @staticmethod
    def step_success_event(step_context, success):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_SUCCESS,
            step_context=step_context,
            event_specific_data=success,
            message='Finished execution of step "{step_key}" in {duration}.'.format(
                step_key=step_context.step.key,
                duration=format_duration(success.duration_ms),
            ),
        )

    @staticmethod
    def step_skipped_event(step_context):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_SKIPPED,
            step_context=step_context,
            message='Skipped execution of step "{step_key}".'.format(
                step_key=step_context.step.key
            ),
        )

    @staticmethod
    def asset_materialization(step_context, materialization, asset_lineage=None):
        check.inst_param(
            materialization, "materialization", (AssetMaterialization, Materialization)
        )
        check.opt_list_param(asset_lineage, "asset_lineage", AssetLineageInfo)
        return DagsterEvent.from_step(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            step_context=step_context,
            event_specific_data=StepMaterializationData(materialization, asset_lineage),
            message=materialization.description
            if materialization.description
            else "Materialized value{label_clause}.".format(
                label_clause=" {label}".format(label=materialization.label)
                if materialization.label
                else ""
            ),
        )

    @staticmethod
    def step_expectation_result(step_context, expectation_result):
        check.inst_param(expectation_result, "expectation_result", ExpectationResult)

        def _msg():
            # Prefer the user-supplied description; otherwise summarize label + outcome.
            if expectation_result.description:
                return expectation_result.description

            return "Expectation{label_clause} {result_verb}".format(
                label_clause=" " + expectation_result.label if expectation_result.label else "",
                result_verb="passed" if expectation_result.success else "failed",
            )

        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_EXPECTATION_RESULT,
            step_context=step_context,
            event_specific_data=StepExpectationResultData(expectation_result),
            message=_msg(),
        )

    # ---- Pipeline event factories (all routed through from_pipeline) -----------

    @staticmethod
    def pipeline_start(pipeline_context):
        return DagsterEvent.from_pipeline(
            DagsterEventType.PIPELINE_START,
            pipeline_context,
            message='Started execution of pipeline "{pipeline_name}".'.format(
                pipeline_name=pipeline_context.pipeline_name
            ),
        )

    @staticmethod
    def pipeline_success(pipeline_context):
        return DagsterEvent.from_pipeline(
            DagsterEventType.PIPELINE_SUCCESS,
            pipeline_context,
            message='Finished execution of pipeline "{pipeline_name}".'.format(
                pipeline_name=pipeline_context.pipeline_name
            ),
        )

    @staticmethod
    def pipeline_failure(pipeline_context, context_msg, error_info=None):
        return DagsterEvent.from_pipeline(
            DagsterEventType.PIPELINE_FAILURE,
            pipeline_context,
            message='Execution of pipeline "{pipeline_name}" failed. {context_msg}'.format(
                pipeline_name=pipeline_context.pipeline_name,
                context_msg=check.str_param(context_msg, "context_msg"),
            ),
            event_specific_data=PipelineFailureData(
                check.opt_inst_param(error_info, "error_info", SerializableErrorInfo)
            ),
        )

    @staticmethod
    def pipeline_canceled(pipeline_context, error_info=None):
        return DagsterEvent.from_pipeline(
            DagsterEventType.PIPELINE_CANCELED,
            pipeline_context,
            message='Execution of pipeline "{pipeline_name}" canceled.'.format(
                pipeline_name=pipeline_context.pipeline_name
            ),
            event_specific_data=PipelineCanceledData(
                check.opt_inst_param(error_info, "error_info", SerializableErrorInfo)
            ),
        )

    # ---- Resource event factories (all routed through from_resource) -----------

    @staticmethod
    def resource_init_start(pipeline_name, execution_plan, log_manager, resource_keys):
        from dagster.core.execution.plan.plan import ExecutionPlan

        return DagsterEvent.from_resource(
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
            log_manager=check.inst_param(log_manager, "log_manager", DagsterLogManager),
            message="Starting initialization of resources [{}].".format(
                ", ".join(sorted(resource_keys))
            ),
            event_specific_data=EngineEventData(metadata_entries=[], marker_start="resources"),
        )

    @staticmethod
    def resource_init_success(
        pipeline_name, execution_plan, log_manager, resource_instances, resource_init_times
    ):
        from dagster.core.execution.plan.plan import ExecutionPlan

        # One python_artifact metadata entry per initialized resource, annotated with
        # its entry from resource_init_times.
        metadata_entries = []
        for resource_key, resource_obj in resource_instances.items():
            resource_time = resource_init_times[resource_key]
            metadata_entries.append(
                EventMetadataEntry.python_artifact(
                    resource_obj.__class__, resource_key, "Initialized in {}".format(resource_time)
                )
            )

        return DagsterEvent.from_resource(
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
            log_manager=check.inst_param(log_manager, "log_manager", DagsterLogManager),
            message="Finished initialization of resources [{}].".format(
                ", ".join(sorted(resource_init_times.keys()))
            ),
            event_specific_data=EngineEventData(
                metadata_entries=metadata_entries,
                marker_end="resources",
            ),
        )

    @staticmethod
    def resource_init_failure(pipeline_name, execution_plan, log_manager, resource_keys, error):
        from dagster.core.execution.plan.plan import ExecutionPlan

        return DagsterEvent.from_resource(
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
            log_manager=check.inst_param(log_manager, "log_manager", DagsterLogManager),
            message="Initialization of resources [{}] failed.".format(", ".join(resource_keys)),
            event_specific_data=EngineEventData(
                metadata_entries=[],
                marker_end="resources",
                error=error,
            ),
        )

    @staticmethod
    def resource_teardown_failure(pipeline_name, execution_plan, log_manager, resource_keys, error):
        from dagster.core.execution.plan.plan import ExecutionPlan

        return DagsterEvent.from_resource(
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
            log_manager=check.inst_param(log_manager, "log_manager", DagsterLogManager),
            message="Teardown of resources [{}] failed.".format(", ".join(resource_keys)),
            event_specific_data=EngineEventData(
                metadata_entries=[],
                marker_start=None,
                marker_end=None,
                error=error,
            ),
        )

    @staticmethod
    def pipeline_init_failure(pipeline_name, failure_data, log_manager):
        check.inst_param(failure_data, "failure_data", PipelineInitFailureData)
        check.inst_param(log_manager, "log_manager", DagsterLogManager)
        # this failure happens trying to bring up context so can't use from_pipeline

        event = DagsterEvent(
            event_type_value=DagsterEventType.PIPELINE_INIT_FAILURE.value,
            pipeline_name=pipeline_name,
            event_specific_data=failure_data,
            message=(
                'Pipeline failure during initialization of pipeline "{pipeline_name}". '
                "This may be due to a failure in initializing a resource or logger."
            ).format(pipeline_name=pipeline_name),
            pid=os.getpid(),
        )
        log_manager.error(
            event.message
            or "{event_type} for pipeline {pipeline_name}".format(
                event_type=DagsterEventType.PIPELINE_INIT_FAILURE, pipeline_name=pipeline_name
            ),
            dagster_event=event,
            pipeline_name=pipeline_name,
        )
        return event

    @staticmethod
    def engine_event(pipeline_context, message, event_specific_data=None, step_handle=None):
        return DagsterEvent.from_pipeline(
            DagsterEventType.ENGINE_EVENT,
            pipeline_context,
            message,
            event_specific_data=event_specific_data,
            step_handle=step_handle,
        )

    @staticmethod
    def object_store_operation(step_context, object_store_operation_result):
        from dagster.core.definitions.events import ObjectStoreOperation

        check.inst_param(
            object_store_operation_result, "object_store_operation_result", ObjectStoreOperation
        )

        object_store_name = (
            "{object_store_name} ".format(
                object_store_name=object_store_operation_result.object_store_name
            )
            if object_store_operation_result.object_store_name
            else ""
        )

        serialization_strategy_modifier = (
            " using {serialization_strategy_name}".format(
                serialization_strategy_name=object_store_operation_result.serialization_strategy_name
            )
            if object_store_operation_result.serialization_strategy_name
            else ""
        )

        value_name = object_store_operation_result.value_name

        # Parse the operation type once rather than once per branch below.
        op_type = ObjectStoreOperationType(object_store_operation_result.op)

        if op_type == ObjectStoreOperationType.SET_OBJECT:
            message = (
                "Stored intermediate object for output {value_name} in "
                "{object_store_name}object store{serialization_strategy_modifier}."
            ).format(
                value_name=value_name,
                object_store_name=object_store_name,
                serialization_strategy_modifier=serialization_strategy_modifier,
            )
        elif op_type == ObjectStoreOperationType.GET_OBJECT:
            message = (
                "Retrieved intermediate object for input {value_name} in "
                "{object_store_name}object store{serialization_strategy_modifier}."
            ).format(
                value_name=value_name,
                object_store_name=object_store_name,
                serialization_strategy_modifier=serialization_strategy_modifier,
            )
        elif op_type == ObjectStoreOperationType.CP_OBJECT:
            message = (
                "Copied intermediate object for input {value_name} from {key} to {dest_key}"
            ).format(
                value_name=value_name,
                key=object_store_operation_result.key,
                dest_key=object_store_operation_result.dest_key,
            )
        else:
            message = ""

        return DagsterEvent.from_step(
            DagsterEventType.OBJECT_STORE_OPERATION,
            step_context,
            event_specific_data=ObjectStoreOperationResultData(
                op=object_store_operation_result.op,
                value_name=value_name,
                address=object_store_operation_result.key,
                metadata_entries=[
                    EventMetadataEntry.path(object_store_operation_result.key, label="key")
                ],
                version=object_store_operation_result.version,
                mapping_key=object_store_operation_result.mapping_key,
            ),
            message=message,
        )

    @staticmethod
    def handled_output(
        step_context,
        output_name,
        manager_key,
        message_override=None,
        metadata_entries=None,
    ):
        check.str_param(output_name, "output_name")
        check.str_param(manager_key, "manager_key")
        message = f'Handled output "{output_name}" using IO manager "{manager_key}"'
        return DagsterEvent.from_step(
            event_type=DagsterEventType.HANDLED_OUTPUT,
            step_context=step_context,
            event_specific_data=HandledOutputData(
                output_name=output_name,
                manager_key=manager_key,
                metadata_entries=metadata_entries if metadata_entries else [],
            ),
            message=message_override or message,
        )

    @staticmethod
    def loaded_input(
        step_context,
        input_name,
        manager_key,
        upstream_output_name=None,
        upstream_step_key=None,
        message_override=None,
    ):
        check.str_param(input_name, "input_name")
        check.str_param(manager_key, "manager_key")
        check.opt_str_param(upstream_output_name, "upstream_output_name")
        check.opt_str_param(upstream_step_key, "upstream_step_key")

        message = f'Loaded input "{input_name}" using input manager "{manager_key}"'
        if upstream_output_name:
            message += f', from output "{upstream_output_name}" of step ' f'"{upstream_step_key}"'

        return DagsterEvent.from_step(
            event_type=DagsterEventType.LOADED_INPUT,
            step_context=step_context,
            event_specific_data=LoadedInputData(
                input_name=input_name,
                manager_key=manager_key,
                upstream_output_name=upstream_output_name,
                upstream_step_key=upstream_step_key,
            ),
            message=message_override or message,
        )

    # ---- Hook event factories (constructed directly and logged via hook_context.log) ----

    @staticmethod
    def hook_completed(hook_context, hook_def):
        event_type = DagsterEventType.HOOK_COMPLETED
        check.inst_param(hook_context, "hook_context", HookContext)

        event = DagsterEvent(
            event_type_value=event_type.value,
            pipeline_name=hook_context.pipeline_name,
            step_handle=hook_context.step.handle,
            solid_handle=hook_context.step.solid_handle,
            step_kind_value=hook_context.step.kind.value,
            logging_tags=hook_context.logging_tags,
            message=(
                'Finished the execution of hook "{hook_name}" triggered for solid "{solid_name}".'
            ).format(hook_name=hook_def.name, solid_name=hook_context.solid.name),
        )

        hook_context.log.debug(
            event.message,
            dagster_event=event,
            pipeline_name=hook_context.pipeline_name,
        )

        return event

    @staticmethod
    def hook_errored(hook_context, error):
        event_type = DagsterEventType.HOOK_ERRORED
        check.inst_param(hook_context, "hook_context", HookContext)

        event = DagsterEvent(
            event_type_value=event_type.value,
            pipeline_name=hook_context.pipeline_name,
            step_handle=hook_context.step.handle,
            solid_handle=hook_context.step.solid_handle,
            step_kind_value=hook_context.step.kind.value,
            logging_tags=hook_context.logging_tags,
            event_specific_data=_validate_event_specific_data(
                event_type,
                HookErroredData(
                    error=serializable_error_info_from_exc_info(error.original_exc_info)
                ),
            ),
        )

        hook_context.log.error(
            str(error),
            dagster_event=event,
            pipeline_name=hook_context.pipeline_name,
        )

        return event

    @staticmethod
    def hook_skipped(hook_context, hook_def):
        event_type = DagsterEventType.HOOK_SKIPPED
        check.inst_param(hook_context, "hook_context", HookContext)

        event = DagsterEvent(
            event_type_value=event_type.value,
            pipeline_name=hook_context.pipeline_name,
            step_handle=hook_context.step.handle,
            solid_handle=hook_context.step.solid_handle,
            step_kind_value=hook_context.step.kind.value,
            logging_tags=hook_context.logging_tags,
            message=(
                'Skipped the execution of hook "{hook_name}". It did not meet its triggering '
                'condition during the execution of solid "{solid_name}".'
            ).format(hook_name=hook_def.name, solid_name=hook_context.solid.name),
        )

        hook_context.log.debug(
            event.message,
            dagster_event=event,
            pipeline_name=hook_context.pipeline_name,
        )

        return event
def get_step_output_event(events, step_key, output_name="result"):
    """Return the first STEP_OUTPUT event for ``step_key`` / ``output_name``, or None.

    Args:
        events (List[DagsterEvent]): Events to search, in order.
        step_key (str): Key of the step that produced the output.
        output_name (str): Name of the output to match. Defaults to ``"result"``.

    Returns:
        Optional[DagsterEvent]: The first matching event, or ``None`` if none matches.
    """
    check.list_param(events, "events", of_type=DagsterEvent)
    check.str_param(step_key, "step_key")
    check.str_param(output_name, "output_name")

    # The event-type test comes first so ``step_output_data`` is only read on
    # STEP_OUTPUT events.
    return next(
        (
            event
            for event in events
            if event.event_type == DagsterEventType.STEP_OUTPUT
            and event.step_key == step_key
            and event.step_output_data.output_name == output_name
        ),
        None,
    )


@whitelist_for_serdes
class StepMaterializationData(
    namedtuple("_StepMaterializationData", "materialization asset_lineage")
):
    """Event-specific data describing a materialization produced by a step.

    ``materialization`` is stored as-is. ``asset_lineage`` defaults to an empty list and may
    only contain ``AssetLineageInfo`` entries.
    """

    def __new__(cls, materialization, asset_lineage=None):
        return super(StepMaterializationData, cls).__new__(
            cls,
            materialization=materialization,
            asset_lineage=check.opt_list_param(asset_lineage, "asset_lineage", AssetLineageInfo),
        )


@whitelist_for_serdes
class StepExpectationResultData(namedtuple("_StepExpectationResultData", "expectation_result")):
    """Event-specific data wrapping a single expectation result (not validated here)."""


@whitelist_for_serdes
class ObjectStoreOperationResultData(
    namedtuple(
        "_ObjectStoreOperationResultData",
        "op value_name metadata_entries address version mapping_key",
    )
):
    """Event-specific data describing one object store operation.

    All string fields are optional. ``metadata_entries`` defaults to an empty list; the types
    of its elements are not checked here.
    """

    def __new__(
        cls, op, value_name, metadata_entries, address=None, version=None, mapping_key=None
    ):
        return super(ObjectStoreOperationResultData, cls).__new__(
            cls,
            op=check.opt_str_param(op, "op"),
            value_name=check.opt_str_param(value_name, "value_name"),
            metadata_entries=check.opt_list_param(metadata_entries, "metadata_entries"),
            address=check.opt_str_param(address, "address"),
            version=check.opt_str_param(version, "version"),
            mapping_key=check.opt_str_param(mapping_key, "mapping_key"),
        )


@whitelist_for_serdes
class EngineEventData(
    namedtuple("_EngineEventData", "metadata_entries error marker_start marker_end")
):
    """Event-specific data for ENGINE_EVENT events.

    Carries metadata entries, an optional serializable error and optional start/end marker
    strings. The static constructors below cover the common cases.
    """

    # serdes log
    # * added optional error
    # * added marker_start / marker_end
    #
    def __new__(cls, metadata_entries=None, error=None, marker_start=None, marker_end=None):
        return super(EngineEventData, cls).__new__(
            cls,
            metadata_entries=check.opt_list_param(
                metadata_entries, "metadata_entries", EventMetadataEntry
            ),
            error=check.opt_inst_param(error, "error", SerializableErrorInfo),
            marker_start=check.opt_str_param(marker_start, "marker_start"),
            marker_end=check.opt_str_param(marker_end, "marker_end"),
        )

    @staticmethod
    def _process_metadata_entries(pid, step_keys_to_execute):
        """Return the pid entry, plus a step_keys entry when step keys were supplied."""
        check.int_param(pid, "pid")
        check.opt_list_param(step_keys_to_execute, "step_keys_to_execute")

        entries = [EventMetadataEntry.text(str(pid), "pid")]
        # An empty or missing list of step keys is omitted rather than recorded as "[]".
        if step_keys_to_execute:
            entries.append(EventMetadataEntry.text(str(step_keys_to_execute), "step_keys"))
        return entries

    @staticmethod
    def in_process(pid, step_keys_to_execute=None, marker_end=None):
        """Engine data for in-process execution: pid, optional step keys and end marker."""
        return EngineEventData(
            metadata_entries=EngineEventData._process_metadata_entries(
                pid, step_keys_to_execute
            ),
            marker_end=marker_end,
        )

    @staticmethod
    def multiprocess(pid, step_keys_to_execute=None):
        """Engine data for multiprocess execution: pid and optional step keys."""
        return EngineEventData(
            metadata_entries=EngineEventData._process_metadata_entries(
                pid, step_keys_to_execute
            )
        )

    @staticmethod
    def interrupted(steps_interrupted):
        """Engine data recording the (string) keys of steps that were interrupted."""
        check.list_param(steps_interrupted, "steps_interrupted", str)
        return EngineEventData(
            metadata_entries=[EventMetadataEntry.text(str(steps_interrupted), "steps_interrupted")]
        )

    @staticmethod
    def engine_error(error):
        """Engine data carrying only a serializable error and no metadata entries."""
        check.inst_param(error, "error", SerializableErrorInfo)
        return EngineEventData(metadata_entries=[], error=error)


@whitelist_for_serdes
class PipelineInitFailureData(namedtuple("_PipelineInitFailureData", "error")):
    """Event-specific data for PIPELINE_INIT_FAILURE; ``error`` is required."""

    def __new__(cls, error):
        return super(PipelineInitFailureData, cls).__new__(
            cls, error=check.inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes
class PipelineFailureData(namedtuple("_PipelineFailureData", "error")):
    """Event-specific data for PIPELINE_FAILURE; ``error`` may be None."""

    def __new__(cls, error):
        return super(PipelineFailureData, cls).__new__(
            cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes
class PipelineCanceledData(namedtuple("_PipelineCanceledData", "error")):
    """Event-specific data for PIPELINE_CANCELED; ``error`` may be None."""

    def __new__(cls, error):
        return super(PipelineCanceledData, cls).__new__(
            cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes
class HookErroredData(namedtuple("_HookErroredData", "error")):
    """Event-specific data for HOOK_ERRORED; ``error`` is a required SerializableErrorInfo."""

    def __new__(cls, error):
        return super(HookErroredData, cls).__new__(
            cls,
            error=check.inst_param(error, "error", SerializableErrorInfo),
        )


@whitelist_for_serdes
class HandledOutputData(
    namedtuple("_HandledOutputData", "output_name manager_key metadata_entries")
):
    """Event-specific data for HANDLED_OUTPUT: which output, which manager key, and metadata."""

    def __new__(cls, output_name, manager_key, metadata_entries=None):
        return super(HandledOutputData, cls).__new__(
            cls,
            output_name=check.str_param(output_name, "output_name"),
            manager_key=check.str_param(manager_key, "manager_key"),
            metadata_entries=check.opt_list_param(
                metadata_entries, "metadata_entries", EventMetadataEntry
            ),
        )


@whitelist_for_serdes
class LoadedInputData(
    namedtuple("_LoadedInputData", "input_name manager_key upstream_output_name upstream_step_key")
):
    """Event-specific data for LOADED_INPUT.

    ``input_name`` and ``manager_key`` are required strings. The upstream output name and
    step key are optional strings.
    """

    def __new__(cls, input_name, manager_key, upstream_output_name=None, upstream_step_key=None):
        return super(LoadedInputData, cls).__new__(
            cls,
            input_name=check.str_param(input_name, "input_name"),
            manager_key=check.str_param(manager_key, "manager_key"),
            upstream_output_name=check.opt_str_param(upstream_output_name, "upstream_output_name"),
            upstream_step_key=check.opt_str_param(upstream_step_key, "upstream_step_key"),
        )


###################################################################################################
# THE GRAVEYARD
#
#            -|-                  -|-
#             |                    |
#        _-'~~~~~`-_ .        _-'~~~~~`-_
#      .'           '.      .'           '.
#      |    R I P    |      |    R I P    |
#      |             |      |             |
#      |  Synthetic  |      |    Asset    |
#      |   Process   |      |    Store    |
#      |   Events    |      |  Operations |
#      |             |      |             |
###################################################################################################

# Keep these around to prevent issues like https://github.com/dagster-io/dagster/issues/3533
@whitelist_for_serdes
class AssetStoreOperationData(
    namedtuple("_AssetStoreOperationData", "op step_key output_name asset_store_key")
):
    """Retired ASSET_STORE_OPERATION payload, kept registered for the issue linked above."""


@whitelist_for_serdes
class AssetStoreOperationType(Enum):
    """Retired operation kinds carried by ``AssetStoreOperationData.op``."""

    SET_ASSET = "SET_ASSET"
    GET_ASSET = "GET_ASSET"


def _handle_back_compat(event_type_value, event_specific_data):
    """Map a legacy event type (and its payload) onto its current equivalent.

    Args:
        event_type_value (str): The stored event type name.
        event_specific_data: The stored payload for that event.

    Returns:
        Tuple[str, Any]: The (possibly rewritten) event type value and payload. Types
        that need no translation are returned unchanged.
    """
    # Old per-process lifecycle events are folded into generic, empty engine events.
    if event_type_value in (
        "PIPELINE_PROCESS_START",
        "PIPELINE_PROCESS_STARTED",
        "PIPELINE_PROCESS_EXITED",
    ):
        return DagsterEventType.ENGINE_EVENT.value, EngineEventData([])

    # Asset store operations are rewritten into loaded-input / handled-output events.
    # The op may have been stored either as a raw string or as the enum member.
    if event_type_value == "ASSET_STORE_OPERATION":
        legacy_op = event_specific_data.op
        if legacy_op in ("GET_ASSET", AssetStoreOperationType.GET_ASSET):
            loaded = LoadedInputData(
                event_specific_data.output_name, event_specific_data.asset_store_key
            )
            return DagsterEventType.LOADED_INPUT.value, loaded
        if legacy_op in ("SET_ASSET", AssetStoreOperationType.SET_ASSET):
            handled = HandledOutputData(
                event_specific_data.output_name, event_specific_data.asset_store_key, []
            )
            return DagsterEventType.HANDLED_OUTPUT.value, handled

    # ASSET_MATERIALIZATION was previously named STEP_MATERIALIZATION; the payload is reused.
    current_type_value = (
        DagsterEventType.ASSET_MATERIALIZATION.value
        if event_type_value == "STEP_MATERIALIZATION"
        else event_type_value
    )
    return current_type_value, event_specific_data


# Every retired process payload type is registered with a None fallback.
# NOTE(review): presumably None makes serdes discard these tuples on load — confirm in serdes.
register_serdes_tuple_fallbacks(
    dict.fromkeys(
        (
            "PipelineProcessStartedData",
            "PipelineProcessExitedData",
            "PipelineProcessStartData",
        )
    )
)