DagsterDocs

Source code for dagster.core.definitions.sensor

import inspect
from contextlib import ExitStack

from dagster import check
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.instance import DagsterInstance
from dagster.core.instance.ref import InstanceRef
from dagster.utils import ensure_gen

from .job import JobType, RunRequest, SkipReason
from .mode import DEFAULT_MODE_NAME
from .utils import check_valid_name

# Fallback value, in seconds, that SensorDefinition passes along with
# ``minimum_interval_seconds`` when validating it (presumably applied when the
# caller supplies None -- confirm against ``check.opt_int_param``).
DEFAULT_SENSOR_DAEMON_INTERVAL = 30


class SensorExecutionContext:
    """Sensor execution context.

    An instance of this class is made available as the first argument to the evaluation function
    on SensorDefinition. It is also a context manager: leaving the ``with`` block closes every
    resource that was opened through the ``instance`` property.

    Args:
        instance_ref (InstanceRef): The serialized instance configured to run the sensor.
        last_completion_time (Optional[float]): The last time that the sensor was evaluated (UTC).
        last_run_key (Optional[str]): The run key of the RunRequest most recently created by this
            sensor.

    Attributes:
        instance (DagsterInstance): The instance built from ``instance_ref``; created lazily on
            first access and cached afterwards.
        last_completion_time (float): The last time that the sensor was evaluated (UTC).
        last_run_key (str): The run key of the RunRequest most recently created by this sensor.
    """

    __slots__ = [
        "_instance_ref",
        "_last_completion_time",
        "_last_run_key",
        "_exit_stack",
        "_instance",
    ]

    def __init__(self, instance_ref, last_completion_time, last_run_key):
        # The exit stack owns whatever context managers are entered lazily by ``instance``.
        self._exit_stack = ExitStack()
        # Populated on first access of the ``instance`` property.
        self._instance = None
        self._instance_ref = check.inst_param(instance_ref, "instance_ref", InstanceRef)
        self._last_completion_time = check.opt_float_param(
            last_completion_time, "last_completion_time"
        )
        self._last_run_key = check.opt_str_param(last_run_key, "last_run_key")

    def __enter__(self):
        return self

    def __exit__(self, _exception_type, _exception_value, _traceback):
        # Unwinds the instance context (if one was ever entered).
        self._exit_stack.close()

    @property
    def instance(self):
        """The DagsterInstance for this evaluation, built from the instance ref on first use."""
        # Compare against None explicitly so the cache check does not depend on the
        # truthiness of the cached object.
        if self._instance is None:
            self._instance = self._exit_stack.enter_context(
                DagsterInstance.from_ref(self._instance_ref)
            )
        return self._instance

    @property
    def last_completion_time(self):
        """The last completion time passed to the constructor (may be None)."""
        return self._last_completion_time

    @property
    def last_run_key(self):
        """The last run key passed to the constructor (may be None)."""
        return self._last_run_key
class SensorDefinition:
    """Define a sensor that initiates a set of runs based on some external state

    Args:
        name (str): The name of the sensor to create.
        pipeline_name (str): The name of the pipeline to execute when the sensor fires.
        evaluation_fn (Callable[[SensorExecutionContext]]): The core evaluation function for the
            sensor, which is run at an interval to determine whether a run should be launched or
            not. Takes a :py:class:`~dagster.SensorExecutionContext`.

            This function must return a generator, which must yield either a single SkipReason
            or one or more RunRequest objects.
        solid_selection (Optional[List[str]]): A list of solid subselection (including single
            solid names) to execute when the sensor runs. e.g. ``['*some_solid+', 'other_solid']``
        mode (Optional[str]): The mode to apply when executing runs triggered by this sensor.
            (default: 'default')
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations. Validated together with
            ``DEFAULT_SENSOR_DAEMON_INTERVAL`` as the fallback value.
        description (Optional[str]): A human-readable description of the sensor.
    """

    # NOTE(review): "_tags_fn" and "_run_config_fn" are never assigned anywhere in this class;
    # they are kept so the slot layout stays exactly as it was.
    __slots__ = [
        "_name",
        "_pipeline_name",
        "_tags_fn",
        "_run_config_fn",
        "_mode",
        "_solid_selection",
        "_description",
        "_evaluation_fn",
        "_min_interval",
    ]

    def __init__(
        self,
        name,
        pipeline_name,
        evaluation_fn,
        solid_selection=None,
        mode=None,
        minimum_interval_seconds=None,
        description=None,
    ):
        self._name = check_valid_name(name)
        self._pipeline_name = check.str_param(pipeline_name, "pipeline_name")
        self._mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME)
        self._solid_selection = check.opt_nullable_list_param(
            solid_selection, "solid_selection", of_type=str
        )
        self._description = check.opt_str_param(description, "description")
        self._evaluation_fn = check.callable_param(evaluation_fn, "evaluation_fn")
        self._min_interval = check.opt_int_param(
            minimum_interval_seconds, "minimum_interval_seconds", DEFAULT_SENSOR_DAEMON_INTERVAL
        )

    @property
    def name(self):
        return self._name

    @property
    def pipeline_name(self):
        return self._pipeline_name

    @property
    def job_type(self):
        return JobType.SENSOR

    @property
    def solid_selection(self):
        return self._solid_selection

    @property
    def mode(self):
        return self._mode

    @property
    def description(self):
        return self._description

    def get_execution_data(self, context):
        """Run the evaluation function and return what it produced as a validated list.

        Args:
            context (SensorExecutionContext): The context handed to the evaluation function.

        Returns:
            list: ``[]`` when the evaluation produced nothing (or only ``None``); otherwise the
            produced items. A single item may be a RunRequest or a SkipReason; when there are
            several items, each one must be a RunRequest.
        """
        check.inst_param(context, "context", SensorExecutionContext)
        result = list(ensure_gen(self._evaluation_fn(context)))

        if not result or result == [None]:
            return []

        # A SkipReason is only acceptable on its own; multiple items must all be RunRequests.
        allowed_types = (RunRequest, SkipReason) if len(result) == 1 else RunRequest
        return check.is_list(result, of_type=allowed_types)

    @property
    def minimum_interval_seconds(self):
        return self._min_interval
def wrap_sensor_evaluation(sensor_name, result):
    """Normalize the return value of a sensor evaluation function into a generator.

    Args:
        sensor_name (str): Name of the sensor, used only in the error message.
        result: Whatever the evaluation function returned.

    Yields:
        Every item of ``result`` when it is a generator, or ``result`` itself when it is a
        single SkipReason or RunRequest. Nothing is yielded when ``result`` is None.

    Raises:
        DagsterInvariantViolationError: If ``result`` is neither a generator, a SkipReason,
            a RunRequest, nor None.
    """
    if inspect.isgenerator(result):
        # Generators are passed through untouched; their items are not validated here.
        yield from result
    elif isinstance(result, (SkipReason, RunRequest)):
        yield result
    elif result is not None:
        raise DagsterInvariantViolationError(
            f"Error in sensor {sensor_name}: Sensor unexpectedly returned output "
            f"{result} of type {type(result)}.  Should only return SkipReason or "
            "RunRequest objects."
        )