# ask-community

Vasco Villas-Boas

06/28/2023, 9:30 PM
Hi, on the dagster asset RetryPolicy object, is there a way to rerun an asset only if it fails to materialize, not if the run is cancelled?
In addition, if a job is running in_process and an asset (with a non-null retry policy) fails, does the job restart in the same process?

Harry Park

06/28/2023, 9:52 PM
You write the op, so you can kick out whatever Failure you want. You can set the allow_retries flag to specifically respect the retry policy or not https://docs.dagster.io/_apidocs/ops#dagster.Failure
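A minimal sketch of that pattern (the asset name, error type, and retry settings here are illustrative, not from this thread):

import random

from dagster import Failure, RetryPolicy, asset


@asset(retry_policy=RetryPolicy(max_retries=3))
def flaky_asset():
    try:
        # Simulate work that sometimes hits a transient error
        if random.random() < 0.5:
            raise ConnectionError("transient upstream hiccup")
        return 1
    except ConnectionError as e:
        # allow_retries=False tells Dagster to skip the asset's RetryPolicy for
        # this particular failure; allow_retries=True (or omitting it) leaves
        # the policy in effect.
        raise Failure(description=str(e), allow_retries=False) from e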

Vasco Villas-Boas

06/28/2023, 9:58 PM
Is there a way to configure, in general, which dagster run statuses allow retries? Could you point me to a small code snippet where you catch Failures and permit retries, but don't permit retries for cancellations?
Is there a way to have dagster run a job in-process but, on retry, create a new process?

sean

06/28/2023, 11:35 PM
on the dagster asset RetryPolicy object, is there a way to rerun an asset only if it fails to materialize, not if the run is cancelled?
If a run is manually terminated, the asset’s retry policy shouldn’t do anything. Try this and terminate the run; no retry occurs:
from time import sleep
from dagster import asset, define_asset_job, Definitions, RetryPolicy

@asset(retry_policy=RetryPolicy(max_retries=3))
def foo_asset():
    sleep(5)
    return 1

defs = Definitions(
    assets=[foo_asset],
    jobs=[define_asset_job(name="my_job")],
)
In addition, if a job is running in_process and an asset (with a non-null retry policy) fails, does the job restart in the same process?
This should cause the op associated with the failing asset to restart, not the entire job.
Is there a way to have dagster run a job in-process but, on retry, create a new process?
Pretty sure we don’t offer an API for this.

Vasco Villas-Boas

06/29/2023, 12:42 PM
Thanks for the follow-ups. I tried manual termination and it seems like I'm not able to replicate. I'm seeing that dagster raises "dagster._core.errors.DagsterExecutionInterruptedError" on manual termination, but then it retries the asset.

sean

06/29/2023, 1:56 PM
How are you manually terminating?
Ah ok so looking a little more carefully-- I am terminating here by hitting the “Terminate” button in dagit. My event log shows a DagsterExecutionInterruptedError and a STEP_UP_FOR_RETRY event, but the retry never actually happens because of the run cancellation.

Vasco Villas-Boas

06/29/2023, 2:10 PM
Here's an example where I'm getting a retry after manual termination. I am running with the in_process_executor; I wonder if that could cause different behavior, though I wouldn't expect that?
import datetime
import time

from dagster import RetryPolicy, asset, define_asset_job


@asset(
    retry_policy=RetryPolicy(max_retries=1),
    name="pasta_asset",
)
def pasta():
    time.sleep(60)
    now = datetime.datetime.now()
    if now.microsecond <= 100:
        raise AssertionError("fail for now")
    else:
        return "pasta"


@asset(
    retry_policy=RetryPolicy(max_retries=1),
    non_argument_deps={"pasta_asset"},
)
def return_60():
    time.sleep(30)
    return 60


return_60_job = define_asset_job(name="return_60_job", selection=[return_60, pasta])
I'm manually terminating by clicking the red "Terminate" button in the top right, then clicking "Terminate 1 Run", and then clicking "Done"

sean

06/29/2023, 2:19 PM
I am running with the in_process_executor; I wonder if that could cause different behavior, though I wouldn’t expect that?
I was just able to replicate using the `in_process_executor`:
from time import sleep
from dagster import asset, define_asset_job, Definitions, RetryPolicy
from dagster._core.definitions.executor_definition import in_process_executor

@asset(retry_policy=RetryPolicy(max_retries=3))
def foo_asset():
    sleep(5)
    return 1

defs = Definitions(
    assets=[foo_asset],
    jobs=[define_asset_job(name="my_job", executor_def=in_process_executor)],
)
This is a legitimate bug that we will fix. I’ve opened an issue: https://github.com/dagster-io/dagster/issues/15026 Thanks for the report!

Vasco Villas-Boas

06/29/2023, 2:20 PM
Thank you, do you know what the timeline is on these bug fixes?
I did find a workaround, which is to put the asset code inside of a try/except. I catch dagster.DagsterExecutionInterruptedError and then raise dg.Failure with allow_retries set to False.
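A sketch of that workaround (the asset body is illustrative; the import path for DagsterExecutionInterruptedError is the one from the error message above):

import time

import dagster as dg
from dagster._core.errors import DagsterExecutionInterruptedError


@dg.asset(retry_policy=dg.RetryPolicy(max_retries=1))
def pasta():
    try:
        time.sleep(60)
        return "pasta"
    except DagsterExecutionInterruptedError as e:
        # A manual termination surfaces as DagsterExecutionInterruptedError inside
        # the asset body; re-raise it as a Failure with allow_retries=False so the
        # retry policy is not applied to a cancelled run.
        raise dg.Failure(description="run was terminated", allow_retries=False) from e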

sean

06/29/2023, 2:34 PM
Can’t give a timeline on this one immediately-- not clear how hard the fix is, and use of in_process_executor outside testing is not too common.

Vasco Villas-Boas

06/29/2023, 2:44 PM
Ok gotcha thank you