This guide explains how you can run a dbt project in your Dagster pipelines.
dbt (data build tool) helps engineers transform data in their warehouses by simply writing SELECT
statements. dbt automatically builds a dependency graph for your transformations and turns these SELECT
statements into tables and views in your data warehouse.
dbt not only runs your data transformations but can also create data quality tests and generate documentation for your data, right out of the box. To learn more about dbt, visit the official dbt documentation website.
Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single pipeline. Dagster also provides built-in operational and data observability capabilities, like storing dbt run results longitudinally and sending alerts when a dbt run fails.
dagster-dbt is an integration library that provides pre-built solids and resources for using dbt together with Dagster. The solids in the library are designed to be configurable for any dbt project.
The solids in dagster-dbt follow this naming convention:
dbt_cli_* solids invoke the dbt command via the dbt CLI. See Use the dbt CLI in a Dagster pipeline for details.
dbt_rpc_* solids send a request to a dbt RPC server to run a dbt command and return immediately with a request token. See Use the dbt RPC Server in a Dagster pipeline for details.
dbt_rpc_*_and_wait solids send a request to a dbt RPC server to run a dbt command and poll the server until execution is completed or a timeout has been reached. See Use the dbt RPC Server in a Dagster pipeline for details.

dagster-dbt provides solids for running commands through the dbt CLI. By convention, these solids are named dagster_dbt.dbt_cli_*.
To run the dbt CLI, your dbt project directory must be on your local filesystem and you must have a dbt profile already set up to connect to your data warehouse. Visit the official dbt CLI documentation for more details.
Here are some examples of how to invoke dbt run with the solid dagster_dbt.dbt_cli_run. Other dbt commands can be invoked via the CLI with their respective solid dagster_dbt.dbt_cli_*.
Example: The solid dbt_cli_run is configured to run your entire dbt project.
from dagster import pipeline
from dagster_dbt import dbt_cli_run

# Point the solid at the dbt project directory on the local filesystem.
config = {"project-dir": "path/to/dbt/project"}
run_all_models = dbt_cli_run.configured(config, name="run_dbt_project")

@pipeline
def my_dbt_pipeline():
    run_all_models()
Example: The solid dbt_cli_run is configured to run specific models in your dbt project. This is similar to invoking dbt run --models tag:staging.
from dagster import pipeline
from dagster_dbt import dbt_cli_run

# "models" restricts the run to models selected by the given selectors.
config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
run_staging_models = dbt_cli_run.configured(config, name="run_staging_models")

@pipeline
def my_dbt_pipeline():
    run_staging_models()
In the code snippet above, the config "models" takes a list of strings. The string "tag:staging" uses dbt's node selection syntax to filter models with the tag "staging". For more details, visit the official dbt documentation on the node selection syntax.
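To make the correspondence between solid config and the dbt command line concrete, here is a minimal sketch in plain Python. The helper build_dbt_cli_args is hypothetical and is not part of dagster-dbt; it only illustrates how a config dict like the ones above could map onto a dbt run invocation, assuming each config key corresponds to a flag of the same name and list values follow the flag as separate arguments.

```python
from typing import Any, Dict, List


def build_dbt_cli_args(command: str, config: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: turn a config dict into dbt CLI arguments.

    Illustration only; this is not how dagster-dbt is implemented.
    """
    args = ["dbt", command]
    for key, value in config.items():
        flag = f"--{key}"
        if isinstance(value, list):
            if not value:
                continue  # skip flags that have no selectors
            # Selectors such as "tag:staging" follow the flag as separate arguments.
            args.append(flag)
            args.extend(value)
        else:
            args.extend([flag, str(value)])
    return args


staging_args = build_dbt_cli_args(
    "run",
    {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]},
)
print(" ".join(staging_args))
# dbt run --project-dir path/to/dbt/project --models tag:staging
```

In a real pipeline you would not build this command yourself; the dbt_cli_* solids handle the invocation based on their config.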
dagster-dbt provides solids for running commands through the dbt RPC server. By convention, these solids are named dagster_dbt.dbt_rpc_*.
Your dbt RPC server can be running locally or remotely. To use the dbt RPC solids in your Dagster pipeline, you will need to create a resource for your dbt RPC server. To learn more about Dagster resources, visit the Resources Overview.
dagster_dbt.dbt_rpc_resource can be configured with your specific host and port.
from dagster_dbt import dbt_rpc_resource
my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})
For convenience during local development, you may also use dagster_dbt.local_dbt_rpc_resource, which is preconfigured for a dbt RPC server that is running on http://localhost:8580.
Here are some examples of how to send dbt commands to a dbt RPC server with a solid.
Example: The solid dbt_rpc_run will send a request to run your entire dbt project when you don't use any solid configuration.
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

# my_remote_rpc is the dbt RPC resource configured earlier in this guide.
@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    dbt_rpc_run()
The code snippet above shows a Dagster pipeline with a single solid, dbt_rpc_run. The solid dbt_rpc_run has a required resource key "dbt_rpc", so any pipeline that uses dbt_rpc_run will need a ModeDefinition that defines a resource under the key "dbt_rpc".
Example: The solid dbt_rpc_run is configured to run specific models in a dbt project. This is similar to having "params": {"models": "tag:staging"} in your dbt RPC request body.
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

run_staging_models = dbt_rpc_run.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models",
)

# my_remote_rpc is the dbt RPC resource configured earlier in this guide.
@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    run_staging_models()
Note that the solid above will NOT wait until the dbt RPC server has finished executing your request. Instead, it will return immediately with a request token from the dbt RPC server. If you want the solid to wait until execution is finished, see dagster_dbt.dbt_rpc_run_and_wait.
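The RPC request body that the staging example is compared to can be sketched in plain Python. This assumes the JSON-RPC 2.0 envelope (jsonrpc, method, id, params) used by the dbt RPC server; the helper name, the generated request id, and joining selectors with spaces are illustrative assumptions. The dbt_rpc_* solids build and send the real request for you.

```python
import json
import uuid
from typing import Any, Dict, List


def build_rpc_run_request(models: List[str]) -> Dict[str, Any]:
    """Hypothetical helper: assemble a JSON-RPC body asking the server to run models."""
    return {
        "jsonrpc": "2.0",
        "method": "run",
        "id": str(uuid.uuid4()),  # any unique string identifying this request
        "params": {"models": " ".join(models)},  # selectors joined into one string
    }


request_body = build_rpc_run_request(["tag:staging"])
print(json.dumps(request_body, indent=2))
```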
Example: The solid dbt_rpc_run_and_wait will send a request to run specific models in a dbt project and then poll the dbt RPC server until it has finished executing your request.
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run_and_wait

run_staging_models_and_wait = dbt_rpc_run_and_wait.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models_and_wait",
)

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    run_staging_models_and_wait()
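The "poll until finished or timed out" behavior can be illustrated with a small, self-contained sketch. It is not the dagster-dbt implementation: poll_until_done and its parameters are hypothetical, and the server is replaced by a stand-in iterator so the example runs without a dbt RPC server.

```python
import time
from typing import Callable


def poll_until_done(
    is_done: Callable[[], bool],
    interval: float = 0.01,
    max_attempts: int = 5,
) -> bool:
    """Call is_done repeatedly; return True once it succeeds, False on timeout."""
    for _ in range(max_attempts):
        if is_done():
            return True
        time.sleep(interval)  # wait before asking the server again
    return False


# Stand-in for a dbt RPC server whose request finishes on the third status check.
status_checks = iter([False, False, True])
finished = poll_until_done(lambda: next(status_checks))
print(finished)  # True
```

A fixed interval keeps the sketch simple; the "interval" config shown under the advanced configuration examples plays the same role for the dbt_rpc_*_and_wait solids.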
dagster_dbt currently does not provide solids for invoking dbt commands via dbt Cloud. However, this use case is possible by writing your own solid to create and start Jobs via the dbt Cloud API. For more details about each HTTP endpoint, visit the official documentation for the dbt Cloud API.
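As a starting point for such a solid, the sketch below builds (but does not send) an HTTP request that would trigger a dbt Cloud job. Treat every detail as an assumption to verify against the dbt Cloud API documentation: the endpoint path, the "Token" authorization scheme, and the "cause" field are written from memory of the v2 API, and the helper name, account id, job id, and token are placeholders.

```python
import json
import urllib.request


def build_trigger_job_request(
    account_id: int, job_id: int, api_token: str, cause: str
) -> urllib.request.Request:
    """Hypothetical helper: build a request that triggers a dbt Cloud job run."""
    # Assumed endpoint shape; confirm it in the dbt Cloud API documentation.
    url = f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    body = json.dumps({"cause": cause}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Token {api_token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder identifiers and token, for illustration only.
request = build_trigger_job_request(1, 2, "my-api-token", "Triggered by Dagster")
print(request.get_method(), request.full_url)
```

Inside a custom solid, you would send the request (for example with urllib.request.urlopen) and then decide whether to return immediately or poll the run's status until it completes.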
For full documentation on all available config, visit the API docs for dagster-dbt.
dbt CLI: Set the dbt profile and target to load
config = {"profile": PROFILE_NAME, "target": TARGET_NAME}
from dagster_dbt import dbt_cli_run
custom_solid = dbt_cli_run.configured(config, name="custom_solid")
dbt CLI: Set the path to the dbt executable
config = {"dbt_executable": "path/to/dbt/executable"}
from dagster_dbt import dbt_cli_run
custom_solid = dbt_cli_run.configured(config, name="custom_solid")
dbt CLI: Select specific models to run
config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}
from dagster_dbt import dbt_cli_run
custom_solid = dbt_cli_run.configured(config, name="custom_solid")
For more details, visit the official documentation on dbt's node selection syntax.
dbt CLI: Exclude specific models
config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}
from dagster_dbt import dbt_cli_run
custom_solid = dbt_cli_run.configured(config, name="custom_solid")
For more details, visit the official documentation on dbt's node selection syntax.
dbt CLI: Set key-values for dbt vars
config = {"vars": {"key": "value"}}
from dagster_dbt import dbt_cli_run
custom_solid = dbt_cli_run.configured(config, name="custom_solid")
For more details, visit the official documentation on using variables in dbt.
dbt CLI: Disable default asset materializations
config = {"yield_materializations": False}
from dagster_dbt import dbt_cli_run
custom_solid = dbt_cli_run.configured(config, name="custom_solid")
dbt RPC: Configure a remote dbt RPC resource
from dagster_dbt import dbt_rpc_resource
custom_resource = dbt_rpc_resource.configured({"host": HOST, "port": PORT})
dbt RPC: Select specific models to run
config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}
from dagster_dbt import dbt_rpc_run
custom_solid = dbt_rpc_run.configured(config, name="custom_solid")
For more details, visit the official documentation on dbt's node selection syntax.
dbt RPC: Exclude specific models
config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}
from dagster_dbt import dbt_rpc_run
custom_solid = dbt_rpc_run.configured(config, name="custom_solid")
For more details, visit the official documentation on dbt's node selection syntax.
dbt RPC: Configure polling interval when using a dbt_rpc_*_and_wait solid
config = {"interval": 3} # Poll the dbt RPC server every 3 seconds.
from dagster_dbt import dbt_rpc_run_and_wait
custom_solid = dbt_rpc_run_and_wait.configured(config, name="custom_solid")
dbt RPC: Disable default asset materializations
config = {"yield_materializations": False}
from dagster_dbt import dbt_rpc_run
custom_solid = dbt_rpc_run.configured(config, name="custom_solid")
If you find a bug or want to add a feature to the dagster-dbt library, we invite you to contribute.
If you have questions on using dbt with Dagster, we'd love to hear from you: