Source code for dagster_postgres.schedule_storage.schedule_storage

import sqlalchemy as db
from dagster import check
from dagster.core.storage.schedules import ScheduleStorageSqlMetadata, SqlScheduleStorage
from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev
from dagster.serdes import ConfigurableClass, ConfigurableClassData
from ..utils import (
create_pg_connection,
pg_alembic_config,
pg_config,
pg_statement_timeout,
pg_url_from_config,
retry_pg_connection_fn,
retry_pg_creation_fn,
)


class PostgresScheduleStorage(SqlScheduleStorage, ConfigurableClass):
"""Postgres-backed run storage.
Users should not directly instantiate this class; it is instantiated by internal machinery when
``dagit`` and ``dagster-graphql`` load, based on the values in the ``dagster.yaml`` file in
``$DAGSTER_HOME``. Configuration of this class should be done by setting values in that file.
To use Postgres for schedule storage, you can add a block such as the following to your
``dagster.yaml``:
.. literalinclude:: ../../../../../../examples/docs_snippets/docs_snippets/deploying/dagster-pg.yaml
:caption: dagster.yaml
:lines: 23-32
:language: YAML
Note that the fields in this config are :py:class:`~dagster.StringSource` and
:py:class:`~dagster.IntSource` and can be configured from environment variables.
"""

    def __init__(self, postgres_url, should_autocreate_tables=True, inst_data=None):
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
self.postgres_url = postgres_url
self.should_autocreate_tables = check.bool_param(
should_autocreate_tables, "should_autocreate_tables"
)
# Default to not holding any connections open to prevent accumulating connections per DagsterInstance
self._engine = create_engine(
self.postgres_url, isolation_level="AUTOCOMMIT", poolclass=db.pool.NullPool
)
table_names = retry_pg_connection_fn(lambda: db.inspect(self._engine).get_table_names())
missing_main_table = "schedules" not in table_names and "jobs" not in table_names
if self.should_autocreate_tables and missing_main_table:
with self.connect() as conn:
alembic_config = pg_alembic_config(__file__)
retry_pg_creation_fn(lambda: ScheduleStorageSqlMetadata.create_all(conn))
# This revision may be shared by any other dagster storage classes using the same DB
stamp_alembic_rev(alembic_config, conn)
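                # (stamping records the current alembic head revision, so a later
                # ``upgrade()`` knows this freshly created schema needs no migrations)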
super().__init__()

    def optimize_for_dagit(self, statement_timeout):
# When running in dagit, hold an open connection and set statement_timeout
self._engine = create_engine(
self.postgres_url,
isolation_level="AUTOCOMMIT",
pool_size=1,
connect_args={"options": pg_statement_timeout(statement_timeout)},
)
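
    # Illustration only (value hypothetical): ``storage.optimize_for_dagit(5000)``
    # keeps a single pooled connection open and, assuming the argument is in
    # milliseconds like Postgres's ``statement_timeout`` setting, cancels any
    # statement that runs longer than five seconds.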

    @property
def inst_data(self):
return self._inst_data

    @classmethod
def config_type(cls):
return pg_config()

    @staticmethod
def from_config_value(inst_data, config_value):
return PostgresScheduleStorage(
inst_data=inst_data,
postgres_url=pg_url_from_config(config_value),
should_autocreate_tables=config_value.get("should_autocreate_tables", True),
)
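
    # Illustrative only, with hypothetical values: the instance loader passes the
    # parsed ``config`` block from dagster.yaml, so a direct call looks roughly like
    #   PostgresScheduleStorage.from_config_value(
    #       inst_data, {"postgres_url": "postgresql://user:password@host:5432/db"}
    #   )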

    @staticmethod
def create_clean_storage(postgres_url, should_autocreate_tables=True):
engine = create_engine(
postgres_url, isolation_level="AUTOCOMMIT", poolclass=db.pool.NullPool
)
try:
ScheduleStorageSqlMetadata.drop_all(engine)
finally:
engine.dispose()
return PostgresScheduleStorage(postgres_url, should_autocreate_tables)

    def connect(self, run_id=None):  # pylint: disable=arguments-differ, unused-argument
return create_pg_connection(self._engine, __file__, "schedule")

    def upgrade(self):
alembic_config = pg_alembic_config(__file__)
with self.connect() as conn:
run_alembic_upgrade(alembic_config, conn)
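

# A minimal usage sketch, not part of the module: the connection URL is
# hypothetical and a reachable Postgres server is assumed. In normal operation
# the DagsterInstance constructs this storage from dagster.yaml rather than by
# direct instantiation.
if __name__ == "__main__":
    storage = PostgresScheduleStorage(
        postgres_url="postgresql://user:password@localhost:5432/dagster",
        should_autocreate_tables=True,  # create the schedules/jobs tables if absent
    )
    storage.upgrade()  # apply any pending alembic migrations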