# ask-community
s
Hi Team, retry_policy / op_retry_policy injected using OpDefinition/GraphDefinition is not working as expected. Any help on this issue?
o
hi @Sundara Moorthy -- what is not working as expected? do you mind sharing some sample code / the behavior you're experiencing?
s
On running the below script with in_process_executor, if I terminate the run using the Dagster UI, the op's retry policy works. But if I change the executor to multiprocess_executor, I get a child-process crash exception and the op's retry does not kick in.
```python
from dagster import (
    Backoff,
    DagsterExecutionInterruptedError,
    GraphDefinition,
    Jitter,
    OpDefinition,
    Out,
    Output,
    RetryPolicy,
    in_process_executor,
    multiprocess_executor,
    repository,
)

def x_solid(context, inputs):
    try:
        context.log.info("Starting the op")
        try:
            context.log.info("Start the process")
            count = 1
            # loop forever so the run can be terminated from the UI
            while True:
                context.log.info("Pls raise the interrupt: {}".format(count))
                count += 1
            # unreachable: the loop above only exits via an interrupt
            yield Output(value="SUCCESS", output_name="str_output")
        except DagsterExecutionInterruptedError:
            context.log.info("Propagating dagster error to the root level >>")
            raise
        except Exception as e:
            context.log.info("Exception in the retry: " + str(e))
            yield Output(value="FAILED", output_name="str_output")
    except DagsterExecutionInterruptedError:
        context.log.info(">>>>>>>>>>>> RETRY >>>>>>>>>>>>>")
        raise
    except Exception:
        context.log.info("Completed >>>")
        yield Output(value="FAILED", output_name="str_output")

def get_op():
    return [
        OpDefinition(
            name="key",
            compute_fn=x_solid,
            outs={"str_output": Out()},
            retry_policy=RetryPolicy(
                max_retries=1,
                delay=0.2,  # 200ms
                backoff=Backoff.EXPONENTIAL,
                jitter=Jitter.PLUS_MINUS,
            ),
            tags={"max_retries": 3, "retry_strategy": "ALL_STEPS"},
        )
    ]
    
def get_job():
    graph_def = GraphDefinition(
        name="job",
        node_defs=get_op()
    )
    return graph_def.to_job(
        executor_def=multiprocess_executor,  # swap for in_process_executor to compare
    )


@repository
def example_repo():
    return [get_job()]

# dagster version = 1.1.14
# dagster dev -f test.py
```
Any update on this issue?
o
I see -- I would say this is expected behavior. Terminating the run through the UI is expected to stop the run as soon as possible, so the multiprocess behavior makes sense to me. The in-process behavior seems a bit less intuitive, but I'm curious what the end goal is in catching these exceptions. This error type is used in Dagster internals to signal that execution was intentionally interrupted, so capturing / suppressing it is generally not recommended.
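A minimal sketch of that recommendation: intercept the interruption only for cleanup, then always re-raise so Dagster can still mark the run as interrupted. `do_work` and `cleanup` are hypothetical stand-ins, not part of the script above:
```python
from dagster import DagsterExecutionInterruptedError, op


def do_work():
    """Hypothetical stand-in for long-running work."""


def cleanup():
    """Hypothetical stand-in for releasing resources."""


@op
def interruptible_op(context):
    try:
        do_work()
    except DagsterExecutionInterruptedError:
        # Cleanup is fine, but re-raise so the framework handles the interruption.
        context.log.info("Interrupted; cleaning up before re-raising")
        cleanup()
        raise
```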
s
We are hitting DagsterExecutionInterruptedError when deploying on k8s (it is raised by the cluster, and adding the annotation as mentioned here solves the problem). But we are trying to handle the case where the pod/job annotation is not set and a retry is configured for the op -- we want the op to retry. How can we solve this case? To reproduce it, I ran the script above locally with the in-process/multiprocess executors and got different exceptions on each executor for the same terminate operation, as described above. Note: I tried the op's retry policy, run retries, and RetryRequested (manual op retry) -- none of them work.
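For reference, a sketch of the annotation route, under two assumptions not confirmed in this thread: that the annotation being referenced is the cluster autoscaler's `safe-to-evict` annotation, and that the deployment uses the dagster-k8s run launcher, which reads per-run Kubernetes config from the `dagster-k8s/config` tag:
```python
from dagster import job, op


@op
def noop():
    """Hypothetical placeholder op."""


@job(
    tags={
        "dagster-k8s/config": {
            "pod_template_spec_metadata": {
                # Ask the cluster autoscaler not to evict the run pod mid-execution.
                "annotations": {
                    "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
                }
            }
        }
    }
)
def annotated_job():
    noop()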
o
I see -- in this case, you'll probably want to do your retries at the run level, not the op level. This error happens because the process that was actually running your op died, so any handling inside the op is unlikely to help; it's best to just restart the run from the point of failure. Here are some docs on run retries: https://docs.dagster.io/deployment/run-retries#configuration
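Following those docs, a sketch of per-job run retries via run tags. It reuses `get_op` from the script above, and note these are namespaced run tags on the job (the script above put un-namespaced `max_retries`/`retry_strategy` tags on the op, which run retries do not read). Run retries must also be enabled for the deployment (e.g. `run_retries` in dagster.yaml for OSS, with the daemon running):
```python
from dagster import GraphDefinition, multiprocess_executor


def get_job_with_run_retries():
    graph_def = GraphDefinition(name="job", node_defs=get_op())  # get_op from the script above
    return graph_def.to_job(
        executor_def=multiprocess_executor,
        tags={
            "dagster/max_retries": 3,                  # retry the failed run up to 3 times
            "dagster/retry_strategy": "FROM_FAILURE",  # re-run only the failed steps
        },
    )
```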
s
I have tried run retries, but they only work with the in-process executor; they fail with the multiprocess executor for the script above.
Any update on this?
o
can you say more about what was failing? Run-level retries should work regardless of the executor -- they just activate a daemon that looks for failed runs and attempts to create new runs to finish up the failed steps. Because of that, the exact code inside your ops shouldn't make a difference to the daemon: once the run fails / an error occurs, a completely new run is created.