Zdenek Svoboda
06/08/2022, 2:42 PM
Stephen Bailey
06/08/2022, 3:17 PM
`Nothing` input. This way you create a dependency, but the upstream output isn't actually used in the second op:
@op()
def dbt_bronze():
    """First op of the example: run the bronze dbt step and return its results.

    NOTE(review): ``results`` is a placeholder in this example; the real
    implementation must assign it before returning.
    """
    # code to run first op
    return results
@op(ins={"depends_on": In(Nothing)})
def dbt_gold():
    """Second op of the example: run the gold dbt step after the upstream op.

    The ``depends_on`` input is declared as ``Nothing``, so it only creates an
    ordering dependency; it does not appear as a function parameter and no
    value is received from the upstream op.
    """
    # code to run second_op
@job
def run_dbt():
    """Wire the two ops so that ``dbt_gold`` only starts after ``dbt_bronze``."""
    # The bronze output feeds the ``depends_on`` input purely to order execution.
    dbt_gold(depends_on=dbt_bronze())
Stephen Bailey
06/08/2022, 3:17 PMZdenek Svoboda
06/08/2022, 5:12 PMZdenek Svoboda
06/08/2022, 5:14 PM
from dagster import job, op
from dagster_dbt import dbt_cli_resource
@op(required_resource_keys={'dbt'})
def elt_bronze(context, dependent_job=None):
    """Run dbt for the bronze layer using the project directory from op config.

    Args:
        context: Dagster op context; must provide the ``dbt`` resource and an
            op config containing ``project_dir``.
        dependent_job: Unused placeholder input; it exists only so an upstream
            op's output can be wired in to order execution.
    """
    # Pass the project directory per call instead of mutating the resource's
    # private ``_default_flags`` dict: the resource is shared between ops, so
    # an in-place change would leak into every later dbt invocation.
    context.resources.dbt.run(project_dir=context.op_config['project_dir'])
@op(required_resource_keys={'dbt'})
def elt_silver(context, dependent_job=None):
    """Run dbt for the silver layer using the project directory from op config.

    Args:
        context: Dagster op context; must provide the ``dbt`` resource and an
            op config containing ``project_dir``.
        dependent_job: Unused value received from the upstream op; it exists
            only so this op is scheduled after that op completes.
    """
    # Pass the project directory per call instead of mutating the resource's
    # private ``_default_flags`` dict: the resource is shared between ops, so
    # an in-place change would leak into every later dbt invocation.
    context.resources.dbt.run(project_dir=context.op_config['project_dir'])
@job(resource_defs={'dbt': dbt_cli_resource})
def elt():
    """Run the bronze dbt op first, then the silver dbt op."""
    bronze_done = elt_bronze()
    # Feeding the bronze output into silver is what enforces the ordering.
    elt_silver(bronze_done)
if __name__ == "__main__":
    # Run configuration: resource-level dbt settings plus a per-op
    # ``project_dir`` for each of the two ops.
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "project_dir": "../dbt/kt_bronze",
                    "profiles_dir": "../dbt",
                },
            },
        },
        "ops": {
            "elt_bronze": {"config": {"project_dir": "../dbt/kt_br"}},
            "elt_silver": {"config": {"project_dir": "../dbt/kt_si"}},
        },
    }
    # Execute the whole job synchronously in the current process.
    result = elt.execute_in_process(run_config=run_config)
Zdenek Svoboda
06/08/2022, 5:15 PM
owen
06/08/2022, 5:21 PM
`project_dir` parameter, which will override any flags set on the resource, so if you don't mind hardcoding, you could just have:
@op(required_resource_keys={'dbt'})
def elt_silver(context, dependent_job=None):
    """Run dbt for the silver layer against a hardcoded project directory.

    ``dependent_job`` is not read; it is only an input slot for wiring in an
    upstream op.
    """
    dbt = context.resources.dbt
    # The per-call ``project_dir`` takes precedence over flags on the resource.
    dbt.run(project_dir="../dbt/kt_si")
Stephen Bailey
06/08/2022, 5:23 PM
from dagster_dbt import dbt_cli_resource
@op(required_resource_keys={'dbt_bronze'})
def elt_bronze(context, dependent_job=None):
    """Run dbt through the dedicated ``dbt_bronze`` resource.

    ``dependent_job`` is not read; it is only an input slot for wiring in an
    upstream op.
    """
    bronze_dbt = context.resources.dbt_bronze
    bronze_dbt.run()
@op(required_resource_keys={'dbt_silver'})
def elt_silver(context, dependent_job=None):
    """Run dbt through the dedicated ``dbt_silver`` resource.

    ``dependent_job`` is not read; it is only an input slot for wiring in an
    upstream op.
    """
    silver_dbt = context.resources.dbt_silver
    silver_dbt.run()
# One pre-configured dbt CLI resource per layer, so each op can simply call
# ``run()`` without touching flags. ``configured`` takes the config dict as a
# positional argument; unpacking it with ``**`` would pass the config keys as
# unexpected keyword arguments.
# NOTE(review): the paths below are example values -- adjust to your layout.
dbt_bronze_resource = dbt_cli_resource.configured(
    {"project_dir": "../dbt/kt_br", "profiles_dir": "../dbt"}
)
dbt_silver_resource = dbt_cli_resource.configured(
    {"project_dir": "../dbt/kt_si", "profiles_dir": "../dbt"}
)
@job(resource_defs={'dbt_bronze': dbt_bronze_resource, 'dbt_silver': dbt_silver_resource})
def elt():
    """Run the bronze op, then the silver op, each with its own dbt resource."""
    bronze_done = elt_bronze()
    # Feeding the bronze output into silver is what enforces the ordering.
    elt_silver(bronze_done)
if __name__ == "__main__":
    # No run config is passed: the job is executed in-process as defined.
    result = elt.execute_in_process()
owen
06/08/2022, 5:23 PMZdenek Svoboda
06/08/2022, 6:14 PM