DagsterDocs

Source code for dagster.core.definitions.partition

import inspect
from collections import namedtuple

import pendulum
from dagster import check
from dagster.core.definitions.job import RunRequest, SkipReason
from dagster.core.definitions.schedule import ScheduleDefinition, ScheduleExecutionContext
from dagster.core.errors import (
    DagsterInvalidDefinitionError,
    ScheduleExecutionError,
    user_code_error_boundary,
)
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter
from dagster.core.storage.tags import check_tags
from dagster.utils import merge_dicts

from .mode import DEFAULT_MODE_NAME
from .utils import check_valid_name


def by_name(partition):
    return partition.name


[docs]class Partition(namedtuple("_Partition", ("value name"))): """ Partition is the representation of a logical slice across an axis of a pipeline's work Args: value (Any): The object for this partition name (str): Name for this partition """ def __new__(cls, value=None, name=None): return super(Partition, cls).__new__( cls, name=check.opt_str_param(name, "name", str(value)), value=value )
def last_empty_partition(context, partition_set_def): check.inst_param(context, "context", ScheduleExecutionContext) partition_set_def = check.inst_param( partition_set_def, "partition_set_def", PartitionSetDefinition ) partitions = partition_set_def.get_partitions(context.scheduled_execution_time) if not partitions: return None selected = None for partition in reversed(partitions): filters = PipelineRunsFilter.for_partition(partition_set_def, partition) matching = context.instance.get_runs(filters) if not any(run.status == PipelineRunStatus.SUCCESS for run in matching): selected = partition break return selected def first_partition(context, partition_set_def=None): check.inst_param(context, "context", ScheduleExecutionContext) partition_set_def = check.inst_param( partition_set_def, "partition_set_def", PartitionSetDefinition ) partitions = partition_set_def.get_partitions(context.scheduled_execution_time) if not partitions: return None return partitions[0]
[docs]class PartitionSetDefinition( namedtuple( "_PartitionSetDefinition", ( "name pipeline_name partition_fn solid_selection mode " "user_defined_run_config_fn_for_partition user_defined_tags_fn_for_partition" ), ) ): """ Defines a partition set, representing the set of slices making up an axis of a pipeline Args: name (str): Name for this partition set pipeline_name (str): The name of the pipeline definition partition_fn (Callable[void, List[Partition]]): User-provided function to define the set of valid partition objects. solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute with this partition. e.g. ``['*some_solid+', 'other_solid']`` mode (Optional[str]): The mode to apply when executing this partition. (default: 'default') run_config_fn_for_partition (Callable[[Partition], [Dict]]): A function that takes a :py:class:`~dagster.Partition` and returns the run configuration that parameterizes the execution for this partition, as a dict tags_fn_for_partition (Callable[[Partition], Optional[dict[str, str]]]): A function that takes a :py:class:`~dagster.Partition` and returns a list of key value pairs that will be added to the generated run for this partition. """ def __new__( cls, name, pipeline_name, partition_fn, solid_selection=None, mode=None, run_config_fn_for_partition=lambda _partition: {}, tags_fn_for_partition=lambda _partition: {}, ): partition_fn_param_count = len(inspect.signature(partition_fn).parameters) def _wrap_partition(x): if isinstance(x, Partition): return x if isinstance(x, str): return Partition(x) raise DagsterInvalidDefinitionError( "Expected <Partition> | <str>, received {type}".format(type=type(x)) ) def _wrap_partition_fn(current_time=None): if not current_time: current_time = pendulum.now("UTC") check.callable_param(partition_fn, "partition_fn") if partition_fn_param_count == 1: obj_list = partition_fn(current_time) else: obj_list = partition_fn() return [_wrap_partition(obj) for obj in obj_list] return super(PartitionSetDefinition, cls).__new__( cls, name=check_valid_name(name), pipeline_name=check.str_param(pipeline_name, "pipeline_name"), partition_fn=_wrap_partition_fn, solid_selection=check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ), mode=check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME), user_defined_run_config_fn_for_partition=check.callable_param( run_config_fn_for_partition, "run_config_fn_for_partition" ), user_defined_tags_fn_for_partition=check.callable_param( tags_fn_for_partition, "tags_fn_for_partition" ), ) def run_config_for_partition(self, partition): return self.user_defined_run_config_fn_for_partition(partition) def tags_for_partition(self, partition): user_tags = self.user_defined_tags_fn_for_partition(partition) check_tags(user_tags, "user_tags") tags = merge_dicts(user_tags, PipelineRun.tags_for_partition_set(self, partition)) return tags def get_partitions(self, current_time=None): return self.partition_fn(current_time) def get_partition(self, name): for partition in self.get_partitions(): if partition.name == name: return partition check.failed("Partition name {} not found!".format(name)) def get_partition_names(self, current_time=None): return [part.name for part in self.get_partitions(current_time)] def create_schedule_definition( self, schedule_name, cron_schedule, partition_selector, should_execute=None, environment_vars=None, execution_timezone=None, description=None, ): """Create a ScheduleDefinition from a PartitionSetDefinition. Arguments: schedule_name (str): The name of the schedule. cron_schedule (str): A valid cron string for the schedule partition_selector (Callable[ScheduleExecutionContext, PartitionSetDefinition], Partition): Function that determines the partition to use at a given execution time. For time-based partition sets, will likely be either `identity_partition_selector` or a selector returned by `create_offset_partition_selector`. should_execute (Optional[function]): Function that runs at schedule execution time that determines whether a schedule should execute. Defaults to a function that always returns ``True``. environment_vars (Optional[dict]): The environment variables to set for the schedule. execution_timezone (Optional[str]): Timezone in which the schedule should run. Only works with DagsterDaemonScheduler, and must be set when using that scheduler. description (Optional[str]): A human-readable description of the schedule. Returns: ScheduleDefinition: The generated ScheduleDefinition for the partition selector """ check.str_param(schedule_name, "schedule_name") check.str_param(cron_schedule, "cron_schedule") check.opt_callable_param(should_execute, "should_execute") check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) check.callable_param(partition_selector, "partition_selector") check.opt_str_param(execution_timezone, "execution_timezone") check.opt_str_param(description, "description") def _execution_fn(context): check.inst_param(context, "context", ScheduleExecutionContext) with user_code_error_boundary( ScheduleExecutionError, lambda: f"Error occurred during the execution of partition_selector for schedule {schedule_name}", ): selected_partition = partition_selector(context, self) if not selected_partition: yield SkipReason( "Partition selector did not return a partition. Make sure that the timezone " "on your partition set matches your execution timezone." ) return if selected_partition.name not in self.get_partition_names( context.scheduled_execution_time ): yield SkipReason( f"Partition selector returned a partition {selected_partition.name} not in the partition set." ) return with user_code_error_boundary( ScheduleExecutionError, lambda: f"Error occurred during the execution of should_execute for schedule {schedule_name}", ): if should_execute and not should_execute(context): yield SkipReason( "should_execute function for {schedule_name} returned false.".format( schedule_name=schedule_name ) ) return with user_code_error_boundary( ScheduleExecutionError, lambda: f"Error occurred during the execution of run_config_fn for schedule {schedule_name}", ): run_config = self.run_config_for_partition(selected_partition) with user_code_error_boundary( ScheduleExecutionError, lambda: f"Error occurred during the execution of tags_fn for schedule {schedule_name}", ): tags = self.tags_for_partition(selected_partition) yield RunRequest( run_key=None, run_config=run_config, tags=tags, ) return PartitionScheduleDefinition( name=schedule_name, cron_schedule=cron_schedule, pipeline_name=self.pipeline_name, tags_fn=None, solid_selection=self.solid_selection, mode=self.mode, should_execute=None, environment_vars=environment_vars, partition_set=self, execution_timezone=execution_timezone, execution_fn=_execution_fn, description=description, )
[docs]class PartitionScheduleDefinition(ScheduleDefinition): __slots__ = ["_partition_set"] def __init__( self, name, cron_schedule, pipeline_name, tags_fn, solid_selection, mode, should_execute, environment_vars, partition_set, run_config_fn=None, execution_timezone=None, execution_fn=None, description=None, ): super(PartitionScheduleDefinition, self).__init__( name=check_valid_name(name), cron_schedule=cron_schedule, pipeline_name=pipeline_name, run_config_fn=run_config_fn, tags_fn=tags_fn, solid_selection=solid_selection, mode=mode, should_execute=should_execute, environment_vars=environment_vars, execution_timezone=execution_timezone, execution_fn=execution_fn, description=description, ) self._partition_set = check.inst_param( partition_set, "partition_set", PartitionSetDefinition ) def get_partition_set(self): return self._partition_set