DagsterDocs

Source code for dagster.core.definitions.preset

from collections import namedtuple

import pkg_resources
import yaml
from dagster import check
from dagster.core.definitions.utils import config_from_files, config_from_yaml_strings
from dagster.core.errors import DagsterInvariantViolationError
from dagster.utils.merger import deep_merge_dicts

from .mode import DEFAULT_MODE_NAME
from .utils import check_valid_name


[docs]class PresetDefinition( namedtuple("_PresetDefinition", "name run_config solid_selection mode tags") ): """Defines a preset configuration in which a pipeline can execute. Presets can be used in Dagit to load predefined configurations into the tool. Presets may also be used from the Python API (in a script, or in test) as follows: .. code-block:: python execute_pipeline(pipeline_def, preset='example_preset') Presets may also be used with the command line tools: .. code-block:: shell $ dagster pipeline execute example_pipeline --preset example_preset Args: name (str): The name of this preset. Must be unique in the presets defined on a given pipeline. run_config (Optional[dict]): A dict representing the config to set with the preset. This is equivalent to the ``run_config`` argument to :py:func:`execute_pipeline`. solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute with the preset. e.g. ``['*some_solid+', 'other_solid']`` mode (Optional[str]): The mode to apply when executing this preset. (default: 'default') tags (Optional[Dict[str, Any]]): The tags to apply when executing this preset. """ def __new__( cls, name, run_config=None, solid_selection=None, mode=None, tags=None, ): return super(PresetDefinition, cls).__new__( cls, name=check_valid_name(name), run_config=run_config, solid_selection=check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ), mode=check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME), tags=check.opt_dict_param(tags, "tags", key_type=str), )
[docs] @staticmethod def from_files(name, config_files=None, solid_selection=None, mode=None, tags=None): """Static constructor for presets from YAML files. Args: name (str): The name of this preset. Must be unique in the presets defined on a given pipeline. config_files (Optional[List[str]]): List of paths or glob patterns for yaml files to load and parse as the environment config for this preset. solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute with the preset. e.g. ``['*some_solid+', 'other_solid']`` mode (Optional[str]): The mode to apply when executing this preset. (default: 'default') tags (Optional[Dict[str, Any]]): The tags to apply when executing this preset. Returns: PresetDefinition: A PresetDefinition constructed from the provided YAML files. Raises: DagsterInvariantViolationError: When one of the YAML files is invalid and has a parse error. """ check.str_param(name, "name") config_files = check.opt_list_param(config_files, "config_files") solid_selection = check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ) mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) merged = config_from_files(config_files) return PresetDefinition(name, merged, solid_selection, mode, tags)
[docs] @staticmethod def from_yaml_strings(name, yaml_strings=None, solid_selection=None, mode=None, tags=None): """Static constructor for presets from YAML strings. Args: name (str): The name of this preset. Must be unique in the presets defined on a given pipeline. yaml_strings (Optional[List[str]]): List of yaml strings to parse as the environment config for this preset. solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute with the preset. e.g. ``['*some_solid+', 'other_solid']`` mode (Optional[str]): The mode to apply when executing this preset. (default: 'default') tags (Optional[Dict[str, Any]]): The tags to apply when executing this preset. Returns: PresetDefinition: A PresetDefinition constructed from the provided YAML strings Raises: DagsterInvariantViolationError: When one of the YAML documents is invalid and has a parse error. """ check.str_param(name, "name") yaml_strings = check.opt_list_param(yaml_strings, "yaml_strings", of_type=str) solid_selection = check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ) mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) merged = config_from_yaml_strings(yaml_strings) return PresetDefinition(name, merged, solid_selection, mode, tags)
[docs] @staticmethod def from_pkg_resources( name, pkg_resource_defs=None, solid_selection=None, mode=None, tags=None ): """Load a preset from a package resource, using :py:func:`pkg_resources.resource_string`. Example: .. code-block:: python PresetDefinition.from_pkg_resources( name='local', mode='local', pkg_resource_defs=[ ('dagster_examples.airline_demo.environments', 'local_base.yaml'), ('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'), ], ) Args: name (str): The name of this preset. Must be unique in the presets defined on a given pipeline. pkg_resource_defs (Optional[List[(str, str)]]): List of pkg_resource modules/files to load as environment config for this preset. solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute with this partition. e.g. ``['*some_solid+', 'other_solid']`` mode (Optional[str]): The mode to apply when executing this preset. (default: 'default') tags (Optional[Dict[str, Any]]): The tags to apply when executing this preset. Returns: PresetDefinition: A PresetDefinition constructed from the provided YAML strings Raises: DagsterInvariantViolationError: When one of the YAML documents is invalid and has a parse error. """ pkg_resource_defs = check.opt_list_param( pkg_resource_defs, "pkg_resource_defs", of_type=tuple ) try: yaml_strings = [ pkg_resources.resource_string(*pkg_resource_def).decode("utf-8") for pkg_resource_def in pkg_resource_defs ] except (ModuleNotFoundError, FileNotFoundError, UnicodeDecodeError) as err: raise DagsterInvariantViolationError( "Encountered error attempting to parse yaml. Loading YAMLs from " f"package resources {pkg_resource_defs} " f'on preset "{name}".' ) from err return PresetDefinition.from_yaml_strings(name, yaml_strings, solid_selection, mode, tags)
[docs] def get_environment_yaml(self): """Get the environment dict set on a preset as YAML. Returns: str: The environment dict as YAML. """ return yaml.dump(self.run_config or {}, default_flow_style=False)
[docs] def with_additional_config(self, run_config): """Return a new PresetDefinition with additional config merged in to the existing config.""" check.opt_nullable_dict_param(run_config, "run_config") if run_config is None: return self else: initial_config = self.run_config or {} return PresetDefinition( name=self.name, solid_selection=self.solid_selection, mode=self.mode, tags=self.tags, run_config=deep_merge_dicts(initial_config, run_config), )