DagsterDocs

Source code for dagster_ge.factory

import datetime

import great_expectations as ge
from dagster import (
    EventMetadataEntry,
    ExpectationResult,
    InputDefinition,
    Noneable,
    Output,
    OutputDefinition,
    StringSource,
    check,
    resource,
    solid,
)
from dagster_pandas import DataFrame
from great_expectations.render.page_renderer_util import (
    render_multiple_validation_result_pages_markdown,
)

try:
    # ge < v0.13.0
    from great_expectations.core import convert_to_json_serializable
except ImportError:
    # ge >= v0.13.0
    from great_expectations.core.util import convert_to_json_serializable


@resource(config_schema={"ge_root_dir": Noneable(StringSource)})
def ge_data_context(context):
    """Dagster resource that yields a Great Expectations ``DataContext``.

    Config:
        ge_root_dir (Optional[str]): when set, it is forwarded to
            ``DataContext`` as ``context_root_dir``; when ``None``, the
            ``DataContext`` is constructed with no arguments.

    Yields:
        The constructed ``ge.data_context.DataContext`` instance.
    """
    root_dir = context.resource_config["ge_root_dir"]
    if root_dir is not None:
        yield ge.data_context.DataContext(context_root_dir=root_dir)
    else:
        # No explicit root configured: defer entirely to DataContext's defaults.
        yield ge.data_context.DataContext()

def ge_validation_solid_factory(
    name,
    datasource_name,
    suite_name,
    validation_operator_name=None,
    input_dagster_type=DataFrame,
    batch_kwargs=None,
):
    """Generates solids for interacting with GE.

    Args:
        name (str): the name of the solid
        datasource_name (str): the name of your DataSource, see your great_expectations.yml
        suite_name (str): the name of your expectation suite, see your great_expectations.yml
        validation_operator_name (Optional[str]): what validation operator to run -- defaults to
            None, which generates an ephemeral validator. If you want to save data docs, use
            'action_list_operator'.
            See https://docs.greatexpectations.io/en/latest/reference/core_concepts/validation_operators_and_actions.html
        input_dagster_type (DagsterType): the Dagster type used to type check the input to the
            solid. Defaults to `dagster_pandas.DataFrame`.
        batch_kwargs (Optional[dict]): overrides the `batch_kwargs` parameter when calling the
            `ge_data_context`'s `get_batch` method. Defaults to `{"dataset": dataset}`, where
            `dataset` is the input to the generated solid. The dict is never mutated by the
            generated solid; each execution works on its own shallow copy.

    Returns:
        A solid that takes in a set of data and yields both an expectation with relevant metadata
        and an output with all the metadata (for user processing)
    """
    check.str_param(datasource_name, "datasource_name")
    check.str_param(suite_name, "suite_name")
    check.opt_str_param(validation_operator_name, "validation_operator_name")
    # NOTE(review): the code below does `"datasource" in batch_kwargs`, so this call is
    # presumably expected to normalize None to an empty dict -- confirm against dagster.check.
    batch_kwargs = check.opt_dict_param(batch_kwargs, "batch_kwargs")

    @solid(
        name=name,
        input_defs=[InputDefinition("dataset", input_dagster_type)],
        output_defs=[
            OutputDefinition(
                dagster_type=dict,
                # The description value (including its surrounding whitespace) is kept
                # identical to the original literal.
                description=(
                    " This solid yields an expectationResult with a structured dict of "
                    "metadata from the GE suite, as well as the full result in case a user "
                    "wants to process it differently. The structured dict contains both "
                    "summary stats from the suite as well as expectation by expectation "
                    "results/details. \n"
                ),
            )
        ],
        required_resource_keys={"ge_data_context"},
        tags={"kind": "ge"},
    )
    def ge_validation_solid(context, dataset):
        """Validate `dataset` against the configured suite and report the outcome."""
        data_context = context.resources.ge_data_context

        if validation_operator_name is not None:
            validation_operator = validation_operator_name
        else:
            # No operator requested: register one with an empty action list under a
            # fixed name and run that instead.
            data_context.add_validation_operator(
                "ephemeral_validation",
                {"class_name": "ActionListValidationOperator", "action_list": []},
            )
            validation_operator = "ephemeral_validation"

        suite = data_context.get_expectation_suite(suite_name)

        # Work on a copy: `batch_kwargs` is captured once by the factory and shared by
        # every execution of this solid. Writing "datasource" into the shared dict would
        # mutate the caller's object and make the warning below fire spuriously on every
        # run after the first.
        if batch_kwargs:
            final_batch_kwargs = dict(batch_kwargs)
        else:
            final_batch_kwargs = {"dataset": dataset}

        if "datasource" in batch_kwargs:
            context.log.warning(
                "`datasource` field of `batch_kwargs` will be ignored; use the `datasource_name` "
                "parameter of the solid factory instead."
            )
        final_batch_kwargs["datasource"] = datasource_name

        batch = data_context.get_batch(final_batch_kwargs, suite)
        run_id = {
            "run_name": datasource_name + " run",
            "run_time": datetime.datetime.utcnow(),
        }
        results = data_context.run_validation_operator(
            validation_operator, assets_to_validate=[batch], run_id=run_id
        )
        # Only one batch is validated, so only the first validation result is used.
        res = convert_to_json_serializable(results.list_validation_results())[0]
        md_str = render_multiple_validation_result_pages_markdown(
            validation_operator_result=results,
            run_info_at_end=True,
        )

        meta_stats = EventMetadataEntry.md(md_str=md_str, label="Expectation Results")
        yield ExpectationResult(
            success=res["success"],
            metadata_entries=[
                meta_stats,
            ],
        )
        yield Output(res)

    return ge_validation_solid