# integration-dbt
j
Hi, how does a dbt model work as an upstream dependency of a monthly-partitioned Dagster asset? Should I somehow make the dbt model partitioned monthly?
o
hi @jasono! it's allowable for an unpartitioned asset to be upstream of a monthly-partitioned asset. However, the default loading behavior for an unpartitioned asset is to load all of it, which might not be exactly what you want for your downstream monthly-partitioned asset (e.g. you might want to load just the matching month's worth of data). The easiest way to handle that is to filter out the unwanted data inside the body of your monthly-partitioned asset. Alternatively, you could partition your dbt asset itself. This is generally done by making it an incremental model and then using dbt variables to pass in the date range it's intended to load. Your dbt asset could be partitioned daily (or even hourly) and still be compatible with the downstream monthly-partitioned asset.
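For the first approach, a minimal sketch of filtering inside the downstream asset (the asset name, table, and date column here are assumptions, and `deps` needs a reasonably recent Dagster release):

```python
from dagster import MonthlyPartitionsDefinition, OpExecutionContext, asset

monthly_partitions = MonthlyPartitionsDefinition(start_date="2023-01-01")


@asset(partitions_def=monthly_partitions, deps=["my_dbt_model"])
def monthly_summary(context: OpExecutionContext):
    # the upstream dbt model is unpartitioned, so only pull this month's slice of it
    window = context.partition_time_window
    query = (
        "select * from analytics.my_dbt_model "
        f"where event_date >= '{window.start:%Y-%m-%d}' and event_date < '{window.end:%Y-%m-%d}'"
    )
    ...  # execute `query` against your warehouse and return/persist the result
```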
j
@owen Thanks. Is there an example of how to define (a subset of) dbt models as monthly partitioned? The tutorials don't seem to show partitioned scenarios so I'm having trouble wrapping my head around it.
o
Hi @jasono! This discussion might be helpful: https://github.com/dagster-io/dagster/discussions/14507
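If it helps to see the rough shape of that pattern, here is a sketch of a monthly-partitioned `@dbt_assets` definition that passes its time window to dbt as vars (the manifest path, selector, and var names are placeholders, not taken from your project):

```python
import json

from dagster import MonthlyPartitionsDefinition, OpExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

monthly_partitions = MonthlyPartitionsDefinition(start_date="2023-01-01")


@dbt_assets(
    manifest="target/manifest.json",
    select="summary.base",
    partitions_def=monthly_partitions,
)
def partitioned_dbt_models(context: OpExecutionContext, dbt: DbtCliResource):
    # one time window per run; all selected dbt models share it
    start, end = context.asset_partitions_time_window_for_output(
        list(context.selected_output_names)[0]
    )
    dbt_vars = {"min_date": start.strftime("%Y-%m-%d"), "max_date": end.strftime("%Y-%m-%d")}
    # the incremental models read these vars, e.g. where event_date >= '{{ var("min_date") }}'
    yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream()
```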
j
@owen Thank you!
@owen Thank you!
@owen After adapting my code to follow the pattern in the above hooli example, I get a `dagster._check.CheckError: Failure condition: The output has no asset partitions` error. The `dbt_upstream_assets` definition selects a set of dbt models with a monthly partition.
Copy code
@dbt_assets(select="summary.base",
    partitions_def=monthly_partitions)
def dbt_upstream_assets(context, ...):
    yield from _process_partitioned_dbt_assets(context=...)

@dbt_assets(select="summary.base",
    partitions_def=monthly_partitions)
def dbt_downstream_assets(context, ...):
    yield from _process_partitioned_dbt_assets(context=...)
I thought passing `partitions_def` to `@dbt_assets` would make its outputs monthly-partitioned. Is that not true?
I just tried adding `dagster_dbt_translator=CustomDagsterDbtTranslator()` to the `@dbt_assets` decorator, hoping it would solve the issue. Instead, it gave me a different error stating `'{folder name}\{subfolder name}\{sql filename}.sql' is not a valid name in Dagster`. Before adding the CustomDagsterDbtTranslator, Dagster automatically picked up the {sql filename} as the asset name, so I'm guessing the CustomDagsterDbtTranslator should drop the folder names and file extension from the name somehow?
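(For reference, a translator that keys assets off the bare model name rather than a file path might look roughly like the sketch below; depending on the dagster-dbt version, `get_asset_key` may need to be a classmethod instead of an instance method.)

```python
from typing import Any, Mapping

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        # dbt_resource_props["name"] is the bare model name (no folders, no .sql extension),
        # so the resulting key contains no path separators for Dagster to reject
        return AssetKey(dbt_resource_props["name"])
```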
j
@owen I'm getting the same error, "dagster._check.CheckError: Failure condition: The output has no asset partitions", while setting up dynamic partitioning.
I replaced the `asset_partitions_time_window_for_output` call with
Copy code
partition = context.asset_partition_key_for_output(output_name=context.partition_key)
j
@owen It appears that `asset_partition_key_for_output` doesn't work for `dbt_assets`, but the approach in <https://github.com/dagster-io/dagster/discussions/14507> does. Does that sound right?
o
@jasono could you say a bit more / give a code example of what's not working?
j
@owen Sorry, there was a typo in my earlier message. The `The output has no asset partitions` error occurs if I call `context.asset_partition_key_for_output` inside the `dbt_assets` block, but `context.asset_partitions_time_window_for_output` returns the selected partition as expected.
o
hm, can you provide some sample code? Under the hood, both of those functions do the same check, so if one errors, the other should as well. However, if you just want the partition key, you could also just use `context.partition_key` directly.
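For example, a compact sketch (the manifest path and var name are placeholders):

```python
import json

from dagster import MonthlyPartitionsDefinition, OpExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

monthly_partitions = MonthlyPartitionsDefinition(start_date="2023-01-01")


@dbt_assets(manifest="target/manifest.json", partitions_def=monthly_partitions)
def my_partitioned_models(context: OpExecutionContext, dbt: DbtCliResource):
    # for a monthly partition this is a string like "2023-06-01"; no per-output lookup needed
    dbt_vars = {"run_month": context.partition_key}
    yield from dbt.cli(["run", "--vars", json.dumps(dbt_vars)], context=context).stream()
```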
j
Here are the `dbt_assets` definitions from one Python module. The dbt models in `t1503_trend_loss` refer to those in `base_models`, and the Dagster graph automatically shows that link. But when I "Materialize all" the assets in this module in Dagit, it skips materializing `t1503_trend_loss` with no clear explanation.
Copy code
@dbt_assets(
    manifest=h.DBT_MANIFEST,
    select="claims.base_claims",
    partitions_def=monthly_partitions
)
def base_models(context: OpExecutionContext, dbt2: DbtCliResource):
    first_partition, last_partition = context.asset_partitions_time_window_for_output(
        list(context.selected_output_names)[0]
    )

    yymm = int(h.yyyy_mm_dd_to_yymm(str(first_partition)))

    dbt_vars = {
        "EndingYYMM": yymm,
        "b001_CM": h.yymm_to_sys_close_date_str("epic", yymm),
        "mga_yymm": h.period_shift(yymm, -1),
    }

    dbt_args = ["run", "--vars", json.dumps(dbt_vars)]

    yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2, dbt_args=dbt_args)


@dbt_assets(
    manifest=h.DBT_MANIFEST,
    select="claims.loss_trend",
    partitions_def=monthly_partitions
)
def t1503_trend_loss(context: OpExecutionContext, dbt2: DbtCliResource, a000_cdw_paid_source):
    first_partition, last_partition = context.asset_partitions_time_window_for_output(
        list(context.selected_output_names)[0]
    )

    yymm = int(h.yyyy_mm_dd_to_yymm(str(first_partition)))

    dbt_vars = {
        "EndingYYMM": yymm,
        "b001_CM": h.yymm_to_sys_close_date_str("epic", yymm),
        "mga_yymm": h.period_shift(yymm, -1),
    }

    dbt_args = ["run", "--vars", json.dumps(dbt_vars)]

    yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2, dbt_args=dbt_args)
o
I see -- currently it is not supported to pass additional assets (such as `a000_cdw_paid_source`) via the `dbt_assets` decorator, and this should raise an error (https://github.com/dagster-io/dagster/issues/15800). Is it possible to add that source to your dbt project instead of making the dependency via Dagster?
j
Not in my use case. I think it would be very useful to allow users to group dbt models separately and specify dependencies between the groups, just like any other assets.
dbt graphs tend to be very large, and it is very common for different parts of the dbt DAG to run one after another in a particular order.
o
are there dbt dependencies between these models? If even just one model in the "downstream" set depends on a model in the "upstream" set, then a job created to materialize both sets will run all of the upstream models first and all of the downstream models after, if that makes sense.
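For instance, a single partitioned job over both groups would respect that ordering (a sketch reusing the definition names from your snippet above):

```python
from dagster import AssetSelection, define_asset_job

# assuming the t1503_trend_loss models ref() the base_models (as described above),
# each run of this job executes base_models first and t1503_trend_loss after
claims_monthly_job = define_asset_job(
    name="claims_monthly_job",
    selection=AssetSelection.assets(base_models, t1503_trend_loss),
    partitions_def=monthly_partitions,
)
```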