Dagster Lakehouse (Experimental)

You can find the code for this example on GitHub.

Lakehouse is a set of APIs for defining pipelines that puts "assets", like database tables and ML models, at the center.

Lakehouse is built on top of Dagster's core abstractions, and is currently an experimental API.

In this example, we'll define some tables and generate a Dagster pipeline that updates them. We have a table of temperature samples collected in five-minute increments, and we want to compute a table of the highest temperatures for each day.

Data Assets

Here are our asset (aka table) definitions.

"""Asset definitions for the simple_lakehouse example."""
import pandas as pd
from lakehouse import Column, computed_table, source_table
from pyarrow import date32, float64, string

sfo_q2_weather_sample_table = source_table(
    path="data",
    columns=[Column("tmpf", float64()), Column("valid_date", string())],
)


@computed_table(
    input_assets=[sfo_q2_weather_sample_table],
    columns=[Column("valid_date", date32()), Column("max_tmpf", float64())],
)
def daily_temperature_highs_table(sfo_q2_weather_sample: pd.DataFrame) -> pd.DataFrame:
    """Computes the temperature high for each day"""
    sfo_q2_weather_sample["valid_date"] = pd.to_datetime(sfo_q2_weather_sample["valid"])
    return sfo_q2_weather_sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})

They're pure functions that describe how each asset is derived from its parent assets. They intentionally omit the code for storing and retrieving the assets, because that code often varies across environments - e.g. we might want to store data in a local CSV file for easy testing, but store it in a data warehouse in production.
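
Because the body of daily_temperature_highs_table is plain pandas, the transformation logic is easy to check against a small in-memory sample, independent of any storage. The dataframe below uses made-up values and date-only timestamps purely for illustration:

import pandas as pd

# A tiny, hypothetical sample in the same shape as the source table.
sample = pd.DataFrame(
    {
        "tmpf": [52.0, 58.0, 61.0, 55.0],
        "valid": ["2019-04-01", "2019-04-01", "2019-04-02", "2019-04-02"],
    }
)
sample["valid_date"] = pd.to_datetime(sample["valid"])
highs = sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})
# highs has one row per date, with that date's highest temperature in max_tmpf.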

sfo_q2_weather_sample_table represents our base temperature table. The path argument gives the path to the data asset itself within the storage system where it lives - in this example, the local filesystem storage defined in the next section.

daily_temperature_highs_table represents our computed asset. We explicitly define the dependency on the original table by passing sfo_q2_weather_sample_table as the value for the input_assets argument.

Storage

"""
This defines a Lakehouse with local storage and Pandas data processing.

Data is locally stored in csv files.

Pandas is used for data processing.  Data can be read from CSV files into a
pandas dataframe, and written from a pandas dataframe back out to a CSV file.
"""
import os
from typing import Tuple

import pandas as pd
from dagster import ModeDefinition, StringSource, resource
from lakehouse import AssetStorage, Lakehouse


class LocalFileSystemStorage(AssetStorage):
    def __init__(self, root):
        self._root = root

    def _get_fs_path(self, path: Tuple[str, ...]) -> str:
        rpath = os.path.join(self._root, *path) + ".csv"
        return os.path.abspath(rpath)

    def save(self, obj: pd.DataFrame, path: Tuple[str, ...], _resources) -> None:
        """This saves the dataframe as a CSV."""
        fpath = self._get_fs_path(path)
        obj.to_csv(fpath)

    def load(self, _python_type, path: Tuple[str, ...], _resources):
        """This reads a dataframe from a CSV."""
        fpath = self._get_fs_path(path)
        return pd.read_csv(fpath)


@resource(config_schema={"root": StringSource})
def local_fs_storage(init_context):
    return LocalFileSystemStorage(init_context.resource_config["root"])


simple_lakehouse = Lakehouse(
    mode_defs=[
        ModeDefinition(
            resource_defs={"default_storage": local_fs_storage.configured({"root": "."})},
        )
    ]
)

We want to persist the data to disk as CSV files, so we create an AssetStorage that describes how to convert between pandas dataframes and CSV files.

The load method converts a stored asset into the format that downstream computations require. Since our base asset is stored as a CSV file and our computed asset operates on a pandas dataframe, load reads a CSV file into a dataframe. Likewise, we want to persist the result of our computed asset as a CSV file, so the save method writes a pandas dataframe out to a CSV file.
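
To make that conversion concrete, here is a small round trip through the LocalFileSystemStorage class defined above. The path element and dataframe values are made up for illustration, and the file is written relative to the current working directory:

import pandas as pd

storage = LocalFileSystemStorage(".")

df = pd.DataFrame({"tmpf": [52.0, 58.0], "valid": ["2019-04-01", "2019-04-01"]})
storage.save(df, ("example_table",), None)  # writes ./example_table.csv
loaded = storage.load(pd.DataFrame, ("example_table",), None)  # reads it back into a dataframe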

Then, we construct our Lakehouse, which handles storing assets and loading them for downstream computations using the AssetStorage we just defined, wired in as the "default_storage" resource.
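
Because the storage lives in a resource, swapping environments is a matter of supplying a different mode. The sketch below is illustrative rather than part of the example: the mode names and the "/var/data/lakehouse" root are made up, and a real production setup would more likely use an AssetStorage backed by a data warehouse instead of the local filesystem class.

multi_env_lakehouse = Lakehouse(
    mode_defs=[
        # Local development: CSV files in the current directory, as above.
        ModeDefinition(
            name="dev",
            resource_defs={"default_storage": local_fs_storage.configured({"root": "."})},
        ),
        # Hypothetical second environment: same storage class, different root.
        ModeDefinition(
            name="prod",
            resource_defs={
                "default_storage": local_fs_storage.configured({"root": "/var/data/lakehouse"})
            },
        ),
    ]
)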

Pipeline

The data assets, combined with the storage that handles conversion between data formats, completely define a computation graph. As a result, we can use the assets and the storage to construct a pipeline.

"""Pipeline definitions for the simple_lakehouse example.
"""
from simple_lakehouse.assets import daily_temperature_highs_table
from simple_lakehouse.lakehouse_def import simple_lakehouse

computed_assets = [daily_temperature_highs_table]
simple_lakehouse_pipeline = simple_lakehouse.build_pipeline_definition(
    "simple_lakehouse_pipeline",
    computed_assets,
)

Note that the assets don't have to be provided in any particular order: Lakehouse determines the execution order by resolving the input asset dependencies.
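
The result is an ordinary Dagster pipeline definition, so it can be launched from Dagit, the CLI, or Python. As a rough sketch using Dagster's Python execution API (the simple_lakehouse.pipelines module path is assumed here):

from dagster import execute_pipeline

from simple_lakehouse.pipelines import simple_lakehouse_pipeline  # assumed module path

result = execute_pipeline(simple_lakehouse_pipeline)
assert result.success  # the computed table was written via the configured storage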