import logging
import zlib
from abc import abstractmethod
from collections import defaultdict
from datetime import datetime
from enum import Enum
import pendulum
import sqlalchemy as db
from dagster import check
from dagster.core.errors import (
DagsterInvariantViolationError,
DagsterRunAlreadyExists,
DagsterSnapshotDoesNotExist,
)
from dagster.core.events import DagsterEvent, DagsterEventType
from dagster.core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster.core.snap import (
ExecutionPlanSnapshot,
PipelineSnapshot,
create_execution_plan_snapshot_id,
create_pipeline_snapshot_id,
)
from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, ROOT_RUN_ID_TAG
from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple
from dagster.seven import JSONDecodeError
from dagster.utils import merge_dicts, utc_datetime_from_timestamp
from ..pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter
from .base import RunStorage
from .migration import RUN_DATA_MIGRATIONS, RUN_PARTITIONS
from .schema import (
BulkActionsTable,
DaemonHeartbeatsTable,
RunTagsTable,
RunsTable,
SecondaryIndexMigrationTable,
SnapshotsTable,
)
class SnapshotType(Enum):
PIPELINE = "PIPELINE"
EXECUTION_PLAN = "EXECUTION_PLAN"
[docs]class SqlRunStorage(RunStorage): # pylint: disable=no-init
"""Base class for SQL based run storages"""
@abstractmethod
def connect(self):
"""Context manager yielding a sqlalchemy.engine.Connection."""
@abstractmethod
def upgrade(self):
"""This method should perform any schema or data migrations necessary to bring an
out-of-date instance of the storage up to date.
"""
def fetchall(self, query):
with self.connect() as conn:
result_proxy = conn.execute(query)
res = result_proxy.fetchall()
result_proxy.close()
return res
def fetchone(self, query):
with self.connect() as conn:
result_proxy = conn.execute(query)
row = result_proxy.fetchone()
result_proxy.close()
return row
def add_run(self, pipeline_run):
check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
if pipeline_run.pipeline_snapshot_id and not self.has_pipeline_snapshot(
pipeline_run.pipeline_snapshot_id
):
raise DagsterSnapshotDoesNotExist(
"Snapshot {ss_id} does not exist in run storage".format(
ss_id=pipeline_run.pipeline_snapshot_id
)
)
has_tags = pipeline_run.tags and len(pipeline_run.tags) > 0
partition = pipeline_run.tags.get(PARTITION_NAME_TAG) if has_tags else None
partition_set = pipeline_run.tags.get(PARTITION_SET_TAG) if has_tags else None
with self.connect() as conn:
try:
runs_insert = RunsTable.insert().values( # pylint: disable=no-value-for-parameter
run_id=pipeline_run.run_id,
pipeline_name=pipeline_run.pipeline_name,
status=pipeline_run.status.value,
run_body=serialize_dagster_namedtuple(pipeline_run),
snapshot_id=pipeline_run.pipeline_snapshot_id,
partition=partition,
partition_set=partition_set,
)
conn.execute(runs_insert)
except db.exc.IntegrityError as exc:
raise DagsterRunAlreadyExists from exc
if pipeline_run.tags and len(pipeline_run.tags) > 0:
conn.execute(
RunTagsTable.insert(), # pylint: disable=no-value-for-parameter
[
dict(run_id=pipeline_run.run_id, key=k, value=v)
for k, v in pipeline_run.tags.items()
],
)
return pipeline_run
def handle_run_event(self, run_id, event):
check.str_param(run_id, "run_id")
check.inst_param(event, "event", DagsterEvent)
lookup = {
DagsterEventType.PIPELINE_START: PipelineRunStatus.STARTED,
DagsterEventType.PIPELINE_SUCCESS: PipelineRunStatus.SUCCESS,
DagsterEventType.PIPELINE_FAILURE: PipelineRunStatus.FAILURE,
DagsterEventType.PIPELINE_INIT_FAILURE: PipelineRunStatus.FAILURE,
DagsterEventType.PIPELINE_ENQUEUED: PipelineRunStatus.QUEUED,
DagsterEventType.PIPELINE_STARTING: PipelineRunStatus.STARTING,
DagsterEventType.PIPELINE_CANCELING: PipelineRunStatus.CANCELING,
DagsterEventType.PIPELINE_CANCELED: PipelineRunStatus.CANCELED,
}
if event.event_type not in lookup:
return
run = self.get_run_by_id(run_id)
if not run:
# TODO log?
return
new_pipeline_status = lookup[event.event_type]
with self.connect() as conn:
conn.execute(
RunsTable.update() # pylint: disable=no-value-for-parameter
.where(RunsTable.c.run_id == run_id)
.values(
status=new_pipeline_status.value,
run_body=serialize_dagster_namedtuple(run.with_status(new_pipeline_status)),
update_timestamp=pendulum.now("UTC"),
)
)
def _row_to_run(self, row):
return deserialize_json_to_dagster_namedtuple(row[0])
def _rows_to_runs(self, rows):
return list(map(self._row_to_run, rows))
def _add_cursor_limit_to_query(self, query, cursor, limit):
""" Helper function to deal with cursor/limit pagination args """
if cursor:
cursor_query = db.select([RunsTable.c.id]).where(RunsTable.c.run_id == cursor)
query = query.where(RunsTable.c.id < cursor_query)
if limit:
query = query.limit(limit)
query = query.order_by(RunsTable.c.id.desc())
return query
def _add_filters_to_query(self, query, filters):
check.inst_param(filters, "filters", PipelineRunsFilter)
if filters.run_ids:
query = query.where(RunsTable.c.run_id.in_(filters.run_ids))
if filters.pipeline_name:
query = query.where(RunsTable.c.pipeline_name == filters.pipeline_name)
if filters.statuses:
query = query.where(
RunsTable.c.status.in_([status.value for status in filters.statuses])
)
if filters.tags:
query = query.where(
db.or_(
*(
db.and_(RunTagsTable.c.key == key, RunTagsTable.c.value == value)
for key, value in filters.tags.items()
)
)
).group_by(RunsTable.c.run_body, RunsTable.c.id)
if len(filters.tags) > 0:
query = query.having(db.func.count(RunsTable.c.run_id) == len(filters.tags))
if filters.snapshot_id:
query = query.where(RunsTable.c.snapshot_id == filters.snapshot_id)
return query
def _runs_query(self, filters=None, cursor=None, limit=None, columns=None):
filters = check.opt_inst_param(
filters, "filters", PipelineRunsFilter, default=PipelineRunsFilter()
)
check.opt_str_param(cursor, "cursor")
check.opt_int_param(limit, "limit")
check.opt_list_param(columns, "columns")
if columns is None:
columns = ["run_body"]
base_query_columns = [getattr(RunsTable.c, column) for column in columns]
# If we have a tags filter, then we need to select from a joined table
if filters.tags:
base_query = db.select(base_query_columns).select_from(
RunsTable.join(RunTagsTable, RunsTable.c.run_id == RunTagsTable.c.run_id)
)
else:
base_query = db.select(base_query_columns).select_from(RunsTable)
query = self._add_filters_to_query(base_query, filters)
query = self._add_cursor_limit_to_query(query, cursor, limit)
return query
def get_runs(self, filters=None, cursor=None, limit=None):
query = self._runs_query(filters, cursor, limit)
rows = self.fetchall(query)
return self._rows_to_runs(rows)
def get_runs_count(self, filters=None):
subquery = self._runs_query(filters=filters).alias("subquery")
# We use an alias here because Postgres requires subqueries to be
# aliased.
subquery = subquery.alias("subquery")
query = db.select([db.func.count()]).select_from(subquery)
rows = self.fetchall(query)
count = rows[0][0]
return count
def get_run_by_id(self, run_id):
"""Get a run by its id.
Args:
run_id (str): The id of the run
Returns:
Optional[PipelineRun]
"""
check.str_param(run_id, "run_id")
query = db.select([RunsTable.c.run_body]).where(RunsTable.c.run_id == run_id)
rows = self.fetchall(query)
return deserialize_json_to_dagster_namedtuple(rows[0][0]) if len(rows) else None
def get_run_tags(self):
result = defaultdict(set)
query = db.select([RunTagsTable.c.key, RunTagsTable.c.value]).distinct(
RunTagsTable.c.key, RunTagsTable.c.value
)
rows = self.fetchall(query)
for r in rows:
result[r[0]].add(r[1])
return sorted(list([(k, v) for k, v in result.items()]), key=lambda x: x[0])
def add_run_tags(self, run_id, new_tags):
check.str_param(run_id, "run_id")
check.dict_param(new_tags, "new_tags", key_type=str, value_type=str)
run = self.get_run_by_id(run_id)
current_tags = run.tags if run.tags else {}
all_tags = merge_dicts(current_tags, new_tags)
partition = all_tags.get(PARTITION_NAME_TAG)
partition_set = all_tags.get(PARTITION_SET_TAG)
with self.connect() as conn:
conn.execute(
RunsTable.update() # pylint: disable=no-value-for-parameter
.where(RunsTable.c.run_id == run_id)
.values(
run_body=serialize_dagster_namedtuple(
run.with_tags(merge_dicts(current_tags, new_tags))
),
partition=partition,
partition_set=partition_set,
update_timestamp=pendulum.now("UTC"),
)
)
current_tags_set = set(current_tags.keys())
new_tags_set = set(new_tags.keys())
existing_tags = current_tags_set & new_tags_set
added_tags = new_tags_set.difference(existing_tags)
for tag in existing_tags:
conn.execute(
RunTagsTable.update() # pylint: disable=no-value-for-parameter
.where(db.and_(RunTagsTable.c.run_id == run_id, RunTagsTable.c.key == tag))
.values(value=new_tags[tag])
)
if added_tags:
conn.execute(
RunTagsTable.insert(), # pylint: disable=no-value-for-parameter
[dict(run_id=run_id, key=tag, value=new_tags[tag]) for tag in added_tags],
)
def get_run_group(self, run_id):
check.str_param(run_id, "run_id")
pipeline_run = self.get_run_by_id(run_id)
if not pipeline_run:
return None
# find root_run
root_run_id = pipeline_run.root_run_id if pipeline_run.root_run_id else pipeline_run.run_id
root_run = self.get_run_by_id(root_run_id)
# root_run_id to run_id 1:1 mapping
root_to_run = (
db.select(
[RunTagsTable.c.value.label("root_run_id"), RunTagsTable.c.run_id.label("run_id")]
)
.where(
db.and_(RunTagsTable.c.key == ROOT_RUN_ID_TAG, RunTagsTable.c.value == root_run_id)
)
.alias("root_to_run")
)
# get run group
run_group_query = (
db.select([RunsTable.c.run_body])
.select_from(
root_to_run.join(
RunsTable,
root_to_run.c.run_id == RunsTable.c.run_id,
isouter=True,
)
)
.alias("run_group")
)
with self.connect() as conn:
res = conn.execute(run_group_query)
run_group = self._rows_to_runs(res)
return (root_run_id, [root_run] + run_group)
def get_run_groups(self, filters=None, cursor=None, limit=None):
# The runs that would be returned by calling RunStorage.get_runs with the same arguments
runs = self._runs_query(
filters=filters, cursor=cursor, limit=limit, columns=["run_body", "run_id"]
).alias("runs")
# Gets us the run_id and associated root_run_id for every run in storage that is a
# descendant run of some root
#
# pseudosql:
# with all_descendant_runs as (
# select *
# from run_tags
# where key = @ROOT_RUN_ID_TAG
# )
all_descendant_runs = (
db.select([RunTagsTable])
.where(RunTagsTable.c.key == ROOT_RUN_ID_TAG)
.alias("all_descendant_runs")
)
# Augment the runs in our query, for those runs that are the descendant of some root run,
# with the root_run_id
#
# pseudosql:
#
# with runs_augmented as (
# select
# runs.run_id as run_id,
# all_descendant_runs.value as root_run_id
# from runs
# left outer join all_descendant_runs
# on all_descendant_runs.run_id = runs.run_id
# )
runs_augmented = (
db.select(
[
runs.c.run_id.label("run_id"),
all_descendant_runs.c.value.label("root_run_id"),
]
)
.select_from(
runs.join(
all_descendant_runs,
all_descendant_runs.c.run_id == RunsTable.c.run_id,
isouter=True,
)
)
.alias("runs_augmented")
)
# Get all the runs our query will return. This includes runs as well as their root runs.
#
# pseudosql:
#
# with runs_and_root_runs as (
# select runs.run_id as run_id
# from runs, runs_augmented
# where
# runs.run_id = runs_augmented.run_id or
# runs.run_id = runs_augmented.root_run_id
# )
runs_and_root_runs = (
db.select([RunsTable.c.run_id.label("run_id")])
.select_from(runs_augmented)
.where(
db.or_(
RunsTable.c.run_id == runs_augmented.c.run_id,
RunsTable.c.run_id == runs_augmented.c.root_run_id,
)
)
.distinct(RunsTable.c.run_id)
).alias("runs_and_root_runs")
# We count the descendants of all of the runs in our query that are roots so that
# we can accurately display when a root run has more descendants than are returned by this
# query and afford a drill-down. This might be an unnecessary complication, but the
# alternative isn't obvious -- we could go and fetch *all* the runs in any group that we're
# going to return in this query, and then append those.
#
# pseudosql:
#
# select runs.run_body, count(all_descendant_runs.id) as child_counts
# from runs
# join runs_and_root_runs on runs.run_id = runs_and_root_runs.run_id
# left outer join all_descendant_runs
# on all_descendant_runs.value = runs_and_root_runs.run_id
# group by runs.run_body
# order by child_counts desc
runs_and_root_runs_with_descendant_counts = (
db.select(
[
RunsTable.c.run_body,
db.func.count(all_descendant_runs.c.id).label("child_counts"),
]
)
.select_from(
RunsTable.join(
runs_and_root_runs, RunsTable.c.run_id == runs_and_root_runs.c.run_id
).join(
all_descendant_runs,
all_descendant_runs.c.value == runs_and_root_runs.c.run_id,
isouter=True,
)
)
.group_by(RunsTable.c.run_body)
.order_by(db.desc(db.column("child_counts")))
)
with self.connect() as conn:
res = conn.execute(runs_and_root_runs_with_descendant_counts).fetchall()
# Postprocess: descendant runs get aggregated with their roots
run_groups = defaultdict(lambda: {"runs": [], "count": 0})
for (run_body, count) in res:
row = (run_body,)
pipeline_run = self._row_to_run(row)
root_run_id = pipeline_run.get_root_run_id()
if root_run_id is not None:
run_groups[root_run_id]["runs"].append(pipeline_run)
else:
run_groups[pipeline_run.run_id]["runs"].append(pipeline_run)
run_groups[pipeline_run.run_id]["count"] = count + 1
return run_groups
def has_run(self, run_id):
check.str_param(run_id, "run_id")
return bool(self.get_run_by_id(run_id))
def delete_run(self, run_id):
check.str_param(run_id, "run_id")
query = db.delete(RunsTable).where(RunsTable.c.run_id == run_id)
with self.connect() as conn:
conn.execute(query)
def has_pipeline_snapshot(self, pipeline_snapshot_id):
check.str_param(pipeline_snapshot_id, "pipeline_snapshot_id")
return self._has_snapshot_id(pipeline_snapshot_id)
def add_pipeline_snapshot(self, pipeline_snapshot):
check.inst_param(pipeline_snapshot, "pipeline_snapshot", PipelineSnapshot)
return self._add_snapshot(
snapshot_id=create_pipeline_snapshot_id(pipeline_snapshot),
snapshot_obj=pipeline_snapshot,
snapshot_type=SnapshotType.PIPELINE,
)
def get_pipeline_snapshot(self, pipeline_snapshot_id):
check.str_param(pipeline_snapshot_id, "pipeline_snapshot_id")
return self._get_snapshot(pipeline_snapshot_id)
def has_execution_plan_snapshot(self, execution_plan_snapshot_id):
check.str_param(execution_plan_snapshot_id, "execution_plan_snapshot_id")
return bool(self.get_execution_plan_snapshot(execution_plan_snapshot_id))
def add_execution_plan_snapshot(self, execution_plan_snapshot):
check.inst_param(execution_plan_snapshot, "execution_plan_snapshot", ExecutionPlanSnapshot)
execution_plan_snapshot_id = create_execution_plan_snapshot_id(execution_plan_snapshot)
return self._add_snapshot(
snapshot_id=execution_plan_snapshot_id,
snapshot_obj=execution_plan_snapshot,
snapshot_type=SnapshotType.EXECUTION_PLAN,
)
def get_execution_plan_snapshot(self, execution_plan_snapshot_id):
check.str_param(execution_plan_snapshot_id, "execution_plan_snapshot_id")
return self._get_snapshot(execution_plan_snapshot_id)
def _add_snapshot(self, snapshot_id, snapshot_obj, snapshot_type):
check.str_param(snapshot_id, "snapshot_id")
check.not_none_param(snapshot_obj, "snapshot_obj")
check.inst_param(snapshot_type, "snapshot_type", SnapshotType)
with self.connect() as conn:
snapshot_insert = (
SnapshotsTable.insert().values( # pylint: disable=no-value-for-parameter
snapshot_id=snapshot_id,
snapshot_body=zlib.compress(
serialize_dagster_namedtuple(snapshot_obj).encode("utf-8")
),
snapshot_type=snapshot_type.value,
)
)
conn.execute(snapshot_insert)
return snapshot_id
def _has_snapshot_id(self, snapshot_id):
query = db.select([SnapshotsTable.c.snapshot_id]).where(
SnapshotsTable.c.snapshot_id == snapshot_id
)
row = self.fetchone(query)
return bool(row)
def _get_snapshot(self, snapshot_id):
query = db.select([SnapshotsTable.c.snapshot_body]).where(
SnapshotsTable.c.snapshot_id == snapshot_id
)
row = self.fetchone(query)
return defensively_unpack_pipeline_snapshot_query(logging, row) if row else None
def _get_partition_runs(self, partition_set_name, partition_name):
# utility method to help test reads off of the partition column
if not self.has_built_index(RUN_PARTITIONS):
# query by tags
return self.get_runs(
filters=PipelineRunsFilter(
tags={
PARTITION_SET_TAG: partition_set_name,
PARTITION_NAME_TAG: partition_name,
}
)
)
else:
query = (
self._runs_query()
.where(RunsTable.c.partition == partition_name)
.where(RunsTable.c.partition_set == partition_set_name)
)
rows = self.fetchall(query)
return self._rows_to_runs(rows)
# Tracking data migrations over secondary indexes
def build_missing_indexes(self, print_fn=None, force_rebuild_all=False):
for migration_name, migration_fn in RUN_DATA_MIGRATIONS.items():
if self.has_built_index(migration_name):
if not force_rebuild_all:
continue
if print_fn:
print_fn(f"Starting data migration: {migration_name}")
migration_fn()(self, print_fn)
self.mark_index_built(migration_name)
if print_fn:
print_fn(f"Finished data migration: {migration_name}")
def has_built_index(self, migration_name):
query = (
db.select([1])
.where(SecondaryIndexMigrationTable.c.name == migration_name)
.where(SecondaryIndexMigrationTable.c.migration_completed != None)
.limit(1)
)
with self.connect() as conn:
results = conn.execute(query).fetchall()
return len(results) > 0
def mark_index_built(self, migration_name):
query = (
SecondaryIndexMigrationTable.insert().values( # pylint: disable=no-value-for-parameter
name=migration_name,
migration_completed=datetime.now(),
)
)
with self.connect() as conn:
try:
conn.execute(query)
except db.exc.IntegrityError:
conn.execute(
SecondaryIndexMigrationTable.update() # pylint: disable=no-value-for-parameter
.where(SecondaryIndexMigrationTable.c.name == migration_name)
.values(migration_completed=datetime.now())
)
# Daemon heartbeats
def add_daemon_heartbeat(self, daemon_heartbeat):
with self.connect() as conn:
# insert, or update if already present
try:
conn.execute(
DaemonHeartbeatsTable.insert().values( # pylint: disable=no-value-for-parameter
timestamp=utc_datetime_from_timestamp(daemon_heartbeat.timestamp),
daemon_type=daemon_heartbeat.daemon_type,
daemon_id=daemon_heartbeat.daemon_id,
body=serialize_dagster_namedtuple(daemon_heartbeat),
)
)
except db.exc.IntegrityError:
conn.execute(
DaemonHeartbeatsTable.update() # pylint: disable=no-value-for-parameter
.where(DaemonHeartbeatsTable.c.daemon_type == daemon_heartbeat.daemon_type)
.values( # pylint: disable=no-value-for-parameter
timestamp=utc_datetime_from_timestamp(daemon_heartbeat.timestamp),
daemon_id=daemon_heartbeat.daemon_id,
body=serialize_dagster_namedtuple(daemon_heartbeat),
)
)
def get_daemon_heartbeats(self):
with self.connect() as conn:
rows = conn.execute(db.select(DaemonHeartbeatsTable.columns))
heartbeats = []
for row in rows:
heartbeats.append(deserialize_json_to_dagster_namedtuple(row.body))
return {heartbeat.daemon_type: heartbeat for heartbeat in heartbeats}
def wipe(self):
"""Clears the run storage."""
with self.connect() as conn:
# https://stackoverflow.com/a/54386260/324449
conn.execute(RunsTable.delete()) # pylint: disable=no-value-for-parameter
conn.execute(RunTagsTable.delete()) # pylint: disable=no-value-for-parameter
conn.execute(SnapshotsTable.delete()) # pylint: disable=no-value-for-parameter
conn.execute(DaemonHeartbeatsTable.delete()) # pylint: disable=no-value-for-parameter
def wipe_daemon_heartbeats(self):
with self.connect() as conn:
# https://stackoverflow.com/a/54386260/324449
conn.execute(DaemonHeartbeatsTable.delete()) # pylint: disable=no-value-for-parameter
def get_backfills(self, status=None, cursor=None, limit=None):
check.opt_inst_param(status, "status", BulkActionStatus)
query = db.select([BulkActionsTable.c.body])
if status:
query = query.where(BulkActionsTable.c.status == status.value)
if cursor:
cursor_query = db.select([BulkActionsTable.c.id]).where(
BulkActionsTable.c.key == cursor
)
query = query.where(BulkActionsTable.c.id < cursor_query)
if limit:
query = query.limit(limit)
query = query.order_by(BulkActionsTable.c.id.desc())
rows = self.fetchall(query)
return [deserialize_json_to_dagster_namedtuple(row[0]) for row in rows]
def get_backfill(self, backfill_id):
check.str_param(backfill_id, "backfill_id")
query = db.select([BulkActionsTable.c.body]).where(BulkActionsTable.c.key == backfill_id)
row = self.fetchone(query)
return deserialize_json_to_dagster_namedtuple(row[0]) if row else None
def add_backfill(self, partition_backfill):
check.inst_param(partition_backfill, "partition_backfill", PartitionBackfill)
with self.connect() as conn:
conn.execute(
BulkActionsTable.insert().values( # pylint: disable=no-value-for-parameter
key=partition_backfill.backfill_id,
status=partition_backfill.status.value,
timestamp=utc_datetime_from_timestamp(partition_backfill.backfill_timestamp),
body=serialize_dagster_namedtuple(partition_backfill),
)
)
def update_backfill(self, partition_backfill):
check.inst_param(partition_backfill, "partition_backfill", PartitionBackfill)
backfill_id = partition_backfill.backfill_id
if not self.get_backfill(backfill_id):
raise DagsterInvariantViolationError(
f"Backfill {backfill_id} is not present in storage"
)
with self.connect() as conn:
conn.execute(
BulkActionsTable.update() # pylint: disable=no-value-for-parameter
.where(BulkActionsTable.c.key == backfill_id)
.values(
status=partition_backfill.status.value,
body=serialize_dagster_namedtuple(partition_backfill),
)
)
GET_PIPELINE_SNAPSHOT_QUERY_ID = "get-pipeline-snapshot"
def defensively_unpack_pipeline_snapshot_query(logger, row):
# no checking here because sqlalchemy returns a special
# row proxy and don't want to instance check on an internal
# implementation detail
def _warn(msg):
logger.warning("get-pipeline-snapshot: {msg}".format(msg=msg))
if not isinstance(row[0], bytes):
_warn("First entry in row is not a binary type.")
return None
try:
uncompressed_bytes = zlib.decompress(row[0])
except zlib.error:
_warn("Could not decompress bytes stored in snapshot table.")
return None
try:
decoded_str = uncompressed_bytes.decode("utf-8")
except UnicodeDecodeError:
_warn("Could not unicode decode decompressed bytes stored in snapshot table.")
return None
try:
return deserialize_json_to_dagster_namedtuple(decoded_str)
except JSONDecodeError:
_warn("Could not parse json in snapshot table.")
return None