Hi, on the dagster asset RetryPolicy object, is th...
# ask-community
v
Hi, on the dagster asset RetryPolicy object, is there a way to rerun an asset only if it fails to materialize, not if the run is cancelled?
🤖 1
In addition, if a job is running in_process and an asset (with a non-null retry policy fails), the job restarts, but in the same process?
h
You write the op, so you can kick out whatever Failure you want. You can set the allow_retries flag to specifically respecte the Retry policy or not https://docs.dagster.io/_apidocs/ops#dagster.Failure
v
Is there a way to generally configure the different dagster run statuses where retries are allowed? Could you point me to a small code snippet where you catch Failures and permit retries but for cancellations don't permit retries?
Is there a way to have dagster run a job inprocess but on retry, create a new process?
s
on the dagster asset RetryPolicy object, is there a way to rerun an asset only if it fails to materialize, not if the run is cancelled?
If a run is manually terminated, the asset’s retry policy shouldn’t do anything. Try this and terminate the run, no retry occurs.
Copy code
from time import sleep
from dagster import asset, define_asset_job, Definitions, RetryPolicy

@asset(retry_policy=RetryPolicy(max_retries=3))
def foo_asset():
    sleep(5)
    return 1

defs = Definitions(
    assets=[foo_asset],
    jobs=[define_asset_job(name="my_job")],
)
In addition, if a job is running in_process and an asset (with a non-null retry policy fails), the job restarts, but in the same process?
This should cause the op associated with the failing asset to restart, not the entire job.
Is there a way to have dagster run a job inprocess but on retry, create a new process?
Pretty sure we don’t offer an API for this.
v
Thanks for the follow ups. I tried manual termination and it seems like I'm not able to replicate. I'm seeing that dagster raises "dagster._core.errors.DagsterExecutionInterruptedError" on manual termination but then it retires the asset
s
How are you manually terminating?
Ah ok so looking a little more carefully-- I am terminating here by hitting the “Terminate” button in dagit. My event log shows
DagsterExecutionInterruptedError
and a
STEP_UP_FOR_RETRY
event, but the retry never actually happens because of the run cancellation:
v
Here's an example where I'm getting a retry after manual termination. I am running with the
in_process_executor
, I wonder if that could cause different behavior, though I wouldn't expect that?
Copy code
@asset(
    retry_policy=RetryPolicy(max_retries=1),
    name="pasta_asset",
)
def pasta():
    time.sleep(60)
    now = datetime.datetime.now()
    if now.microsecond <= 100:
        raise AssertionError("fail for now")
    else:
        return "pasta"


@asset(
    retry_policy=RetryPolicy(max_retries=1),
    non_argument_deps={"pasta_asset"},
)
def return_60():
    time.sleep(30)
    return 60


return_60_job = define_asset_job(name="return_60_job", selection=[return_60, pasta])
I'm manually terminating by clicking the red "Terminate" button in the top right, then clicking "Terminate 1 Run", and then clicking "Done"
s
I am running with the in_process_executor , I wonder if that could cause different behavior, though I wouldn’t expect that?
I was just able to replicate using the `in_process_executor`:
Copy code
from time import sleep
from dagster import asset, define_asset_job, Definitions, RetryPolicy
from dagster._core.definitions.executor_definition import in_process_executor

@asset(retry_policy=RetryPolicy(max_retries=3))
def foo_asset():
    sleep(5)
    return 1

defs = Definitions(
    assets=[foo_asset],
    jobs=[define_asset_job(name="my_job", executor_def=in_process_executor)],
)
This is a legitimate bug that we will fix. I’ve opened an issue: https://github.com/dagster-io/dagster/issues/15026 Thanks for the report!
v
Thank you, do you know what is the timeline on these bug fixes?
I did find a workaround which is to put the asset code inside of a try except. I catch dagster.DagsterExecutionInterruptedError and then raise dg.Failure with allow_retries set to False
s
Can’t give a timeline on this one immediately-- not clear how hard the fix is and use of
in_process_executor
outside testing is not too common
v
Ok gotcha thank you