jasono
08/04/2023, 5:28 AM
owen
08/04/2023, 5:24 PM
jasono
08/04/2023, 6:32 PM
owen
08/04/2023, 8:51 PM
jasono
08/04/2023, 8:52 PM
jasono
08/07/2023, 12:42 AM
@dbt_assets(select="summary.base",
            partitions_def=monthly_partitions)
def dbt_upstream_assets(context ...)
    yield from _process_partitioned_dbt_assets(context= ...

@dbt_assets(select="summary.base",
            partitions_def=monthly_partitions)
def dbt_downstream_assets(context ...)
    yield from _process_partitioned_dbt_assets(context= ...
jasono
08/07/2023, 12:44 AM
jasono
08/07/2023, 4:32 PM
I added dagster_dbt_translator=CustomDagsterDbtTranslator() in the @dbt_assets decorator, hoping it would solve the issue. Instead, it gave me a different error stating "'{folder name}\{subfolder name}\{sql filename}.sql' is not a valid name in Dagster". Before I added CustomDagsterDbtTranslator, Dagster automatically picked up the {sql filename} as the asset name, so I'm guessing CustomDagsterDbtTranslator should somehow drop the folder names and file extension from the name?
Julien DEBLANDER
08/07/2023, 8:51 PM
Julien DEBLANDER
08/07/2023, 8:58 PM
partition = context.asset_partition_key_for_output(output_name=context.partition_key)
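On the earlier "is not a valid name in Dagster" error: a minimal sketch of the name cleanup jasono describes, dropping folder names and the file extension from a SQL file path. The function name `asset_name_from_sql_path` is hypothetical, and the rule that a valid name contains only letters, digits and underscores is an assumption. How the result would be returned from a custom translator is not shown here, since the CustomDagsterDbtTranslator code is not in the thread.

```python
import re
from pathlib import PureWindowsPath


def asset_name_from_sql_path(sql_path: str) -> str:
    """Hypothetical helper: turn 'folder\\subfolder\\model.sql' into 'model'."""
    # PureWindowsPath splits on both backslashes and forward slashes,
    # and .stem drops the directories and the '.sql' extension.
    stem = PureWindowsPath(sql_path).stem
    # Assumption: only letters, digits and underscores are allowed in a name,
    # so every other character is replaced with an underscore.
    return re.sub(r"[^A-Za-z0-9_]", "_", stem)
```

For example, `asset_name_from_sql_path(r"claims\base\base_claims.sql")` gives `base_claims`.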
jasono
08/08/2023, 5:49 PM
asset_partition_key_for_output doesn't work for dbt_assets, but <https://github.com/dagster-io/dagster/discussions/14507> does. Does it sound right?
owen
08/08/2023, 6:27 PM
jasono
08/08/2023, 7:03 PM
The error "The output has no asset partitions" occurs if I call context.asset_partition_key_for_output inside the dbt_assets block, but context.asset_partitions_time_window_for_output returns the selected partition as expected.
owen
08/09/2023, 6:54 PM
context.partition_key
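A minimal sketch of how a monthly partition key string relates to a time window, assuming the key uses the "%Y-%m-%d" format and names the first day of the month (for example "2023-08-01"). The helper `monthly_window` is hypothetical and plain Python, not a Dagster API; it only illustrates that the window start carries the same information as the key.

```python
from datetime import datetime
from typing import Tuple


def monthly_window(partition_key: str, fmt: str = "%Y-%m-%d") -> Tuple[datetime, datetime]:
    """Hypothetical helper: map a monthly partition key to (start, end)."""
    # Assumption: the key is the first day of the month, e.g. "2023-08-01".
    start = datetime.strptime(partition_key, fmt)
    if start.month == 12:
        # December rolls over into January of the next year.
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end
```

Under these assumptions, the first element of the returned pair is the same date the key itself encodes, so either value could feed the same downstream date logic.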
jasono
08/09/2023, 8:15 PM
@dbt_assets(
    manifest=h.DBT_MANIFEST,
    select="claims.base_claims",
    partitions_def=monthly_partitions
)
def base_models(context: OpExecutionContext,
                dbt2: DbtCliResource):
    first_partition, last_partition = context.asset_partitions_time_window_for_output(list(context.selected_output_names)[0])
    yymm = int(h.yyyy_mm_dd_to_yymm(str(first_partition)))
    dbt_vars = {"EndingYYMM": yymm, "b001_CM": h.yymm_to_sys_close_date_str("epic", yymm), "mga_yymm": h.period_shift(yymm, -1)}
    dbt_args = ["run", "--vars", json.dumps(dbt_vars)]
    yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2, dbt_args=dbt_args)

@dbt_assets(
    manifest=h.DBT_MANIFEST,
    select="claims.loss_trend",
    partitions_def=monthly_partitions
)
def t1503_trend_loss(context: OpExecutionContext,
                     dbt2: DbtCliResource, a000_cdw_paid_source):
    first_partition, last_partition = context.asset_partitions_time_window_for_output(list(context.selected_output_names)[0])
    yymm = int(h.yyyy_mm_dd_to_yymm(str(first_partition)))
    dbt_vars = {"EndingYYMM": yymm, "b001_CM": h.yymm_to_sys_close_date_str("epic", yymm), "mga_yymm": h.period_shift(yymm, -1)}
    dbt_args = ["run", "--vars", json.dumps(dbt_vars)]
    yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2, dbt_args=dbt_args)
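Both functions above build their dbt arguments the same way, so that step could be factored into one helper. A minimal sketch follows, with loud assumptions: `build_dbt_args` is a hypothetical name, and the YYMM encoding (August 2023 becomes 2308) is a guess at what `h.yyyy_mm_dd_to_yymm` returns, since the `h` helpers are not shown in the thread. The other two variables are left out because they depend on those unseen helpers.

```python
import json
from datetime import datetime
from typing import List


def build_dbt_args(window_start: datetime) -> List[str]:
    """Hypothetical helper: build `dbt run --vars ...` args for one partition."""
    # Assumption: YYMM is two-digit year followed by two-digit month (2308).
    yymm = (window_start.year % 100) * 100 + window_start.month
    dbt_vars = {"EndingYYMM": yymm}
    # dbt accepts --vars as a JSON/YAML string, so the dict is serialized once here.
    return ["run", "--vars", json.dumps(dbt_vars)]
```

Each decorated function would then only compute the window start and pass the resulting list on as `dbt_args`.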
owen
08/10/2023, 9:07 PM
jasono
08/10/2023, 9:11 PM
jasono
08/10/2023, 9:12 PM
owen
08/11/2023, 8:53 PM