DagsterDocs

Source code for dagster.core.types.config_schema

import hashlib

from dagster import check
from dagster.config.config_type import ConfigType
from dagster.core.decorator_utils import (
    split_function_parameters,
    validate_decorated_fn_positionals,
)
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.utils import ensure_gen
from dagster.utils.backcompat import experimental_arg_warning


class DagsterTypeLoader:
    """
    Base class for objects that load unconnected inputs of the dagster type
    they are attached to.

    The recommended way to define a type loader is with the
    :py:func:`@dagster_type_loader <dagster_type_loader>` decorator.
    """

    @property
    def schema_type(self):
        # Subclasses are expected to override this property; the base version
        # only reports the omission, naming the concrete class involved.
        klass_name = type(self).__name__
        check.not_implemented("Must override schema_type in {klass}".format(klass=klass_name))

    @property
    def loader_version(self):
        # The base loader carries no version.
        return None

    def compute_loaded_input_version(self, _config_value):
        # The base loader never produces a version for a loaded input.
        return None

    def construct_from_config_value(self, _context, config_value):
        """
        Build a runtime value from config data.

        The base implementation hands the config value back unchanged.
        """
        return config_value

    def required_resource_keys(self):
        # By default a loader requires no resources.
        return frozenset()
class DagsterTypeMaterializer:
    """
    Base class for objects that materialize outputs of the dagster type they
    are attached to.

    The recommended way to define a type materializer is with the
    :py:func:`@dagster_type_materializer <dagster_type_materializer>` decorator.
    """

    @property
    def schema_type(self):
        # Subclasses are expected to override this property; the base version
        # only reports the omission, naming the concrete class involved.
        klass_name = type(self).__name__
        check.not_implemented("Must override schema_type in {klass}".format(klass=klass_name))

    def materialize_runtime_values(self, _context, _config_value, _runtime_value):
        """
        Materialize a runtime value according to the given configuration.

        The base class provides no implementation of its own.
        """
        check.not_implemented("Must implement")

    def required_resource_keys(self):
        # By default a materializer requires no resources.
        return frozenset()
class DagsterTypeLoaderFromDecorator(DagsterTypeLoader):
    """Type loader that wraps a user-supplied loading function and its config type."""

    def __init__(
        self,
        config_type,
        func,
        required_resource_keys,
        loader_version=None,
        external_version_fn=None,
    ):
        # Name reported alongside experimental-argument warnings.
        init_name = "DagsterTypeLoaderFromDecorator.__init__"

        self._config_type = check.inst_param(config_type, "config_type", ConfigType)
        self._func = check.callable_param(func, "func")
        self._required_resource_keys = check.opt_set_param(
            required_resource_keys, "required_resource_keys", of_type=str
        )

        # Each experimental argument is validated first and warned about only
        # when a truthy value was actually supplied.
        self._loader_version = check.opt_str_param(loader_version, "loader_version")
        if self._loader_version:
            experimental_arg_warning("loader_version", init_name)

        self._external_version_fn = check.opt_callable_param(
            external_version_fn, "external_version_fn"
        )
        if self._external_version_fn:
            experimental_arg_warning("external_version_fn", init_name)

    @property
    def schema_type(self):
        return self._config_type

    @property
    def loader_version(self):
        return self._loader_version

    def compute_loaded_input_version(self, config_value):
        """Compute the version of the input loaded from a given config_value.

        Args:
            config_value (Union[Any, Dict]): Config value handed to the external
                version function, when one was supplied.

        Returns:
            Optional[str]: SHA-1 hex digest of the loader version followed by the
            external version (whichever of the two are available), or None when
            the combined version string is empty.
        """
        pieces = []
        if self.loader_version:
            pieces.append(str(self.loader_version))
        if self._external_version_fn:
            pieces.append(str(self._external_version_fn(config_value)))

        combined = "".join(pieces)
        if not combined:
            # Sentinel value for no version provided.
            return None
        return hashlib.sha1(combined.encode("utf-8")).hexdigest()

    def construct_from_config_value(self, context, config_value):
        # Delegate to the wrapped loading function.
        return self._func(context, config_value)

    def required_resource_keys(self):
        return frozenset(self._required_resource_keys)


def _create_type_loader_for_decorator(
    config_type,
    func,
    required_resource_keys,
    loader_version=None,
    external_version_fn=None,
):
    """Build the DagsterTypeLoaderFromDecorator for a decorated loading function."""
    loader = DagsterTypeLoaderFromDecorator(
        config_type, func, required_resource_keys, loader_version, external_version_fn
    )
    return loader
def dagster_type_loader(
    config_schema,
    required_resource_keys=None,
    loader_version=None,
    external_version_fn=None,
):
    """Create a dagster type loader that maps config data to a runtime value.

    The decorated function should take the execution context and the parsed
    config value, and return the appropriate runtime value.

    Args:
        config_schema (ConfigSchema): The schema for the config that's passed to
            the decorated function.
        required_resource_keys (Optional[Set[str]]): Resource keys forwarded to the
            created loader, which reports them from ``required_resource_keys()``.
        loader_version (str): (Experimental) The version of the decorated compute
            function. Two loading functions should have the same version if and
            only if they deterministically produce the same outputs when provided
            the same inputs.
        external_version_fn (Callable): (Experimental) A function that takes in the
            same parameters as the loader function (config_value) and returns a
            representation of the version of the external asset (str). Two external
            assets with identical versions are treated as identical to one another.

    Examples:

    .. code-block:: python

        @dagster_type_loader(Permissive())
        def load_dict(_context, value):
            return value
    """
    from dagster.config.field import resolve_to_config_type

    resolved_type = resolve_to_config_type(config_schema)
    # The decorated function must accept ``context`` first; "*" is passed through
    # to the positional-validation helpers as in the rest of the signature spec.
    expected_positionals = ["context", "*"]

    def wrapper(func):
        positionals, _ = split_function_parameters(func, expected_positionals)
        missing = validate_decorated_fn_positionals(positionals, expected_positionals)

        if not missing:
            return _create_type_loader_for_decorator(
                resolved_type, func, required_resource_keys, loader_version, external_version_fn
            )

        message_template = (
            "@dagster_type_loader '{solid_name}' decorated function does not have required positional "
            "parameter '{missing_param}'. Solid functions should only have keyword arguments "
            "that match input names and a first positional parameter named 'context'."
        )
        raise DagsterInvalidDefinitionError(
            message_template.format(solid_name=func.__name__, missing_param=missing)
        )

    return wrapper
class DagsterTypeMaterializerForDecorator(DagsterTypeMaterializer):
    """Type materializer that wraps a user-supplied materialization function and its config type."""

    def __init__(self, config_type, func, required_resource_keys):
        self._config_type = check.inst_param(config_type, "config_type", ConfigType)
        self._func = check.callable_param(func, "func")
        self._required_resource_keys = check.opt_set_param(
            required_resource_keys, "required_resource_keys", of_type=str
        )

    @property
    def schema_type(self):
        return self._config_type

    def materialize_runtime_values(self, context, config_value, runtime_value):
        # Call the wrapped function, then pass whatever it produced through
        # ensure_gen before handing it back to the caller.
        produced = self._func(context, config_value, runtime_value)
        return ensure_gen(produced)

    def required_resource_keys(self):
        return frozenset(self._required_resource_keys)


def _create_output_materializer_for_decorator(config_type, func, required_resource_keys):
    """Build the DagsterTypeMaterializerForDecorator for a decorated function."""
    materializer = DagsterTypeMaterializerForDecorator(config_type, func, required_resource_keys)
    return materializer
def dagster_type_materializer(config_schema, required_resource_keys=None):
    """Create an output materialization hydration config that configurably
    materializes a runtime value.

    The decorated function should take the execution context, the parsed config
    value, and the runtime value. It should materialize the runtime value and
    return an appropriate :py:class:`AssetMaterialization`.

    Args:
        config_schema (Any): The type of the config data expected by the decorated
            function.
        required_resource_keys (Optional[Set[str]]): Resource keys forwarded to the
            created materializer, which reports them from
            ``required_resource_keys()``.

    Examples:

    .. code-block:: python

        # Takes a list of dicts such as might be read in using csv.DictReader, as
        # well as a config value, and writes the rows to the path given by that
        # config value.
        @dagster_type_materializer(str)
        def materialize_df(_context, path, value):
            with open(path, 'w') as fd:
                writer = csv.DictWriter(fd, fieldnames=value[0].keys())
                writer.writeheader()
                writer.writerows(rowdicts=value)

            return AssetMaterialization.file(path)
    """
    from dagster.config.field import resolve_to_config_type

    resolved_type = resolve_to_config_type(config_schema)

    def wrapper(func):
        # Bind the decorated function to the resolved config type and resource keys.
        return _create_output_materializer_for_decorator(
            resolved_type, func, required_resource_keys
        )

    return wrapper