#ask-community

Tiri Georgiou

05/10/2022, 5:04 PM
Hi, I’ve run into an issue with loading dbt models into Dagster. I have quite a complicated model dependency graph in dbt and decided to use the `load_assets_from_dbt_project()` function to map these as assets on the Dagster side, then create a Python asset which sends the end model to Slack using a custom IOManager. However, it seems that many of these models are given the same step id, and when I attempt to materialise, I am faced with a
Copy code
dagster.core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown steps:
I have attached a subset of the DAG to show that many of these models are given `run_dbt_dw_prod_2b1b8`, which I think leads to this error. Here are some snippets of the code used to generate the map:
Copy code
# loading asset with select statement that grabs all dependency from end table
DBT_LIDL_FAST = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_DIR,
    select="+lidl_uptime_fast",
    node_info_to_asset_key=callbacks._get_node_asset_key_dev,
    io_manager_key="dbt_pandas_io_manager",
)

# asset which exports to slack via custom io manager
@dag.asset(
    compute_kind="python",
    io_manager_key="dbt_slack_io_manager",
)
def lidl_uptime_fast_slack(context, published_lidl_uptime_fast):
    """DBTSlackIOManager does the heavy lifting of querying the upstream dbt model
    from the correct schema, and the returned value is sent to Slack. This will vary depending on environments."""
    context.log.info(type(published_lidl_uptime_fast))
    return published_lidl_uptime_fast

# build job from assets 
dbt_lidl_fast_job = dag.AssetGroup(
    assets=[*dbt_uptime.DBT_LIDL_FAST, dbt_uptime.lidl_uptime_fast_slack],
    source_assets=[*dbt_uptime.MIS_SOURCE_ASSETS,
                   *dbt_uptime.NA_APPROVAL_SOURCE_ASSETS],
    resource_defs={
        **RESOURCES_PROD,
        "dbt": dbt_cli_resource.configured(DBT_CONFIG),
        "dbt_slack_io_manager": dbt_slack_io_manager.configured({
           ..credentials
        }),
        "dbt_pandas_io_manager": dbt_pandas_io_manager.configured({
            "schema": "dbt_tgeorgiou"
        })
    }
).build_job(
    name="dbt_lidl_fast_job",
    description="Runs the dbt models for lidl uptime fast. Exports to slack."
)
The `trm` models here are `views` and not tables. Does this have an impact on Dagster's side?
Actually, it seems that if I have asset jobs that depend on the same upstream dependencies, Dagster complains when you try to materialise a subset of them.
I suppose my question is: is it possible to generate a unique step id like `run_dbt_dw_prod_2b1b8`, so that each asset mapped from a model is unique?
Weird... I’m getting different behaviour when using `build_asset_job()` vs `AssetGroup.build_job()`, as the former correctly separates the step ids that are independent in a dbt tree with overlapping models. It allows me to materialise independent branches, unlike the above ^^
Going to leave this up in case someone else has the same issue.

yuhan

05/10/2022, 7:24 PM
cc @owen

owen

05/10/2022, 8:38 PM
hi @Tiri Georgiou! thanks for the detailed report, and I'm glad you found a fix to your issue, although I'm surprised that `build_asset_job()` had different behavior from `AssetGroup.build_job()`. To shed some light on why these models all have the same step id: we attempt to (as much as possible) run all the dbt models with a single `dbt run` command, which means that a single operational step can map to many different assets. I'm not sure exactly what is causing the issue you're seeing (is it possible that your dbt project changed from when you first loaded your repository?), but either in this release or the next we'll be rolling out a new system for executing subsets of an asset graph, which should be more consistent.
happy to chat more about this use case if you run into more issues! the way you're using assets looks super slick 🙂
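To make the "one step, many assets" point concrete, here is a toy sketch in plain Python. It is not Dagster's API or its internal data model; the mapping, the function names, and the `model1`..`model3` asset names are assumptions for illustration, with only the step id and the two asset names from the snippet above taken from the thread.

```python
from collections import defaultdict

# Hypothetical asset -> step mapping mirroring the thread: every dbt model is
# produced by the single step that shells out to `dbt run`, while the Python
# asset gets a step of its own.
ASSET_TO_STEP = {
    "model1": "run_dbt_dw_prod_2b1b8",
    "model2": "run_dbt_dw_prod_2b1b8",
    "model3": "run_dbt_dw_prod_2b1b8",
    "published_lidl_uptime_fast": "run_dbt_dw_prod_2b1b8",
    "lidl_uptime_fast_slack": "lidl_uptime_fast_slack",
}


def assets_by_step(asset_to_step):
    """Invert the mapping: which assets does each step produce?"""
    grouped = defaultdict(list)
    for asset, step in asset_to_step.items():
        grouped[step].append(asset)
    return dict(grouped)


def steps_for(selected_assets, asset_to_step):
    """Return the set of steps needed to produce the selected assets."""
    return {asset_to_step[asset] for asset in selected_assets}


if __name__ == "__main__":
    for step, assets in assets_by_step(ASSET_TO_STEP).items():
        print(step, "->", assets)
    # Selecting a single dbt model still resolves to the shared dbt step.
    print(steps_for(["model3"], ASSET_TO_STEP))
```

In this toy model, asking for any one dbt model resolves to the same shared step, which is why all of the models show the same step id in the graph.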

Tiri Georgiou

05/11/2022, 8:07 AM
hey @owen, yeah that makes sense. This one is quite tricky to explain but probably easier with a diagram (the above example has too many assets to make it a simple example lol).
Copy code
# example models as assets using load_assets_from_dbt_project

model1 -> model2  -> model3 -> published1

model1 -> model2 -> model4 -> published2
So we were using the select statements `+published1` and `+published2` with `load_assets_from_dbt_project()`, then creating a separate `AssetGroup()` for each of these so we could use the `build_job()` method and schedule them separately. But the `build_job()` method seemed to group `model2, model3, model4` under the same `step_id`. So when we wanted to materialise one side of the branch, i.e.
Copy code
model1 -> model2 -> model3 -> published1
it would complain about `can not build subset plan from unknown steps: <step_id>`, and I think this is because `model4` shares the same step_id as `model2` and `model3` but, because it's not being materialised in the same run (as part of the subset of the dependency tree), it throws an error. I hope that example sheds some more light on this issue 😂 Edit: this issue was (sort of) solved with the `build_asset_job()` function because it would put `model4` in a different step_id in this case, so Dagster wouldn’t complain when materialising the path of assets
Copy code
model1 -> model2 -> model3 -> published1
or
Copy code
model1 -> model2 -> model4 -> published2
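The overlap between the two branches can be sketched in plain Python. dbt's `+name` selector picks a model plus everything upstream of it; the code below only imitates that behaviour on the toy graph from this message and does not call dbt or Dagster. The `PARENTS` dict and `select_with_ancestors` are hypothetical names.

```python
# Toy dependency graph from the example: each model maps to its direct parents.
PARENTS = {
    "model1": [],
    "model2": ["model1"],
    "model3": ["model2"],
    "model4": ["model2"],
    "published1": ["model3"],
    "published2": ["model4"],
}


def select_with_ancestors(node, parents):
    """Return `node` plus everything upstream of it, like dbt's `+node`."""
    selected = set()
    stack = [node]
    while stack:
        current = stack.pop()
        if current not in selected:
            selected.add(current)
            stack.extend(parents[current])
    return selected


branch1 = select_with_ancestors("published1", PARENTS)  # "+published1"
branch2 = select_with_ancestors("published2", PARENTS)  # "+published2"
shared = branch1 & branch2  # models that both selections pull in

if __name__ == "__main__":
    print(sorted(branch1))
    print(sorted(branch2))
    print(sorted(shared))
```

Both selections pull in `model1` and `model2`, while `model3` and `model4` each belong to only one branch, which is the overlap described above.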

owen

05/11/2022, 4:31 PM
@Tiri Georgiou ahh I see what you mean now! glad you found a workaround, and we're doing some work in this asset selection space which I think should make your life a lot easier here. specifically, we're making it possible to select subsets of a step that produces multiple assets when you're building a job from an asset group. In practice, if you had a dbt project that produces models 1, 2, 3, 4, and then separate assets published1 and published2, you could put all of these in the same AssetGroup, and build two separate jobs:
Copy code
my_group = AssetGroup([*dbt_assets, published1, published2], ...)

published1_job = my_group.build_job("published1", selection="*published1")
published2_job = my_group.build_job("published2", selection="*published2")
This should prevent you from manually having to segment out the specific dependencies for each of these assets. Under the hood, when we create these two different jobs, we're configuring the step that produces the dbt assets to run a different command (i.e. `dbt run --select ...` instead of just `dbt run`).
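As a rough illustration of that last point, the difference between the two commands can be sketched like this. It is an assumption-laden sketch, not how Dagster actually builds the command (the thread does not show that); `dbt_run_command` is a hypothetical helper, and the only dbt feature relied on is that `dbt run --select` accepts a space-separated list of models.

```python
import shlex


def dbt_run_command(selected_models=None):
    """Build the argv for a dbt run; None or empty means the whole project."""
    command = ["dbt", "run"]
    if selected_models:
        # Sort for a stable, reproducible command line.
        command += ["--select", *sorted(selected_models)]
    return command


if __name__ == "__main__":
    # Full project run.
    print(shlex.join(dbt_run_command()))
    # Only the models that sit upstream of published1 in the example.
    print(shlex.join(dbt_run_command({"model1", "model2", "model3"})))
```

With a selection applied per job, the same dbt step can be reused by both jobs while each one runs only the models it needs.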