DagsterDocs

Composite Solids#

Dagster provides Composite Solids, which are a unit of abstraction for composing a solid from other solids.

Relevant APIs#

NameDescription
@composite_solidThe decorator used to define a composite solid.

Overview#

Solids are linked together by defining the dependencies between their inputs and outputs. Defining dependencies is usually done at the Pipeline level.

Composite solids also let you define solid dependencies to form an entirely new solid. This is useful for:

  • Organizing large or complicated graphs
  • Abstracting away complexity
  • Wrapping re-usable solids with domain-specific information

Refactoring a DAG of solids using composites is a very similar experience to refactoring code with functions. Defining a composite solid is similar to defining a pipeline, but composite solids can also provide mapping information to control how data and configuration enters and exists its inner graph of solids.


Defining a Composite Solid#

To define a composite solid, use the @composite_solid decorator.

We use function calls within the decorated function body to indicate the dependency structure between the solids making up the composite solid.

In this example, the add_one solid depends on the return_one solid's output. Because this data dependency exists, the return_one solid executes after add_one runs successfully and emits the required output.

@solid
def add_one(_, number: int):
    return number + 1


@solid
def multiply_by_three(_, number: int):
    return number * 3


@composite_solid(input_defs=[InputDefinition("number", int)])
def add_one_times_three_solid(number):
    return multiply_by_three(add_one(number))

Composite Solid Configuration#

By default, the config schemas for each solid in a composite solid is hoisted up to the composite solid itself, under the solid key.

In this example, you have two solids that both take config and are wrapped by a composite solid.

@solid(config_schema={"n": int})
def add_n(context, number: int):
    return number + context.solid_config["n"]


@solid(config_schema={"m": int})
def multiply_by_m(context, number: int):
    return number * context.solid_config["m"]


@composite_solid(input_defs=[InputDefinition("number", int)])
def add_n_times_m_solid(number):
    return multiply_by_m(add_n(number))

To run a pipeline with this composite solid, you will need to specify the config for both add_n and multiply_by_m through the composite solid:

solids:
  add_n_times_m_solid:
    inputs:
      number: 0
    solids:
      add_n:
        config:
          n: 3
      multiply_by_m:
        config:
          m: 2

Configuration Mapping#

Composite solids can also define a config schema. When a composite solid defines a config schema, it must also define a config_mapping_fn to map the composite solids config to the wrapped solids' config.

def config_mapping_fn(config):
    x = config["x"]
    return {"add_n": {"config": {"n": x}}, "multiply_by_m": {"config": {"m": x}}}


@composite_solid(
    config_fn=config_mapping_fn,
    config_schema={"x": int},
    input_defs=[InputDefinition("number", int)],
)
def add_x_multiply_by_x(number):
    return multiply_by_m(add_n(number))

In this example, the composite solid has only one field in the config schema: x. The config mapping function takes config provided to the composite solid and maps it to the wrapped solids.