Vlad Efanov
07/03/2022, 12:04 PM
while True:
    run_status = client.get_run_status(new_run_id)
    if run_status == DagsterRunStatus.SUCCESS:
        context.log.debug(f"Job finished successfully: run_id: {new_run_id}")
        break
    elif run_status in (DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED):
        raise ChildProcessError(f"Job Failed: run_id: {new_run_id}")
    time.sleep(1)
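A minimal sketch of that polling loop factored into a helper with an overall timeout, so a stuck run can't spin forever. `RunStatus`, `wait_for_run`, and the `client` interface here are stand-ins of my own, not the real `DagsterRunStatus` enum or Dagster GraphQL client:

```python
import time
from enum import Enum, auto

class RunStatus(Enum):
    # Stand-in for DagsterRunStatus; only the states the loop cares about.
    STARTED = auto()
    SUCCESS = auto()
    FAILURE = auto()
    CANCELED = auto()

def wait_for_run(client, run_id, timeout=600.0, poll_interval=1.0):
    """Poll `client` until `run_id` reaches a terminal state or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = client.get_run_status(run_id)
        if status == RunStatus.SUCCESS:
            return status
        if status in (RunStatus.FAILURE, RunStatus.CANCELED):
            raise ChildProcessError(f"Job failed: run_id: {run_id}")
        time.sleep(poll_interval)
    raise TimeoutError(f"run_id {run_id} still not finished after {timeout}s")
```

Using `time.monotonic()` for the deadline keeps the timeout immune to wall-clock adjustments.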
Roei Jacobovich
07/03/2022, 12:23 PM
• Instead of a `while True` inside an op (which uses compute time), check out the retry policies at https://docs.dagster.io/concepts/ops-jobs-graphs/op-retries. They let the Dagster run manager (which can be a different unit depending on your deployment) handle the retries and the backoff time.
• If you'd like to execute something after a job finishes, check out Dagster sensors: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors. You can even request another job to start from that sensor. Be aware that sensors run on the daemon (if I'm not wrong, and again, it depends on the deployment you chose), so heavy logic should not be executed in them.

Vlad Efanov
07/03/2022, 1:38 PM

Roei Jacobovich
07/03/2022, 1:43 PM

Vlad Efanov
07/03/2022, 2:11 PM

Roei Jacobovich
07/03/2022, 2:28 PM
@run_status_sensor(pipeline_run_status=DagsterRunStatus.SUCCESS)
def my_complex_sensor(context: RunStatusSensorContext):
    pipeline_name = context.pipeline_run.pipeline_name
    next_pipelines_mapping = {
        "A_JOB": "B_JOB",
        "B_JOB": "C_JOB",
        "C_JOB": "FNL_JOB",
    }
    next_pipeline = next_pipelines_mapping.get(pipeline_name)
    if next_pipeline:
        # Execute the next job
        # Warning: didn't test it out
        return RunRequest(job_name=next_pipeline)
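The chaining in that sensor is just a dict lookup, so the mapping itself can be sanity-checked without Dagster installed. This is a sketch of mine reusing the job names from the example above:

```python
# Which job to kick off when each job succeeds (mirrors the sensor's mapping).
NEXT_JOB = {
    "A_JOB": "B_JOB",
    "B_JOB": "C_JOB",
    "C_JOB": "FNL_JOB",
}

def next_job(finished_job):
    """Return the job to request after `finished_job`, or None at the end of the chain."""
    return NEXT_JOB.get(finished_job)
```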
RunRequest definition: https://docs.dagster.io/_apidocs/schedules-sensors#dagster.RunRequest

Vlad Efanov
07/03/2022, 2:52 PM

Roei Jacobovich
07/03/2022, 3:32 PM
Consider increasing the `time.sleep` from 1 second, otherwise it'll pollute `client.get_run_status` with frequent calls.
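One way to act on that advice without hard-coding a bigger constant is to grow the sleep between polls. A small sketch of a capped exponential schedule (plain Python, names are my own):

```python
def poll_delays(initial=1.0, factor=2.0, cap=30.0):
    """Yield sleep durations that grow by `factor` each poll, capped at `cap` seconds."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, cap)
```

Feeding these into `time.sleep` waits 1, 2, 4, 8, 16, 30, 30, ... seconds between status checks, so a long-running job stops hammering the API.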
Vlad Efanov
07/03/2022, 3:39 PM