Hendrik Steinbach
02/02/2023, 9:30 AM@op(name="long_running_op",)
def materialize_view(context, i: int) -> None:
# in our case this is a long running op
if i == 3:
raise Exception("This op failed")
@op(out=DynamicOut())
def get_materialize_inputs(context) -> Generator:
outputs = range(1, 10)
for output in outputs:
yield DynamicOutput(output, mapping_key=str(output))
@job(description="Updates materialized views daily.",
config={
"execution": {
"config": {
"multiprocess": {
"max_concurrent": 1,
},
}
}
},
)
def materialize_job() -> None:
data_input = get_materialize_inputs()
results = data_input.map(materialize_view)
jamie
02/02/2023, 3:26 PM@op(name="long_running_op",)
def materialize_view(context, i: int) -> None:
instance = context.instance
failed_steps = instance.get_records_for_run(context.run_id, of_type=DagsterEventType.STEP_FAILURE)
if len(failed_steps) > 0:
raise Exception("failed because another step failed)
# in our case this is a long running op
if i == 3:
raise Exception("This op failed")
Hendrik Steinbach
02/03/2023, 10:05 AMlen(failed_steps.records) > 0