Xavier BALESI
08/05/2022, 10:36 AM

from dagster import (AssetIn, OpExecutionContext, asset, config_from_files,
                     define_asset_job, repository, with_resources)

@asset(name="upstream")
def upstream(_: OpExecutionContext):
    return "upstream"

@asset(name="downstream", ins={"upstream": AssetIn("upstream")})
def downstream(_: OpExecutionContext, upstream):
    return upstream + "downstream"

assets_with_resources = with_resources([upstream, downstream], resource_defs={...})
materialize_all = define_asset_job(name="materialize_all", config=config_from_files([...]))

@repository
def repo():
    return [assets_with_resources, materialize_all]
# Note: in my custom IO manager, load_input reads and handle_output writes a file named from context.upstream_output.name and context.op_def.name respectively
In Dagit, from the job view, when I launch upstream alone and then downstream alone, everything works:
Loaded input "upstream" using input manager "my_io_manager"
but when I launch the whole job materialize_all, it fails after:
Loaded input "upstream" using input manager "my_io_manager", from output "result" of step "upstream"
In the working case the output of upstream is named 'upstream', but in the breaking case it is named 'result'.
I expect the same behaviour whether I launch the two assets separately or via materialize_all.
owen
08/05/2022, 5:21 PM
Using context.upstream_output.name and context.op_def.name can cause issues like the one you're seeing, as these are not guaranteed to stay the same between different executions.
Xavier BALESI
08/05/2022, 8:02 PM