
Hendrik Steinbach

02/02/2023, 9:30 AM
Hi guys, first of all, thanks for this great product! We really appreciate Dagster, as it takes a lot of work off our data engineering team! At the moment I’m facing a problem with DynamicOutput ops and can’t find a neat solution for it. The case:
• An op yields n DynamicOutputs; each output involves some long-running execution tasks
• The mapped op executes these dynamic outputs
• Execution should be sequential AND we should be able to re-execute them individually, as external factors can lead to failures within single outputs
• Problem: if an earlier DynamicOutput failed, the later ones should also fail or stop executing
I already tried querying the DagsterRunStatus of the pipeline run, but it is “Started” even if an op has already failed within this job. In my simplistic example, all ops with an input higher than 2 shouldn’t execute anymore. After all this searching, I’m still unsure whether DynamicOutputs are really the right approach, so please let me know if they are. My example:
from typing import Generator

from dagster import DynamicOut, DynamicOutput, job, op


@op(name="long_running_op")
def materialize_view(context, i: int) -> None:
    # In our case this is a long-running op; input 3 simulates a failure.
    if i == 3:
        raise Exception("This op failed")


@op(out=DynamicOut())
def get_materialize_inputs(context) -> Generator:
    # Emit one DynamicOutput per input, each with a unique mapping key.
    for output in range(1, 10):
        yield DynamicOutput(output, mapping_key=str(output))


@job(
    description="Updates materialized views daily.",
    # Limit the multiprocess executor to one step at a time.
    config={"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}},
)
def materialize_job() -> None:
    data_input = get_materialize_inputs()
    data_input.map(materialize_view)

jamie

02/02/2023, 3:26 PM
Hey @Hendrik Steinbach, there isn’t a native way to tell Dagster to stop executing the other dynamic steps if one fails. I think what you could do, though, is query for step failure events. It’d look like this:
from dagster import DagsterEventType, op


@op(name="long_running_op")
def materialize_view(context, i: int) -> None:
    # Look up step-failure events already recorded for this run.
    instance = context.instance
    failed_steps = instance.get_records_for_run(context.run_id, of_type=DagsterEventType.STEP_FAILURE)

    if len(failed_steps) > 0:
        raise Exception("failed because another step failed")

    # in our case this is a long running op
    if i == 3:
        raise Exception("This op failed")

Hendrik Steinbach

02/03/2023, 10:05 AM
Thank you so much! I just checked it, and with a small change it works as desired! I only modified the condition, because failed_steps is itself a NamedTuple and the failed steps appear in its records list:
len(failed_steps.records) > 0
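
For anyone reading along, here is a minimal sketch of why the condition needed that change. It does not import Dagster: FakeConnection and FakeInstance are hypothetical stand-ins written for this illustration, and the field names (records, cursor, has_more) are an assumption about what get_records_for_run returns. The helper count_prior_failures is likewise a made-up name, not part of Dagster.

```python
from typing import Any, NamedTuple, Optional, Sequence


class FakeConnection(NamedTuple):
    """Hypothetical stand-in for the object returned by get_records_for_run."""

    records: Sequence[Any]
    cursor: Optional[str]
    has_more: bool


class FakeInstance:
    """Hypothetical stand-in for context.instance, for illustration only."""

    def __init__(self, failure_records: Sequence[Any]) -> None:
        self._failure_records = list(failure_records)

    def get_records_for_run(self, run_id: str, of_type: Any = None) -> FakeConnection:
        return FakeConnection(records=self._failure_records, cursor=None, has_more=False)


def count_prior_failures(instance: Any, run_id: str, failure_type: Any) -> int:
    """Count failure records for a run by reading the .records field."""
    connection = instance.get_records_for_run(run_id, of_type=failure_type)
    return len(connection.records)


clean = FakeInstance([])
failed = FakeInstance(["step 3 failed"])

# len() on the NamedTuple counts its fields, so it is non-zero even without failures.
naive_len = len(clean.get_records_for_run("run-1", of_type="STEP_FAILURE"))

# Reading .records gives the actual number of failure events.
clean_count = count_prior_failures(clean, "run-1", "STEP_FAILURE")
failed_count = count_prior_failures(failed, "run-1", "STEP_FAILURE")
```

Inside the real op, the same check would presumably be called as count_prior_failures(context.instance, context.run_id, DagsterEventType.STEP_FAILURE) > 0, raising an exception when it is true.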