Katrin Grunert
09/27/2022, 9:23 AM
RunStatusSensor, as I seem to be able to inspect only one job state at a time, but I need to monitor the runtimes of multiple jobs.
The issue I run into with the code example below is that I am not able to retrieve the run_stats for a job. No error is logged, and no code seems to execute after the attempt to retrieve run_stats via the DagsterInstance found in the op context.
What could the issue be here? By the way, I am running Dagster version 1.0.7.
import time
from typing import Iterable

from dagster import DagsterInstance, DagsterRun, DagsterRunStatus, job, op


@op()
def op_notify_long_runtime_jobs(context):
    instance: DagsterInstance = context.instance
    # All started runs, excluding this monitoring job's own runs.
    runs: Iterable[DagsterRun] = filter(
        lambda run: run.status == DagsterRunStatus.STARTED
        and run.job_name != 'notify_long_runtime_jobs',
        instance.get_runs())

    def evaluate_run_time(r):
        print(f"{r.job_name} | {r.run_id} | {r.status} | {type(r)}")  # anything below is not being printed
        print(f"{instance.get_run_stats(r.run_id)}")
        run_stats = instance.get_run_stats(r.run_id)
        elapsed_time = time.time() - run_stats.start_time
        if elapsed_time > 20:  # TODO Configurable limit in seconds
            print(f"Long running job {r.run_id}! Running for {elapsed_time} seconds")  # TODO run slack notification

    for r in runs:
        evaluate_run_time(r)


@job()
def notify_long_runtime_jobs():
    op_notify_long_runtime_jobs()
claire
09/27/2022, 4:30 PM
get_run_stats is valid?
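One way to narrow this down might be to pull the elapsed-time check out of the op into a plain function that can be tested without a Dagster instance. The sketch below is only an illustration: RunTiming and find_long_running are hypothetical names that are not part of Dagster, and it assumes the start time read from the run stats can be missing, in which case the run is skipped instead of failing on the subtraction.

```python
import time
from typing import Iterable, List, NamedTuple, Optional


class RunTiming(NamedTuple):
    """Hypothetical stand-in for the fields read from a run and its stats."""
    run_id: str
    job_name: str
    start_time: Optional[float]  # assumed to be None when no start was recorded


def find_long_running(
    timings: Iterable[RunTiming],
    limit_seconds: float,
    now: Optional[float] = None,
) -> List[RunTiming]:
    """Return the runs whose elapsed time exceeds limit_seconds."""
    now = time.time() if now is None else now
    long_running = []
    for t in timings:
        if t.start_time is None:
            # Nothing to subtract from; skip rather than raise a TypeError.
            continue
        if now - t.start_time > limit_seconds:
            long_running.append(t)
    return long_running
```

Inside the op, the list of RunTiming values would be built from each run and its stats, and the result passed on to the notification step. Passing now explicitly keeps the check deterministic when it is exercised outside of Dagster.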