Tiri Georgiou
05/10/2022, 5:04 PM
I'm using the `load_assets_from_dbt_project()` function to map our dbt models as assets on the Dagster side, then creating a Python asset which sends the end model over Slack using a custom IOManager. However, it seems that many of these models are given the same step id, and when I attempt to materialise I am faced with:
```
dagster.core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown steps:
```
I have attached a subset of the DAG to show that many of these models are given `run_dbt_dw_prod_2b1b8`, which I think leads to this error. Here are some snippets of the code used to generate the map:
```python
# load assets with a select statement that grabs all dependencies of the end table
DBT_LIDL_FAST = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_DIR,
    select="+lidl_uptime_fast",
    node_info_to_asset_key=callbacks._get_node_asset_key_dev,
    io_manager_key="dbt_pandas_io_manager",
)

# asset which exports to Slack via a custom IO manager
@dag.asset(
    compute_kind="python",
    io_manager_key="dbt_slack_io_manager",
)
def lidl_uptime_fast_slack(context, published_lidl_uptime_fast):
    """DBTSlackIOManager does the heavy lifting of querying the upstream dbt model
    from the correct schema; the returned value is sent to Slack.
    This will vary depending on the environment."""
    context.log.info(type(published_lidl_uptime_fast))
    return published_lidl_uptime_fast

# build a job from the assets
dbt_lidl_fast_job = dag.AssetGroup(
    assets=[*dbt_uptime.DBT_LIDL_FAST, dbt_uptime.lidl_uptime_fast_slack],
    source_assets=[
        *dbt_uptime.MIS_SOURCE_ASSETS,
        *dbt_uptime.NA_APPROVAL_SOURCE_ASSETS,
    ],
    resource_defs={
        **RESOURCES_PROD,
        "dbt": dbt_cli_resource.configured(DBT_CONFIG),
        "dbt_slack_io_manager": dbt_slack_io_manager.configured({
            # ..credentials
        }),
        "dbt_pandas_io_manager": dbt_pandas_io_manager.configured({
            "schema": "dbt_tgeorgiou"
        }),
    },
).build_job(
    name="dbt_lidl_fast_job",
    description="Runs the dbt models for lidl uptime fast. Exports to slack.",
)
```
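(For context, the `dbt_slack_io_manager` above is custom code not shown in the thread. A rough, framework-free sketch of the idea behind such an IO manager, with all names and the delivery mechanism invented for illustration and no actual Dagster API involved, might look like this:)

```python
# Framework-free sketch of the idea behind a custom "slack" IO manager:
# handle_output receives the asset's return value and forwards it to a
# sink. All names here are invented; the real DBTSlackIOManager
# subclasses Dagster's IOManager and is not shown in the thread.
class SlackIOManagerSketch:
    def __init__(self, send_message):
        # send_message is injected so the sketch is testable offline
        self.send_message = send_message

    def handle_output(self, asset_name, rows):
        """Format the asset's rows as text and 'post' them."""
        lines = [f"*{asset_name}*"]
        lines += [" | ".join(str(v) for v in row) for row in rows]
        self.send_message("\n".join(lines))

sent = []
mgr = SlackIOManagerSketch(sent.append)
mgr.handle_output("lidl_uptime_fast", [("store_1", 0.99), ("store_2", 0.97)])
print(sent[0])
# *lidl_uptime_fast*
# store_1 | 0.99
# store_2 | 0.97
```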
Tiri Georgiou
05/10/2022, 5:06 PM
The models here are views and not tables; does this have an impact on Dagster's side?

Tiri Georgiou
05/10/2022, 5:14 PM

Tiri Georgiou
05/10/2022, 5:20 PM
Is there a way to break up `run_dbt_dw_prod_2b1b8` so that each asset being mapped from a model is unique?

Tiri Georgiou
05/10/2022, 5:31 PM
Update: using `build_asset_job()` instead of `AssetGroup.build_job()` works for me, as it correctly separates the step_ids that are independent in a dbt tree with overlapping models. It allows me to materialise independent branches, unlike the above ^^

Tiri Georgiou
05/10/2022, 5:32 PM

yuhan
05/10/2022, 7:24 PM

owen
05/10/2022, 8:38 PM
By default, all of the dbt assets are executed in a single `dbt run` command, which means that a single operational step can map to many different assets. I'm not sure exactly what is causing the issue you're seeing (is it possible that your dbt project changed from when you first loaded your repository?), but either in this release or the next we'll be rolling out a new system for executing subsets of an asset graph which should be more consistent.

owen
05/10/2022, 8:39 PM

Tiri Georgiou
05/11/2022, 8:07 AM
```
# example models as assets using load_dbt_assets
model1 -> model2 -> model3 -> published1
model1 -> model2 -> model4 -> published2
```
So we were sort of using the select statement as `+published1` and `+published2` from `load_assets_from_dbt_project()`, and then creating a separate `AssetGroup()` for each of these to use the `build_job()` method and schedule them separately. But the `build_job()` method seemed to group `model2`, `model3`, `model4` under the same `step_id`. So when we wanted to materialise one side of the branch, i.e.
```
model1 -> model2 -> model3 -> published1
```
it would complain with `can not build subset plan from unknown steps: <step_id>`, and I think this is because `model4` has the same step_id as `model2` and `model3`, but because it's not being materialised in the same run (as part of the subset of the dependency tree) it throws an error. I hope that example sheds some more light on this issue 😂
Edit: this issue was (sort of) solved with the `build_asset_job()` function, because it would give `model4` a different step_id in this case, so Dagster wouldn't complain when materialising the path of assets `model1 -> model2 -> model3 -> published1` or `model1 -> model2 -> model4 -> published2`.
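(To make the failure mode in the example above concrete, here is a framework-free sketch, with step and asset names invented to mirror the example, of why a shared step id breaks branch-only materialisation; this is not Dagster's actual implementation:)

```python
# The shared dbt step produces model2, model3 AND model4, so a plan
# built for only one branch cannot cleanly subset it.
step_outputs = {
    "run_model1": {"model1"},
    "run_dbt_shared": {"model2", "model3", "model4"},  # one step, many assets
    "run_published1": {"published1"},
    "run_published2": {"published2"},
}

def steps_for_selection(selected_assets):
    """Return the steps whose outputs overlap the selection, plus any
    assets those steps would materialise outside the selection."""
    steps = {s for s, outs in step_outputs.items() if outs & selected_assets}
    extras = set().union(*(step_outputs[s] for s in steps)) - selected_assets
    return steps, extras

# Selecting only the published1 branch still drags in model4, because it
# shares a step with model2 and model3:
steps, extras = steps_for_selection({"model1", "model2", "model3", "published1"})
print(sorted(steps))   # ['run_dbt_shared', 'run_model1', 'run_published1']
print(sorted(extras))  # ['model4']
```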
owen
05/11/2022, 4:31 PM
```python
my_group = AssetGroup([*dbt_assets, published1, published2], ...)
published1_job = my_group.build_job("published1", selection="*published1")
published2_job = my_group.build_job("published2", selection="*published2")
```
This should prevent you from having to manually segment out the specific dependencies for each of these assets. Under the hood, when we create these two different jobs, we're configuring the step that produces the dbt assets to run a different command (i.e. `dbt run --select ...` instead of just `dbt run`).
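(A small sketch of that last idea, with the function name and wiring invented for illustration: per job, the shared dbt step is configured to run a narrowed selection rather than a bare `dbt run`. Dagster builds the real selection from the asset subset internally.)

```python
# Sketch of the "different command per job" idea: each job configures
# the shared dbt step with a narrower --select argument.
def dbt_run_command(selected_models):
    """Build the dbt CLI command for a subset of models."""
    if not selected_models:
        return "dbt run"
    return "dbt run --select " + " ".join(sorted(selected_models))

print(dbt_run_command({"model2", "model3", "published1"}))
# dbt run --select model2 model3 published1
print(dbt_run_command(set()))
# dbt run
```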