Sterling Paramore
08/29/2022, 7:17 PM
opA runs some transformation and opB saves the state. I want opB to run regardless of whether opA ran successfully or not. However, I also want the job to show up as a failed job in dagit if opA fails.
jamie
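08/29/2022, 7:32 PM
from dagster import Any, Out, Output, graph, op

# "failed" is optional: it is only yielded when the computation fails
@op(out={"failed": Out(is_required=False), "state": Out(Any)})
def do_computation():
    state = ...  # do the computation
    computation_failed = ...  # placeholder: True if the computation failed
    if computation_failed:
        yield Output(True, "failed")
    yield Output(state, "state")

@op
def save_state(state):
    # save the state
    return True

@op
def failure(failed, state_saved):
    # only runs when "failed" was yielded; marks the job as failed
    raise Exception("failed")

@graph
def computation_graph():
    failed, state = do_computation()
    saved = save_state(state)
    failure(failed, saved)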
In do_computation you optionally yield the output failed if the computation failed. Behind the scenes, dagster will only execute an op if all of the upstream outputs it depends on have been provided, so if failed isn't yielded from do_computation then failure won't run. And then to make sure that save_state finishes before failure runs, you have to create a dependency between them (this could be a Nothing dependency).
This is based off of an example we have here: https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching
Sterling Paramore
08/29/2022, 8:23 PM