DagsterDocs

Source code for dagster.core.scheduler.scheduler

import abc
import os
from collections import namedtuple

from dagster import check
from dagster.config import Field
from dagster.config.source import IntSource
from dagster.core.definitions.job import JobType
from dagster.core.errors import DagsterError
from dagster.core.host_representation import ExternalSchedule
from dagster.core.instance import DagsterInstance
from dagster.core.scheduler.job import JobState, JobStatus, ScheduleJobData
from dagster.serdes import ConfigurableClass
from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime
from dagster.utils import mkdir_p


class DagsterSchedulerError(DagsterError):
    """Base class for all Dagster Scheduler errors"""


class DagsterScheduleReconciliationError(DagsterError):
    """Error raised during schedule state reconcilation. During reconcilation, exceptions that are
    raised when trying to start or stop a schedule are collected and passed to this wrapper exception.
    The individual exceptions can be accessed by the `errors` property."""

    def __init__(self, preamble, errors, *args, **kwargs):
        self.errors = errors

        error_msg = preamble
        error_messages = []
        for i_error, error in enumerate(self.errors):
            error_messages.append(str(error))
            error_msg += "\n    Error {i_error}: {error_message}".format(
                i_error=i_error + 1, error_message=str(error)
            )

        self.message = error_msg
        self.error_messages = error_messages

        super(DagsterScheduleReconciliationError, self).__init__(error_msg, *args, **kwargs)


class DagsterScheduleDoesNotExist(DagsterSchedulerError):
    """Errors raised when ending a job for a schedule."""


class SchedulerDebugInfo(
    namedtuple("SchedulerDebugInfo", "errors scheduler_config_info scheduler_info schedule_storage")
):
    def __new__(cls, errors, scheduler_config_info, scheduler_info, schedule_storage):
        return super(SchedulerDebugInfo, cls).__new__(
            cls,
            errors=check.list_param(errors, "errors", of_type=str),
            scheduler_config_info=check.str_param(scheduler_config_info, "scheduler_config_info"),
            scheduler_info=check.str_param(scheduler_info, "scheduler_info"),
            schedule_storage=check.list_param(schedule_storage, "schedule_storage", of_type=str),
        )


[docs]class Scheduler(abc.ABC): """Abstract base class for a scheduler. This component is responsible for interfacing with an external system such as cron to ensure scheduled repeated execution according. """ def _get_schedule_state(self, instance, external_origin_id): schedule_state = instance.get_job_state(external_origin_id) if not schedule_state: raise DagsterScheduleDoesNotExist( "You have attempted to start the job for schedule id {id}, but its state is not in storage.".format( id=external_origin_id ) ) return schedule_state def _create_new_schedule_state(self, instance, external_schedule): schedule_state = JobState( external_schedule.get_external_origin(), JobType.SCHEDULE, JobStatus.STOPPED, ScheduleJobData(external_schedule.cron_schedule, scheduler=self.__class__.__name__), ) instance.add_job_state(schedule_state) return schedule_state def reconcile_scheduler_state(self, instance, external_repository): """Reconcile the ExternalSchedule list from the repository and ScheduleStorage on the instance to ensure there is a 1-1 correlation between ExternalSchedule and JobStates of type JobType.SCHEDULE, where the ExternalSchedule list is the source of truth. If a new ExternalSchedule is introduced, a new JobState is added to storage with status JobStatus.STOPPED. For every previously existing ExternalSchedule (where target id is the primary key), any changes to the definition are persisted in the corresponding JobState and the status is left unchanged. The schedule is also restarted to make sure the external artifacts (such as a cron job) are up to date. For every ScheduleDefinitions that is removed, the corresponding JobState is removed from the storage and the corresponding job is ended. """ schedules_to_restart = [] for external_schedule in external_repository.get_external_schedules(): # If a schedule already exists for schedule_def, overwrite bash script and # metadata file existing_schedule_state = instance.get_job_state( external_schedule.get_external_origin_id() ) if existing_schedule_state: new_timestamp = existing_schedule_state.job_specific_data.start_timestamp if not new_timestamp and existing_schedule_state.status == JobStatus.RUNNING: new_timestamp = get_timestamp_from_utc_datetime(get_current_datetime_in_utc()) # Keep the status, update target and cron schedule schedule_state = JobState( external_schedule.get_external_origin(), JobType.SCHEDULE, existing_schedule_state.status, ScheduleJobData( external_schedule.cron_schedule, new_timestamp, scheduler=self.__class__.__name__, ), ) instance.update_job_state(schedule_state) schedules_to_restart.append((existing_schedule_state, external_schedule)) else: self._create_new_schedule_state(instance, external_schedule) # Delete all existing schedules that are not in external schedules external_schedule_origin_ids = { s.get_external_origin_id() for s in external_repository.get_external_schedules() } existing_schedule_origin_ids = set( [ job.job_origin_id for job in instance.all_stored_job_state( external_repository.get_external_origin_id() ) if job.job_type == JobType.SCHEDULE ] ) schedule_origin_ids_to_delete = existing_schedule_origin_ids - external_schedule_origin_ids schedule_reconciliation_errors = [] for schedule_state, external_schedule in schedules_to_restart: # Restart is only needed if the schedule was previously running if schedule_state.status == JobStatus.RUNNING: try: self.refresh_schedule(instance, external_schedule) except DagsterSchedulerError as e: schedule_reconciliation_errors.append(e) if schedule_state.status == JobStatus.STOPPED: try: self.stop_schedule(instance, external_schedule.get_external_origin_id()) except DagsterSchedulerError as e: schedule_reconciliation_errors.append(e) for schedule_origin_id in schedule_origin_ids_to_delete: try: instance.stop_schedule_and_delete_from_storage(schedule_origin_id) except DagsterSchedulerError as e: schedule_reconciliation_errors.append(e) if len(schedule_reconciliation_errors): raise DagsterScheduleReconciliationError( "One or more errors were encountered by the Scheduler while starting or stopping schedules. " "Individual error messages follow:", errors=schedule_reconciliation_errors, ) def start_schedule_and_update_storage_state(self, instance, external_schedule): """ Updates the status of the given schedule to `JobStatus.RUNNING` in schedule storage, then calls `start_schedule`. This should not be overridden by subclasses. Args: instance (DagsterInstance): The current instance. external_schedule (ExternalSchedule): The schedule to start """ check.inst_param(instance, "instance", DagsterInstance) check.inst_param(external_schedule, "external_schedule", ExternalSchedule) schedule_state = instance.get_job_state(external_schedule.get_external_origin_id()) if not schedule_state: schedule_state = self._create_new_schedule_state(instance, external_schedule) if schedule_state.status == JobStatus.RUNNING: raise DagsterSchedulerError( "You have attempted to start schedule {name}, but it is already running".format( name=external_schedule.name ) ) self.start_schedule(instance, external_schedule) started_schedule = schedule_state.with_status(JobStatus.RUNNING).with_data( ScheduleJobData( external_schedule.cron_schedule, get_current_datetime_in_utc().timestamp(), scheduler=self.__class__.__name__, ) ) instance.update_job_state(started_schedule) return started_schedule def stop_schedule_and_update_storage_state(self, instance, schedule_origin_id): """ Updates the status of the given schedule to `JobStatus.STOPPED` in schedule storage, then calls `stop_schedule`. This should not be overridden by subclasses. Args: schedule_origin_id (string): The id of the schedule target to stop running. """ check.str_param(schedule_origin_id, "schedule_origin_id") schedule_state = self._get_schedule_state(instance, schedule_origin_id) self.stop_schedule(instance, schedule_origin_id) stopped_schedule = schedule_state.with_status(JobStatus.STOPPED).with_data( ScheduleJobData( cron_schedule=schedule_state.job_specific_data.cron_schedule, scheduler=self.__class__.__name__, ) ) instance.update_job_state(stopped_schedule) return stopped_schedule def stop_schedule_and_delete_from_storage(self, instance, schedule_origin_id): """ Deletes a schedule from schedule storage, then calls `stop_schedule`. This should not be overridden by subclasses. Args: instance (DagsterInstance): The current instance. schedule_origin_id (string): The id of the schedule target to start running. """ check.inst_param(instance, "instance", DagsterInstance) check.str_param(schedule_origin_id, "schedule_origin_id") schedule = self._get_schedule_state(instance, schedule_origin_id) self.stop_schedule(instance, schedule_origin_id) instance.delete_job_state(schedule_origin_id) return schedule def refresh_schedule(self, instance, external_schedule): """Refresh a running schedule. This is called when user reconciles the schedule state. By default, this method will call stop_schedule and then start_schedule but can be overriden. For example, in the K8s Scheduler we patch the existing cronjob (without stopping it) to minimize downtime. Args: instance (DagsterInstance): The current instance. external_schedule (ExternalSchedule): The schedule to start running. """ check.inst_param(instance, "instance", DagsterInstance) check.inst_param(external_schedule, "external_schedule", ExternalSchedule) self.stop_schedule(instance, external_schedule.get_external_origin_id()) self.start_schedule(instance, external_schedule) @abc.abstractmethod def debug_info(self): """Returns debug information about the scheduler""" @abc.abstractmethod def start_schedule(self, instance, external_schedule): """Start running a schedule. This method is called by `start_schedule_and_update_storage_state`, which first updates the status of the schedule in schedule storage to `JobStatus.RUNNING`, then calls this method. For example, in the cron scheduler, this method writes a cron job to the cron tab for the given schedule. Args: instance (DagsterInstance): The current instance. external_schedule (ExternalSchedule): The schedule to start running. """ @abc.abstractmethod def stop_schedule(self, instance, schedule_origin_id): """Stop running a schedule. This method is called by 1) `stop_schedule_and_update_storage_state`, which first updates the status of the schedule in schedule storage to `JobStatus.STOPPED`, then calls this method. 2) `stop_schedule_and_delete_from_storage`, which deletes the schedule from schedule storage then calls this method. For example, in the cron scheduler, this method deletes the cron job for a given scheduler from the cron tab. Args: instance (DagsterInstance): The current instance. schedule_origin_id (string): The id of the schedule target to stop running. """ @abc.abstractmethod def running_schedule_count(self, instance, schedule_origin_id): """Returns the number of jobs currently running for the given schedule. This method is used for detecting when the scheduler is out of sync with schedule storage. For example, when: - There are duplicate jobs runnning for a single schedule - There are no jobs runnning for a schedule that is set to be running - There are still jobs running for a schedule that is set to be stopped When the scheduler and schedule storage are in sync, this method should return: - 1 when a schedule is set to be running - 0 when a schedule is set to be stopped Args: instance (DagsterInstance): The current instance. schedule_origin_id (string): The id of the schedule target to return the number of jobs for """ @abc.abstractmethod def get_logs_path(self, instance, schedule_origin_id): """Get path to store logs for schedule Args: schedule_origin_id (string): The id of the schedule target to retrieve the log path for """
DEFAULT_MAX_CATCHUP_RUNS = 5
[docs]class DagsterDaemonScheduler(Scheduler, ConfigurableClass): """Default scheduler implementation that submits runs from the `dagster-daemon` long-lived process. Periodically checks each running schedule for execution times that don't have runs yet and launches them. Args: max_catchup_runs (int): For partitioned schedules, controls the maximum number of past partitions for each schedule that will be considered when looking for missing runs (defaults to 5). Generally this parameter will only come into play if the scheduler falls behind or launches after experiencing downtime. This parameter will not be checked for schedules without partition sets (for example, schedules created using the @schedule decorator) - only the most recent execution time will be considered for those schedules. Note that no matter what this value is, the scheduler will never launch a run from a time before the schedule was turned on (even if the start_date on the schedule is earlier) - if you want to launch runs for earlier partitions, launch a backfill. """ def __init__(self, max_catchup_runs=DEFAULT_MAX_CATCHUP_RUNS, inst_data=None): self.max_catchup_runs = check.opt_int_param(max_catchup_runs, "max_catchup_runs", 5) self._inst_data = inst_data @property def inst_data(self): return self._inst_data @classmethod def config_type(cls): return {"max_catchup_runs": Field(IntSource, is_required=False)} @staticmethod def from_config_value(inst_data, config_value): return DagsterDaemonScheduler( inst_data=inst_data, max_catchup_runs=config_value.get("max_catchup_runs") ) def debug_info(self): return "" def start_schedule(self, instance, external_schedule): # Automatically picked up by the `dagster scheduler run` command pass def stop_schedule(self, instance, schedule_origin_id): # Automatically picked up by the `dagster scheduler run` command pass def running_schedule_count(self, instance, schedule_origin_id): state = instance.get_job_state(schedule_origin_id) if not state: return 0 return 1 if state.status == JobStatus.RUNNING else 0 def wipe(self, instance): pass def _get_or_create_logs_directory(self, instance, schedule_origin_id): check.inst_param(instance, "instance", DagsterInstance) check.str_param(schedule_origin_id, "schedule_origin_id") logs_directory = os.path.join(instance.schedules_directory(), "logs", schedule_origin_id) if not os.path.isdir(logs_directory): mkdir_p(logs_directory) return logs_directory def get_logs_path(self, instance, schedule_origin_id): check.inst_param(instance, "instance", DagsterInstance) check.str_param(schedule_origin_id, "schedule_origin_id") logs_directory = self._get_or_create_logs_directory(instance, schedule_origin_id) return os.path.join(logs_directory, "scheduler.log")