Hi, I'm starting with Dagster and wrestling with D...
# ask-community
z
Hi, I'm starting with Dagster and wrestling with the dbt module. I need to invoke two dbt run ops whose projects are stored in different project dirs: "bronze" and "silver". The silver one depends on the bronze one. Any idea how to do it?
s
One thing you can do is pass in a `Nothing` input. This way you create a dependency between the ops, but no value is actually passed into the second op.
Copy code
@op()
def dbt_bronze():
    """Run the first dbt step and return its results.

    The job passes the returned value to the downstream op's ``depends_on``
    input so that the downstream op runs after this one.
    """
    # code to run first op
    # NOTE(review): `results` is a placeholder -- it is not assigned anywhere
    # in this stub and must be produced by the real dbt invocation.
    return results

@op(ins={"depends_on": In(Nothing)})
def dbt_gold():
    """Run the second dbt step after the upstream op has finished.

    ``depends_on`` is declared as a ``Nothing`` input: it only orders this op
    after whichever op is wired into it. No value is delivered, which is why
    the function takes no matching parameter.
    """
    # code to run second_op
    pass  # placeholder statement so the stub is a valid function body

@job
def run_dbt():
    """Run ``dbt_bronze`` first, then ``dbt_gold``."""
    # Wiring the upstream output into `depends_on` is what sequences the ops.
    dbt_gold(depends_on=dbt_bronze())
z
Thanks Stephen! This is helpful; however, my problem is also that dbt stores the profile path in resources. I need to execute the same type of op with two different versions of the resource.
I've hacked it this way:
Copy code
from dagster import job, op

from dagster_dbt import dbt_cli_resource


@op(required_resource_keys={'dbt'})
def elt_bronze(context, dependent_job=None):
    """Run dbt against the project directory given in this op's config.

    Args:
        context: Dagster op context. Supplies the ``dbt`` resource and the op
            config, which must contain a ``project_dir`` entry.
        dependent_job: Not used in the body; it exists so an upstream op's
            output can be wired in to order execution.
    """
    # Pass the directory per call instead of writing to the resource's private
    # `_default_flags`: the same `dbt` resource key is used by other ops, so
    # mutating it can leak this op's project dir into their runs.
    context.resources.dbt.run(project_dir=context.op_config['project_dir'])


@op(required_resource_keys={'dbt'})
def elt_silver(context, dependent_job=None):
    """Run dbt against the project directory given in this op's config.

    Args:
        context: Dagster op context. Supplies the ``dbt`` resource and the op
            config, which must contain a ``project_dir`` entry.
        dependent_job: Not used in the body; the job passes the upstream op's
            output here so that this op runs after it.
    """
    # Pass the directory per call instead of writing to the resource's private
    # `_default_flags`: the same `dbt` resource key is used by other ops, so
    # mutating it can leak this op's project dir into their runs.
    context.resources.dbt.run(project_dir=context.op_config['project_dir'])


@job(resource_defs={'dbt': dbt_cli_resource})
def elt():
    """Run ``elt_bronze`` and feed its output into ``elt_silver``."""
    bronze_done = elt_bronze()
    elt_silver(bronze_done)


if __name__ == "__main__":
    # Config for the `dbt` resource declared on the job.
    dbt_resource_config = {
        "project_dir": "../dbt/kt_bronze",
        "profiles_dir": "../dbt",
    }
    # Per-op config: each op reads its own `project_dir` from here.
    per_op_config = {
        "elt_bronze": {"config": {"project_dir": "../dbt/kt_br"}},
        "elt_silver": {"config": {"project_dir": "../dbt/kt_si"}},
    }
    result = elt.execute_in_process(run_config={
        "resources": {"dbt": {"config": dbt_resource_config}},
        "ops": per_op_config,
    })
ugly 🤷‍♂️
o
hi @Zdenek Svoboda! the `context.resources.dbt.run()` method accepts a `project_dir` parameter, which overrides any flags set on the resource, so if you don't mind hardcoding, you could just have
Copy code
@op(required_resource_keys={'dbt'})
def elt_silver(context, dependent_job=None):
    """Run dbt against the hard-coded ``../dbt/kt_si`` project directory.

    ``dependent_job`` is not used in the body; it only gives an upstream op's
    output somewhere to be wired in.
    """
    dbt = context.resources.dbt
    dbt.run(project_dir="../dbt/kt_si")
❤️ 1
s
if you are okay setting two different resources, you can do :
Copy code
from dagster_dbt import dbt_cli_resource


@op(required_resource_keys={'dbt_bronze'})
def elt_bronze(context, dependent_job=None):
    """Invoke ``run()`` on the ``dbt_bronze`` resource.

    ``dependent_job`` is not used in the body.
    """
    bronze_dbt = context.resources.dbt_bronze
    bronze_dbt.run()


@op(required_resource_keys={'dbt_silver'})
def elt_silver(context, dependent_job=None):
    """Invoke ``run()`` on the ``dbt_silver`` resource.

    ``dependent_job`` is not used in the body; the job passes the upstream
    op's output here so that this op runs after it.
    """
    silver_dbt = context.resources.dbt_silver
    silver_dbt.run()

# `configured` takes the config dict as its positional argument (it is not
# unpacked with `**`). The paths below mirror the ones used earlier in this
# thread -- adjust them to your own project layout.
dbt_bronze_resource = dbt_cli_resource.configured(
    {"project_dir": "../dbt/kt_br", "profiles_dir": "../dbt"}
)
dbt_silver_resource = dbt_cli_resource.configured(
    {"project_dir": "../dbt/kt_si", "profiles_dir": "../dbt"}
)


@job(
    resource_defs={
        'dbt_bronze': dbt_bronze_resource,
        'dbt_silver': dbt_silver_resource,
    },
)
def elt():
    """Run ``elt_bronze`` and feed its output into ``elt_silver``."""
    bronze_done = elt_bronze()
    elt_silver(bronze_done)


if __name__ == "__main__":
    # No run_config is passed here; presumably both dbt resources are already
    # fully configured via `.configured(...)` -- confirm before running.
    result = elt.execute_in_process()
o
this also works ^ 🙂
z
Thank you!!!