DagsterDocs

Source code for dagster.core.storage.system_storage

from dagster import check
from dagster.config import Field
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.intermediate_storage import (
    intermediate_storage as intermediate_storage_fn,
)
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.storage.io_manager import io_manager
from dagster.core.storage.type_storage import (
    TypeStoragePluginRegistry,
    construct_type_storage_plugin_registry,
)
from dagster.core.system_config.objects import EnvironmentConfig

from .init import InitIntermediateStorageContext
from .intermediate_storage import IntermediateStorageAdapter, ObjectStoreIntermediateStorage
from .object_store import FilesystemObjectStore, InMemoryObjectStore, ObjectStore


def build_intermediate_storage_from_object_store(
    object_store,
    init_context,
    root_for_run_id=lambda _: "",
):
    """Build an ``ObjectStoreIntermediateStorage`` on top of an object store.

    Intended to be called from within an intermediate storage definition.

    Args:
        object_store (ObjectStore): The object store backing the intermediate storage.
        init_context (InitIntermediateStorageContext): The context supplying the pipeline run
            and, optionally, a type storage plugin registry.
        root_for_run_id (Callable[[str], str]): A function that converts a run ID to the root
            of the object storage paths. The default maps every run ID to the empty string.

    Returns:
        ObjectStoreIntermediateStorage: The storage built for the run found on ``init_context``.
    """
    checked_store = check.inst_param(object_store, "object_store", ObjectStore)
    checked_root_fn = check.callable_param(root_for_run_id, "root_for_run_id")
    checked_context = check.inst_param(
        init_context, "init_context", InitIntermediateStorageContext
    )

    run_id = checked_context.pipeline_run.run_id
    # Fall back to an empty registry when the context does not carry one.
    plugin_registry = (
        checked_context.type_storage_plugin_registry
        or TypeStoragePluginRegistry(types_to_register=[])
    )

    return ObjectStoreIntermediateStorage(
        object_store=checked_store,
        run_id=run_id,
        root_for_run_id=checked_root_fn,
        type_storage_plugin_registry=plugin_registry,
    )


@intermediate_storage_fn(name="in_memory", is_persistent=False, required_resource_keys=set())
def mem_intermediate_storage(init_context):
    """The default in-memory intermediate storage.

    In-memory intermediate storage is the default on any pipeline run that does
    not configure any custom intermediate storage.

    Keep in mind when using this storage that intermediates will not be persisted after the
    pipeline run ends. Use a persistent intermediate storage like
    :py:func:`fs_intermediate_storage` to persist intermediates and take advantage of advanced
    features like pipeline re-execution.

    Args:
        init_context (InitIntermediateStorageContext): The context used to construct the
            intermediate storage.

    Returns:
        The intermediate storage built by
        :py:func:`build_intermediate_storage_from_object_store` over an ``InMemoryObjectStore``.
    """
    object_store = InMemoryObjectStore()
    return build_intermediate_storage_from_object_store(
        object_store=object_store, init_context=init_context
    )
@intermediate_storage_fn(
    name="filesystem",
    is_persistent=True,
    config_schema={"base_dir": Field(str, is_required=False)},
    required_resource_keys=set(),
)
def fs_intermediate_storage(init_context):
    """The default filesystem intermediate storage.

    Filesystem system storage is available by default on any :py:class:`ModeDefinition` that does
    not provide custom system storages. To select it, include a fragment such as the following in
    config:

    .. code-block:: yaml

        intermediate_storage:
          filesystem:
            base_dir: '/path/to/dir/'

    You may omit the ``base_dir`` config value, in which case the filesystem storage will use
    the :py:class:`DagsterInstance`-provided default.

    Args:
        init_context (InitIntermediateStorageContext): The context used to construct the
            intermediate storage; its ``intermediate_storage_config`` is read for ``base_dir``.

    Returns:
        The intermediate storage built by
        :py:func:`build_intermediate_storage_from_object_store` over a ``FilesystemObjectStore``.
    """
    object_store = FilesystemObjectStore()
    override_dir = init_context.intermediate_storage_config.get("base_dir")
    if override_dir:
        # A configured base_dir is used as the root for every run ID.
        root_for_run_id = lambda _: override_dir
    else:
        # Otherwise the instance's ``intermediates_directory`` serves as the run-ID-to-root function.
        root_for_run_id = init_context.instance.intermediates_directory

    return build_intermediate_storage_from_object_store(
        object_store, init_context, root_for_run_id=root_for_run_id
    )
default_intermediate_storage_defs = [mem_intermediate_storage, fs_intermediate_storage]
"""The default 'in_memory' and 'filesystem' intermediate storage definitions.

Framework authors seeking to add their own intermediate storage definitions can extend this list
as follows:

.. code-block:: python

    custom_storage_mode = ModeDefinition(
        ...,
        intermediate_storage_defs=default_intermediate_storage_defs + [custom_intermediate_storage_def]
    )
"""
def io_manager_from_intermediate_storage(intermediate_storage_def):
    """Define an :py:class:`IOManagerDefinition` from an existing :py:class:`IntermediateStorageDefinition`.

    This method is used to adapt an existing user-defined intermediate storage to an IO manager
    resource, for example:

    .. code-block:: python

        my_io_manager_def = io_manager_from_intermediate_storage(my_intermediate_storage_def)

        @pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": my_io_manager_def})])
        def my_pipeline():
            ...

    Args:
        intermediate_storage_def (IntermediateStorageDefinition): The intermediate storage definition
            to be converted to an IO manager definition.

    Returns:
        IOManagerDefinition

    Raises:
        DagsterInvariantViolationError: Raised by the returned IO manager's initialization when
            the resource init context has no ``pipeline_run``.
    """

    check.inst_param(
        intermediate_storage_def, "intermediate_storage_def", IntermediateStorageDefinition
    )

    @io_manager
    def _io_manager(init_context):
        # Intermediate storage needs a pipeline run to be constructed.
        if not init_context.pipeline_run:
            raise DagsterInvariantViolationError(
                "Attempted to construct intermediate storage outside of execution context. "
                "Intermediate storage can only be constructed within the context of an execution."
            )

        pipeline_run = init_context.pipeline_run
        instance = init_context.instance
        pipeline_def = init_context.pipeline_def_for_backwards_compat
        # depend on InitResourceContext.instance and pipeline_def_for_backwards_compat
        environment_config = EnvironmentConfig.build(
            pipeline_def, pipeline_run.run_config, mode=pipeline_run.mode
        )
        mode_def = pipeline_def.get_mode_definition(pipeline_run.mode)

        # Assemble the context that the wrapped definition's creation function expects.
        intermediate_storage_context = InitIntermediateStorageContext(
            pipeline_def=pipeline_def,
            mode_def=mode_def,
            intermediate_storage_def=intermediate_storage_def,
            pipeline_run=pipeline_run,
            instance=instance,
            environment_config=environment_config,
            type_storage_plugin_registry=construct_type_storage_plugin_registry(
                pipeline_def, intermediate_storage_def
            ),
            resources=init_context.resources,
            intermediate_storage_config=environment_config.intermediate_storage.intermediate_storage_config,
        )

        intermediate_storage = intermediate_storage_def.intermediate_storage_creation_fn(
            intermediate_storage_context
        )

        # Wrap the storage in the adapter that is returned as the IO manager.
        return IntermediateStorageAdapter(intermediate_storage)

    return _io_manager