# ask-community
v
Hello. I want to submit another Job from an Op and wait for it to complete. Right now I'm using the Python API to poll the Job's status. Is there a better way to do this? Here is how I do it now:
Copy code
import time

from dagster import DagsterRunStatus

# `client` is a DagsterGraphQLClient and `new_run_id` is the ID of the
# submitted run; both are set up earlier in the Op.
while True:
    run_status = client.get_run_status(new_run_id)
    if run_status == DagsterRunStatus.SUCCESS:
        context.log.debug(f"Job finished successfully: run_id: {new_run_id}")
        break
    elif run_status in (DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED):
        raise ChildProcessError(f"Job Failed: run_id: {new_run_id}")
    time.sleep(1)
r
Hi again 🙂
• Instead of busy looping with while True inside an Op (which burns compute time), check out retry policies: https://docs.dagster.io/concepts/ops-jobs-graphs/op-retries. That lets the Dagster run manager (which can be a different component depending on your deployment) handle the retries and the backoff time - see the sketch after this message.
• If you'd like to execute something after a job finishes, check out Dagster Sensors: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors. You can even request another job to start from that sensor. Beware that sensors run on the daemon (if I'm not wrong - again, it depends on the deployment you chose), so heavy logic should not be executed in them.
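A minimal, untested sketch of that retry-based idea (the op name, host/port, and retry numbers here are made up - adapt them to your setup):
Copy code
from dagster import DagsterRunStatus, RetryRequested, op
from dagster_graphql import DagsterGraphQLClient


@op
def wait_for_child_run(context, new_run_id: str):
    # Hypothetical host/port - point this at your own Dagster instance.
    client = DagsterGraphQLClient("localhost", port_number=3000)
    run_status = client.get_run_status(new_run_id)
    if run_status == DagsterRunStatus.SUCCESS:
        context.log.debug(f"Job finished successfully: run_id: {new_run_id}")
    elif run_status in (DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED):
        raise ChildProcessError(f"Job failed: run_id: {new_run_id}")
    else:
        # Still running: hand control back to Dagster instead of sleeping,
        # so the run manager re-executes this op after the wait.
        raise RetryRequested(max_retries=60, seconds_to_wait=10)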
v
@Roei Jacobovich I don't think a sensor can help me. I have a Job that contains 4 Ops, and each Op waits for the previous one to complete. Ops A, B, and C each start a Job and wait for it to complete. They run like this: A => B => C => D. Can I use retry policies for this flow? How? If not, is there a better way to do this than what I'm doing?
r
Retry policies can help with the busy waiting. But I'd rethink the separate jobs - are you familiar with Dagster graphs? You can use graphs inside graphs (see the sketch below). Do you plan to launch the jobs invoked from A, B, and C separately? If not, there's no reason to create another job for them, in my opinion.
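For reference, graph composition looks roughly like this (a toy sketch - the op, graph, and job names are all made up):
Copy code
from dagster import graph, op


@op
def do_a() -> int:
    return 1


@op
def do_b(value: int) -> None:
    print(value)


@graph
def inner_graph():
    # A graph wires ops together.
    do_b(do_a())


@graph
def outer_graph():
    # Graphs compose: a graph can be invoked inside another graph.
    inner_graph()


# A single job then runs the whole nested structure.
composed_job = outer_graph.to_job()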
v
Yes, I want to launch the jobs invoked from A, B, and C separately. Another thing: I've configured a limit on the number of runs - Main Job: 20, Job A: 4, Job B: 1, Job C: 15.
r
I see. If the only thing those Ops do is start the jobs, you could create a complex sensor - something like this (didn't test it out!):
Copy code
from dagster import (
    DagsterRunStatus,
    RunRequest,
    RunStatusSensorContext,
    run_status_sensor,
)


@run_status_sensor(pipeline_run_status=DagsterRunStatus.SUCCESS)
def my_complex_sensor(context: RunStatusSensorContext):
    pipeline_name = context.pipeline_run.pipeline_name
    # Each successful job triggers the next one in the chain.
    next_pipelines_mapping = {
        "A_JOB": "B_JOB",
        "B_JOB": "C_JOB",
        "C_JOB": "FNL_JOB",
    }
    next_pipeline = next_pipelines_mapping.get(pipeline_name)
    if next_pipeline:
        # Request the next job
        # Warning: didn't test it out
        return RunRequest(job_name=next_pipeline)
That solution lets Dagster handle these dependencies. Run A_JOB with a schedule (sketch below); FNL_JOB will run once C_JOB is done and successful.
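The kick-off schedule could be as simple as this (a sketch - a_job is assumed to be the Python object behind A_JOB, and the cron string is just an example):
Copy code
from dagster import ScheduleDefinition

# Kick off the chain by scheduling the first job; the sensor above
# handles everything downstream.
a_job_schedule = ScheduleDefinition(
    job=a_job,  # assumed: the job object registered as A_JOB
    cron_schedule="0 6 * * *",  # every day at 06:00, for example
)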
v
@Roei Jacobovich I don't understand how to use it. Also, I want to be able to execute only part of the flow. For example:
1. A
2. A => B
3. A => B => C
4. A => B => C => D
Right now I'm using conditional branching for this.
r
In that case, use a regular job with the code you mentioned at first, but avoid the busy looping by using retry policies as I mentioned before. Also, increase the time.sleep from 1 second - polling that often will flood client.get_run_status with requests. A sketch of the single-job branching shape is below.
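Putting it together, the branching inside one regular job might look roughly like this (an untested sketch - the op names, the last_step config key, and the wiring are all made up; it uses Dagster's optional-output pattern, where an op wired to an output that is never emitted gets skipped):
Copy code
from dagster import In, Nothing, Out, Output, job, op

STEPS = ["B", "C", "D"]


@op(
    config_schema={"last_step": str},
    out={f"run_{s.lower()}": Out(Nothing, is_required=False) for s in STEPS},
)
def plan_flow(context):
    # Emit a "go" signal for every step up to last_step ("A".."D");
    # downstream ops wired to a signal that is never emitted are skipped.
    last_step = context.op_config["last_step"]
    for step in STEPS:
        if step <= last_step:
            yield Output(None, f"run_{step.lower()}")


@op
def step_a():
    ...


@op(ins={"after_a": In(Nothing), "go": In(Nothing)})
def step_b():
    ...


@op(ins={"after_b": In(Nothing), "go": In(Nothing)})
def step_c():
    ...


@op(ins={"after_c": In(Nothing), "go": In(Nothing)})
def step_d():
    ...


@job
def partial_flow():
    run_b, run_c, run_d = plan_flow()
    a = step_a()
    b = step_b(after_a=a, go=run_b)
    c = step_c(after_b=b, go=run_c)
    step_d(after_c=c, go=run_d)
Because skipping propagates downstream, setting last_step to "B" runs A and B only, while "D" runs the whole chain.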
v
Okay. Thanks