import warnings
from abc import ABC, abstractmethod, abstractproperty
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union
from dagster.core.definitions.events import AssetKey
from dagster.core.events.log import EventRecord
from dagster.core.execution.stats import (
RunStepKeyStatsSnapshot,
build_run_stats_from_events,
build_run_step_stats_from_events,
)
from dagster.core.instance import MayHaveInstanceWeakref
from dagster.core.storage.pipeline_run import PipelineRunStatsSnapshot
class EventLogStorage(ABC, MayHaveInstanceWeakref):
    """Abstract base class for storing structured event logs from pipeline runs.

    Note that event log storages using SQL databases as backing stores should implement
    :py:class:`~dagster.core.storage.event_log.SqlEventLogStorage`.

    Users should not directly instantiate concrete subclasses of this class; they are instantiated
    by internal machinery when ``dagit`` and ``dagster-graphql`` load, based on the values in the
    ``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses of this class
    should be done by setting values in that file.
    """

    @abstractmethod
    def get_logs_for_run(self, run_id: str, cursor: Optional[int] = -1) -> Iterable[EventRecord]:
        """Get all of the logs corresponding to a run.

        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[int]): Zero-indexed logs will be returned starting from cursor + 1,
                i.e., if cursor is -1, all logs will be returned. (default: -1)
        """

    def get_stats_for_run(self, run_id: str) -> PipelineRunStatsSnapshot:
        """Get a summary of events that have occurred in a run.

        The default implementation reads every log for the run via ``get_logs_for_run`` and
        folds them with ``build_run_stats_from_events``.
        """
        return build_run_stats_from_events(run_id, self.get_logs_for_run(run_id))

    def get_step_stats_for_run(
        self, run_id: str, step_keys: Optional[List[str]] = None
    ) -> List[RunStepKeyStatsSnapshot]:
        """Get per-step stats for a pipeline run.

        Args:
            run_id (str): The id of the run for which to compute step stats.
            step_keys (Optional[List[str]]): When non-empty, only dagster events whose
                ``step_key`` is in this collection are used. ``None`` or an empty collection
                means every log for the run is used.

        Returns:
            List[RunStepKeyStatsSnapshot]: The result of ``build_run_step_stats_from_events``.
        """
        logs = self.get_logs_for_run(run_id)
        if step_keys:
            # Build the lookup set once: O(1) membership per event instead of rescanning
            # ``step_keys`` for every event (and safe if a one-shot iterable is passed).
            wanted_step_keys = set(step_keys)
            logs = [
                event
                for event in logs
                if event.is_dagster_event
                and event.get_dagster_event().step_key in wanted_step_keys
            ]
        return build_run_step_stats_from_events(run_id, logs)

    @abstractmethod
    def store_event(self, event: EventRecord):
        """Store an event corresponding to a pipeline run.

        Args:
            event (EventRecord): The event to store.
        """

    @abstractmethod
    def delete_events(self, run_id: str):
        """Remove events for a given run id."""

    @abstractmethod
    def upgrade(self):
        """This method should perform any schema migrations necessary to bring an
        out-of-date instance of the storage up to date.
        """

    @abstractmethod
    def reindex(self, print_fn: Callable = lambda _: None, force: bool = False):
        """Call this method to run any data migrations, reindexing to build summary tables."""

    @abstractmethod
    def wipe(self):
        """Clear the log storage."""

    @abstractmethod
    def watch(self, run_id: str, start_cursor: int, callback: Callable):
        """Call this method to start watching."""

    @abstractmethod
    def end_watch(self, run_id: str, handler: Callable):
        """Call this method to stop watching."""

    # ``property`` stacked over ``abstractmethod`` replaces the deprecated ``abstractproperty``;
    # subclasses still override it with a plain ``@property``.
    @property
    @abstractmethod
    def is_persistent(self) -> bool:
        """bool: Whether the storage is persistent."""

    def dispose(self):
        """Explicit lifecycle management. The default implementation does nothing."""

    def optimize_for_dagit(self, statement_timeout: int):
        """Allows for optimizing database connection / use in the context of a long lived dagit
        process. The default implementation does nothing.
        """

    @abstractmethod
    def has_asset_key(self, asset_key: AssetKey) -> bool:
        pass

    @abstractmethod
    def all_asset_keys(self) -> Iterable[AssetKey]:
        pass

    @abstractmethod
    def all_asset_tags(self) -> Dict[AssetKey, Dict[str, str]]:
        pass

    @abstractmethod
    def get_asset_tags(self, asset_key: AssetKey) -> Dict[str, str]:
        pass

    @abstractmethod
    def get_asset_events(
        self,
        asset_key: AssetKey,
        partitions: Optional[List[str]] = None,
        before_cursor: Optional[int] = None,
        after_cursor: Optional[int] = None,
        limit: Optional[int] = None,
        ascending: bool = False,
        include_cursor: bool = False,
        cursor: Optional[int] = None,  # deprecated
    ) -> Union[Iterable[EventRecord], Iterable[Tuple[int, EventRecord]]]:
        """Fetch events for ``asset_key``.

        ``cursor`` is deprecated in favor of ``before_cursor`` / ``after_cursor``.
        NOTE(review): the tuple form of the return type presumably corresponds to
        ``include_cursor=True`` -- confirm against concrete implementations.
        """

    @abstractmethod
    def get_asset_run_ids(self, asset_key: AssetKey) -> Iterable[str]:
        pass

    @abstractmethod
    def wipe_asset(self, asset_key: AssetKey):
        """Remove asset index history from event log for given asset_key."""
def _coerce_cursor(value):
    """Return ``value`` converted with ``int()``, or ``None`` if it is ``None`` or unconvertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        # An unparseable cursor (bad string, wrong type) is treated as "no cursor", not an error.
        return None


def extract_asset_events_cursor(cursor, before_cursor, after_cursor, ascending):
    """Normalize the cursor arguments accepted by ``get_asset_events``.

    Args:
        cursor: Deprecated single cursor. When not ``None`` a warning is emitted and the value
            fills ``after_cursor`` (if ``ascending``) or ``before_cursor`` (otherwise), unless
            that argument was already supplied.
        before_cursor: The ``before_cursor`` argument; coerced with ``int()``.
        after_cursor: The ``after_cursor`` argument; coerced with ``int()``.
        ascending (bool): Decides which argument the deprecated ``cursor`` fills.

    Returns:
        Tuple[Optional[int], Optional[int]]: ``(before_cursor, after_cursor)``. Any value that
        is ``None`` or cannot be converted by ``int()`` is returned as ``None``.
    """
    # ``is not None`` rather than truthiness, so a deprecated cursor of 0 is still honored.
    if cursor is not None:
        warnings.warn(
            "Parameter `cursor` is deprecated for `get_asset_events`. Use `before_cursor` or "
            "`after_cursor` instead"
        )
        if ascending and after_cursor is None:
            after_cursor = cursor
        if not ascending and before_cursor is None:
            before_cursor = cursor

    return _coerce_cursor(before_cursor), _coerce_cursor(after_cursor)