Katrin Grunert

09/27/2022, 9:23 AM
Hey all! I am building a scheduled job that should detect long-running jobs and notify the user when it finds one. In the dagster-support channel I was able to find valuable resources (see ). I decided against using a
as I seem to be able to inspect only one job state at a time, but I need to monitor the runtimes of multiple jobs. The issue I run into with the code example below is that I am not able to retrieve the run_stats for a job. No error is logged, and no code seems to be executed after trying to retrieve run_stats via the DagsterInstance found in the op context. What could the issue be here? Btw, I am running dagster version
import time
from typing import Iterable

from dagster import DagsterInstance, DagsterRun, DagsterRunStatus, job, op

# Note: the @op / @job decorators, the iterable passed to filter(), and the
# job body were missing from the post as archived; they are filled in here
# as assumptions.
@op
def op_notify_long_runtime_jobs(context):
    instance: DagsterInstance = context.instance
    runs: Iterable[DagsterRun] = filter(
        lambda run: run.status == DagsterRunStatus.STARTED and
                    run.job_name != 'notify_long_runtime_jobs',
        instance.get_runs(),  # assumption: all runs known to the instance
    )

    def evaluate_run_time(r):
        print(f"{r.job_name} | {r.run_id} | {r.status} | {type(r)}")  # anything below is not being printed
        run_stats = instance.get_run_stats(r.run_id)
        elapsed_time = time.time() - run_stats.start_time

        if elapsed_time > 20:  # TODO Configurable Limit in seconds
            print(f"Long running job {r.run_id}! Running for {elapsed_time} seconds")  # TODO run slack notification

    for r in runs:
        evaluate_run_time(r)

@job
def notify_long_runtime_jobs():
    op_notify_long_runtime_jobs()


09/27/2022, 4:30 PM
Hi Katrin. Not sure what the issue is here--I'm successfully able to run this code on 1.0.7. Can you double check that the run id provided to
is valid?
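
One way to narrow this down is to make the stats lookup fail loudly instead of silently. The following is only a minimal sketch in plain Python, not a confirmed fix: `check_run`, `elapsed_seconds`, the logger name, and the `get_stats` parameter are hypothetical names introduced here for illustration. It assumes only that the stats object exposes a `start_time` attribute, as in the code above, and that this attribute may be unset for a run that has not really started.

```python
import logging
import time
from typing import Any, Callable, Optional

# Hypothetical logger name, chosen for this sketch only.
logger = logging.getLogger("long_run_check")


def elapsed_seconds(start_time: Optional[float], now: Optional[float] = None) -> Optional[float]:
    """Return the seconds since start_time, or None if there is no start time."""
    if start_time is None:
        return None
    current = time.time() if now is None else now
    return current - start_time


def check_run(
    run_id: str,
    get_stats: Callable[[str], Any],
    limit_seconds: float = 20.0,
    now: Optional[float] = None,
) -> bool:
    """Return True if the run has been running longer than limit_seconds.

    Any failure while fetching the stats is logged with a traceback
    rather than being swallowed, so a bad run id becomes visible.
    """
    try:
        stats = get_stats(run_id)
    except Exception:
        logger.exception("Could not fetch run stats for run %s", run_id)
        return False

    elapsed = elapsed_seconds(getattr(stats, "start_time", None), now)
    if elapsed is None:
        logger.warning("Run %s has no start_time yet", run_id)
        return False
    return elapsed > limit_seconds
```

Inside the op, this could be called as `check_run(r.run_id, instance.get_run_stats)`. Routing the messages through `context.log` instead of the module-level logger may also make them easier to find in the run's event log, since output written with `print` is not guaranteed to show up there.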