Using Dagster with Great Expectations

You can find the code for this example on GitHub.

This example demonstrates how to use the GE solid factory from dagster-ge to test incoming data against a set of expectations built with Great Expectations' tooling.

For this example, we'll be using two versions of a dataset of baseball team payroll and wins, with one version modified to hold incorrect data.

You can use ge_validation_solid_factory to generate Dagster solids that integrate with Great Expectations. For example, here is a basic call to this GE solid factory. Besides the name of the solid, it takes two required arguments: datasource_name (the GE datasource) and suite_name (the expectation suite).

payroll_expectations = ge_validation_solid_factory(
    name="ge_validation_solid", datasource_name="getest", suite_name="basic.warning"
)
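To make the factory idea concrete, here is a minimal pure-Python sketch of the pattern: a function that takes a name and a fixed set of checks and returns a configured validation step. It is only an illustration and is not how dagster-ge is implemented; validation_step_factory, the check names, and the payroll and wins column names are all hypothetical.

```python
from typing import Callable, Dict, List

Row = Dict[str, float]
Check = Callable[[Row], bool]


def validation_step_factory(name: str, checks: Dict[str, Check]) -> Callable[[List[Row]], dict]:
    """Return a named validation function bound to a fixed set of checks (hypothetical helper)."""

    def validation_step(rows: List[Row]) -> dict:
        # Evaluate every check against every row and record one result per check.
        results = [
            {"check": check_name, "success": all(check(row) for row in rows)}
            for check_name, check in checks.items()
        ]
        # The step succeeds only if all individual checks succeed.
        return {"success": all(r["success"] for r in results), "results": results}

    validation_step.__name__ = name
    return validation_step


# Hypothetical checks standing in for a GE expectation suite.
payroll_checks = {
    "payroll_is_positive": lambda row: row["payroll"] > 0,
    "wins_within_season": lambda row: 0 <= row["wins"] <= 162,
}
validate_payroll = validation_step_factory("validate_payroll", payroll_checks)

good_rows = [{"payroll": 81.3, "wins": 94}]
bad_rows = [{"payroll": -5.0, "wins": 94}]
```

Calling validate_payroll(good_rows) reports success, while validate_payroll(bad_rows) reports a failure for the payroll_is_positive check. The real factory plays the same role, except that the checks come from a GE expectation suite and the returned object is a Dagster solid.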

The GE validations happen inside the solids created by the factory. Each of these solids yields an ExpectationResult with a structured dict of metadata from the GE suite. The structured metadata contains both summary statistics from the suite and expectation-by-expectation results. The solids also output the full result in case you want to process it differently. Here's how a downstream solid could use the full result, where expectation is the result:

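from dagster import solid


@solid
def postprocess_payroll(_, numrows, expectation):
    # `expectation` is the full validation result output by the GE solid.
    if expectation["success"]:
        return numrows
    raise ValueError("Great Expectations validation failed for the payroll data")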
Now let's take a brief look at the configurations in this example:

preset_defs = [
    PresetDefinition(
        "sample_preset_success",
        mode="basic",
        run_config={
            "resources": {
                "ge_data_context": {
                    "config": {"ge_root_dir": file_relative_path(__file__, "./great_expectations")}
                }
            },
            "solids": {
                "read_in_datafile": {
                    "inputs": {
                        "csv_path": {"value": file_relative_path(__file__, "./data/succeed.csv")}
                    }
                }
            },
        },
    ),
    PresetDefinition(
        "sample_preset_fail",
        mode="basic",
        run_config={
            "resources": {
                "ge_data_context": {
                    "config": {"ge_root_dir": file_relative_path(__file__, "./great_expectations")}
                }
            },
            "solids": {
                "read_in_datafile": {
                    "inputs": {
                        "csv_path": {"value": file_relative_path(__file__, "./data/fail.csv")}
                    }
                }
            },
        },
    ),
]

We've got two presets: one reads a data file that passes the expectations, and the other reads a file that fails them. You can configure the GE Data Context via the ge_data_context resource from the dagster-ge integration package. All we need to do to expose GE to Dagster is provide the root of the GE directory (the path to the great_expectations directory on your machine).
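The two presets differ only in the CSV path, so the shared run config can be produced by a small helper. This is a sketch in plain Python: build_run_config is a hypothetical function, "/path/to/example" is a placeholder, and os.path.join stands in for file_relative_path so the snippet runs without Dagster installed.

```python
import os


def build_run_config(base_dir: str, csv_name: str) -> dict:
    """Build the run config used by both presets (hypothetical helper)."""
    return {
        "resources": {
            "ge_data_context": {
                # Root of the GE project directory.
                "config": {"ge_root_dir": os.path.join(base_dir, "great_expectations")}
            }
        },
        "solids": {
            "read_in_datafile": {
                # Only this value differs between the two presets.
                "inputs": {"csv_path": {"value": os.path.join(base_dir, "data", csv_name)}}
            }
        },
    }


# Placeholder base directory; in the example this would be the directory of the pipeline file.
success_config = build_run_config("/path/to/example", "succeed.csv")
fail_config = build_run_config("/path/to/example", "fail.csv")
```

Each returned dict could then be passed as the run_config argument of a PresetDefinition, which keeps the resource configuration in one place.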

Finally, here's the full pipeline using the GE solid, with presets to use both the correct and incorrect data:

@pipeline(
    mode_defs=[ModeDefinition("basic", resource_defs={"ge_data_context": ge_data_context})],
    preset_defs=preset_defs,
)
def payroll_data_pipeline():
    output_df = read_in_datafile()

    return postprocess_payroll(process_payroll(output_df), payroll_expectations(output_df))