# dagster-support

Vlad Efanov

07/03/2022, 12:04 PM
Hello. I want to submit another Job from Op and wait for it to complete. Now I'm using the Python API to check the status of a Job. Is there a better way to do this? Here is how I do it now:
import time

from dagster import DagsterRunStatus

while True:
    run_status = client.get_run_status(new_run_id)
    if run_status == DagsterRunStatus.SUCCESS:
        context.log.debug(f"Job finished successfully: run_id: {new_run_id}")
        break
    elif run_status in (DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED):
        raise ChildProcessError(f"Job failed: run_id: {new_run_id}")
    time.sleep(1)
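A more defensive version of that polling loop would add a timeout and back off between checks so it doesn't hammer the API every second. A sketch only: `get_status` is a generic callable standing in for `client.get_run_status`, and plain strings stand in for the `DagsterRunStatus` members.

```python
import time

def wait_for_run(get_status, run_id, timeout=600.0, initial_delay=1.0, max_delay=30.0):
    """Poll get_status(run_id) until a terminal state, with exponential backoff.

    Sketch: in real code get_status would be client.get_run_status and the
    status values would be DagsterRunStatus members, not strings.
    """
    delay = initial_delay
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(run_id)
        if status == "SUCCESS":
            return status
        if status in ("FAILURE", "CANCELED"):
            raise ChildProcessError(f"Job failed: run_id: {run_id}")
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"Timed out waiting for run_id: {run_id}")
        time.sleep(delay)
        delay = min(delay * 2, max_delay)  # exponential backoff, capped
```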

Roei Jacobovich

07/03/2022, 12:23 PM
Hi again 🙂
• Instead of busy looping with a while True inside an op (which burns compute time), check out retry policies: https://docs.dagster.io/concepts/ops-jobs-graphs/op-retries. The Dagster run manager (a different unit depending on your deployment) will then handle the retries and the backoff time.
• If you'd like to execute something after a job finishes, check out Dagster sensors: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors. You can even request another job to start from a sensor. Be aware that sensors run on the daemon (if I'm not wrong, and again, it depends on the deployment you chose), so heavy logic should not run in them.
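To make the retry-policy suggestion concrete: instead of the op sleeping in a loop, it fails fast when the child run isn't done, and the run manager re-invokes it after a growing delay. A rough sketch of the delay schedule an exponential-backoff policy would produce (the function name is illustrative, not Dagster API):

```python
def compute_retry_delays(base_delay: float, max_retries: int) -> list:
    """Delays (seconds) before each retry under exponential backoff:
    base_delay * 2**attempt, one entry per allowed retry."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]
```

So a policy with a 1-second base delay and 4 retries would wait 1, 2, 4, then 8 seconds between attempts instead of polling every second.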

Vlad Efanov

07/03/2022, 1:38 PM
@Roei Jacobovich I don't think a sensor can help me. I have a Job that contains 4 Ops, and each Op waits for the previous one to complete. Ops A, B, and C each start a Job and wait for it to finish. They run like this: A => B => C => D. Can I use retry policies for this flow? How? If not, is there a better way than what I'm doing now?

Roei Jacobovich

07/03/2022, 1:43 PM
Retry policies can help with the busy waiting. But I'd rethink the separate jobs. Are you familiar with Dagster graphs? You can use graphs inside graphs. Do you plan to launch the jobs invoked from A, B, C separately? If not, there's no reason to create another job for them, in my opinion.

Vlad Efanov

07/03/2022, 2:11 PM
Yes, I want to launch the jobs invoked from A, B, and C separately. Another thing: I configured limits on the number of runs:
Main Job: 20
Job A: 4
Job B: 1
Job C: 15

Roei Jacobovich

07/03/2022, 2:28 PM
I see. If the only thing those ops do is start the jobs, you could create a complex sensor, something like this (didn't test it out!):
from dagster import DagsterRunStatus, RunRequest, RunStatusSensorContext, run_status_sensor

@run_status_sensor(pipeline_run_status=DagsterRunStatus.SUCCESS)
def my_complex_sensor(context: RunStatusSensorContext):
    pipeline_name = context.pipeline_run.pipeline_name
    next_pipelines_mapping = {
        "A_JOB": "B_JOB",
        "B_JOB": "C_JOB",
        "C_JOB": "FNL_JOB",
    }
    next_pipeline = next_pipelines_mapping.get(pipeline_name)
    if next_pipeline:
        # Execute the next job
        # Warning: didn't test it out
        return RunRequest(job_name=next_pipeline)
That solution lets Dagster handle these dependencies. Run A_JOB on a schedule; FNL_JOB will run once C_JOB finishes successfully.
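The effect of that mapping is a chain of runs: each successful job triggers the next entry until the mapping runs out. A small sketch of walking the chain (pure Python, just to illustrate the dependency order the sensor encodes; assumes the mapping is acyclic):

```python
def job_chain(start: str, mapping: dict) -> list:
    """Follow the next-job mapping from a starting job and return the
    full sequence of jobs that would be triggered, in order."""
    chain = [start]
    while chain[-1] in mapping:
        chain.append(mapping[chain[-1]])
    return chain
```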

Vlad Efanov

07/03/2022, 2:52 PM
@Roei Jacobovich I don't understand how to use it. Also, I want to be able to execute only part of the flow. For example:
1. A
2. A => B
3. A => B => C
4. A => B => C => D
Now I am using conditional branching for this.
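The partial flows above are all prefixes of the same A => B => C => D sequence, so the conditional branching can reduce to picking a stopping point. A sketch of that selection logic (names are illustrative, not part of the thread's actual code):

```python
STEPS = ["A", "B", "C", "D"]

def steps_to_run(last_step: str) -> list:
    """Return the prefix of the flow up to and including last_step,
    e.g. the config value that conditional branching would key on."""
    return STEPS[: STEPS.index(last_step) + 1]
```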

Roei Jacobovich

07/03/2022, 3:32 PM
In that case, use a regular job with the code you mentioned at first. But I'd still avoid the busy looping by using retry policies, as I mentioned before. Also, increase the time.sleep from 1 second; polling that often will hammer client.get_run_status.

Vlad Efanov

07/03/2022, 3:39 PM
Okay. Thanks