Vasco Villas-Boas
06/28/2023, 9:30 PM

Vasco Villas-Boas
06/28/2023, 9:48 PM

Harry Park
06/28/2023, 9:52 PM

Vasco Villas-Boas
06/28/2023, 9:58 PM

Vasco Villas-Boas
06/28/2023, 9:58 PM

sean
06/28/2023, 11:35 PM
> on the dagster asset RetryPolicy object, is there a way to rerun an asset only if it fails to materialize, not if the run is cancelled?

If a run is manually terminated, the asset's retry policy shouldn't do anything. Try this and terminate the run; no retry occurs:
from time import sleep

from dagster import asset, define_asset_job, Definitions, RetryPolicy


@asset(retry_policy=RetryPolicy(max_retries=3))
def foo_asset():
    sleep(5)
    return 1


defs = Definitions(
    assets=[foo_asset],
    jobs=[define_asset_job(name="my_job")],
)
> In addition, if a job is running in_process and an asset (with a non-null retry policy) fails, the job restarts, but in the same process?

This should cause the op associated with the failing asset to restart, not the entire job.
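(Aside, not from the thread.) The op-level retry semantics described here — re-run only the failing step, optionally with a delay between attempts — can be sketched in plain Python. `run_with_retries` is a hypothetical helper for illustration, not a Dagster API:

```python
import time


def run_with_retries(step_fn, max_retries=3, delay=0.01, backoff=2.0):
    """Re-run only the failing step, sleeping delay * backoff**attempt between tries."""
    for attempt in range(max_retries + 1):
        try:
            return step_fn()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the step failure
            time.sleep(delay * backoff ** attempt)


attempts = {"n": 0}


def flaky_step():
    """Fail twice, then succeed, to exercise the retry loop."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(run_with_retries(flaky_step))  # prints "ok" after two retries
```

The point mirrored from the thread: only the failing step function is re-invoked; nothing outside it restarts.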
> Is there a way to have dagster run a job in-process but, on retry, create a new process?

Pretty sure we don't offer an API for this.
Vasco Villas-Boas
06/29/2023, 12:42 PM

sean
06/29/2023, 1:56 PM

sean
06/29/2023, 2:04 PM
a DagsterExecutionInterruptedError and a STEP_UP_FOR_RETRY event, but the retry never actually happens because of the run cancellation:

Vasco Villas-Boas
06/29/2023, 2:10 PM
I am running with the in_process_executor, I wonder if that could cause different behavior, though I wouldn't expect that?

Vasco Villas-Boas
06/29/2023, 2:10 PM
import datetime
import time

from dagster import asset, define_asset_job, RetryPolicy


@asset(
    retry_policy=RetryPolicy(max_retries=1),
    name="pasta_asset",
)
def pasta():
    time.sleep(60)
    now = datetime.datetime.now()
    if now.microsecond <= 100:
        raise AssertionError("fail for now")
    else:
        return "pasta"


@asset(
    retry_policy=RetryPolicy(max_retries=1),
    non_argument_deps={"pasta_asset"},
)
def return_60():
    time.sleep(30)
    return 60


return_60_job = define_asset_job(name="return_60_job", selection=[return_60, pasta])
Vasco Villas-Boas
06/29/2023, 2:12 PM

sean
06/29/2023, 2:19 PM
> I am running with the in_process_executor, I wonder if that could cause different behavior, though I wouldn't expect that?

I was just able to replicate using the `in_process_executor`:
from time import sleep

from dagster import asset, define_asset_job, Definitions, RetryPolicy
from dagster._core.definitions.executor_definition import in_process_executor


@asset(retry_policy=RetryPolicy(max_retries=3))
def foo_asset():
    sleep(5)
    return 1


defs = Definitions(
    assets=[foo_asset],
    jobs=[define_asset_job(name="my_job", executor_def=in_process_executor)],
)
This is a legitimate bug that we will fix. I’ve opened an issue: https://github.com/dagster-io/dagster/issues/15026
Thanks for the report!

Vasco Villas-Boas
06/29/2023, 2:20 PM

Vasco Villas-Boas
06/29/2023, 2:22 PM

sean
06/29/2023, 2:34 PM
Using the in_process_executor outside testing is not too common.

Vasco Villas-Boas
06/29/2023, 2:44 PM