Hey all! I am building a scheduled job, that shoul...
# ask-community
k
Hey all! I am building a scheduled job that should detect long-running jobs and notify the user when one is found. In the dagster-support channel I was able to find valuable resources (see https://dagster.slack.com/archives/C01U954MEER/p1628717201346100?thread_ts=1628715859.344300&cid=C01U954MEER). I decided against using a RunStatusSensor, as it seems to let me inspect only one job state at a time, but I need to monitor the runtimes of multiple jobs. The issue I run into with the code example below is that I am not able to retrieve the run_stats for a job. No error is logged, and no code seems to be executed after trying to retrieve run_stats via the DagsterInstance found in the op context. What could the issue be here? By the way, I am running Dagster version 1.0.7.
import time
from typing import Iterable

from dagster import DagsterInstance, DagsterRun, DagsterRunStatus, job, op

@op()
def op_notify_long_runtime_jobs(context):
    instance: DagsterInstance = context.instance
    runs: Iterable[DagsterRun] = filter(
        lambda run: run.status == DagsterRunStatus.STARTED and
                    run.job_name != 'notify_long_runtime_jobs',
        instance.get_runs())

    def evaluate_run_time(r):
        print(f"{r.job_name} | {r.run_id} | {r.status} | {type(r)}")  # nothing below this line gets printed
        print(f"{instance.get_run_stats(r.run_id)}") 
        run_stats = instance.get_run_stats(r.run_id) 
        elapsed_time = time.time() - run_stats.start_time

        if elapsed_time > 20:  # TODO Configurable Limit in seconds
            print(f"Long running job {r.run_id}! Run for {elapsed_time} seconds ")  # TODO run slack notification

    # Plain loop instead of a list comprehension used only for side effects
    for r in runs:
        evaluate_run_time(r)


@job()
def notify_long_runtime_jobs():
    op_notify_long_runtime_jobs()
c
Hi Katrin. Not sure what the issue is here. I'm successfully able to run this code on 1.0.7. Can you double-check that the run ID provided to get_run_stats is valid?
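One way to narrow this down might be to pull the elapsed-time check out of the op so it can be tested without a Dagster instance. The sketch below is only an illustration, not the thread's confirmed fix: RunInfo and find_long_running are hypothetical names, and it assumes the start time would be filled in from instance.get_run_stats(run_id).start_time inside the op. It also skips runs that have no recorded start time, which would otherwise make the subtraction fail.

```python
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class RunInfo:
    """Hypothetical stand-in for the fields read from a run and its stats."""
    run_id: str
    job_name: str
    start_time: Optional[float]  # epoch seconds; None if no start was recorded


def find_long_running(
    runs: Iterable[RunInfo],
    limit_seconds: float,
    now: Optional[float] = None,
    exclude_jobs: Tuple[str, ...] = ("notify_long_runtime_jobs",),
) -> List[Tuple[str, float]]:
    """Return (run_id, elapsed_seconds) for runs exceeding limit_seconds."""
    now = time.time() if now is None else now
    long_running = []
    for run in runs:
        # Skip the monitoring job itself so it never reports on its own run.
        if run.job_name in exclude_jobs:
            continue
        # Without a recorded start time, elapsed time cannot be computed.
        if run.start_time is None:
            continue
        elapsed = now - run.start_time
        if elapsed > limit_seconds:
            long_running.append((run.run_id, elapsed))
    return long_running
```

Inside the op, each started run could be converted to a RunInfo and the result logged with context.log.info, which shows up in the run's event log rather than only on stdout.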