# ask-community
b
Hi! We have a sequence of jobs that must run when a given schedule runs. To trigger them sequentially, the last thing each job does is yield an AssetObservation, which is caught by an asset sensor that triggers the next job. One of the tests we perform on this sensor is to run the "prior" job twice and then check that the sensor yields two RunRequests. However, this has stopped working, and we suspect it might be due to some Dagster function being deprecated in recent releases.
from dagster import build_sensor_context, instance_for_test, validate_run_config
import os

PRIOR_JOB_PATH =  ...
POSTERIOR_JOB =  ...
SENSOR_IN_TEST = ...


def core_execute_job(job_path):
    """Executes a job using the Dagster CLI.

    To be revised, since the execute_job function from dagster currently isn't working.
    This creates a job config, saves it into local storage in config.generated_job_config_path and runs the job with that config.
    """
    run_config = ...
    run_config.save_run_config_in_memory()
    # We need to use a different DAGSTER_HOME, so that the tests aren't logged in Dagit
    os.system(f"DAGSTER_HOME=/opt/dagster/dagster_home_test dagster job execute -f {job_path} --config {run_config.generated_job_config_path}")


def get_and_assert_run_requests(context):
    for run_request in SENSOR_IN_TEST(context):
        print(f'Yielding run request {run_request.run_key}')
        assert validate_run_config(POSTERIOR_JOB, run_request.run_config)


def test_multiple_detections(context):
    print('\nTesting the detection of multiple RunRequests')
    n = 2   # number of RunRequests to detect
    for _ in range(n):
        # Run prior job
        core_execute_job(job_path=PRIOR_JOB_PATH)
    # Evaluate the sensor once both prior runs have finished
    get_and_assert_run_requests(context)


with instance_for_test() as instance:
    context = build_sensor_context(instance=instance, cursor='0')
    # test_multiple_detections returns nothing, so there is no result to unpack
    test_multiple_detections(context)
When running this, `SENSOR_IN_TEST(context)` returns no RunRequests, even though `core_execute_job` has run twice, as if its AssetObservations were being stored somewhere else. Can you help me understand what is going on? We are using dagster 1.3.1
j
hey @Bernardo Cortez it’s possible something has been deprecated (we recently removed some old APIs that have been marked for deprecation for around a year). i don’t know for certain if that is the case here since i don’t know what the internals of your job and sensor look like. However, the removals shouldn’t have changed any non-deprecated code paths and you’d likely only see issues if you were directly using the deprecated APIs (in which case you would have gotten an import error). As far as i know, nothing related to AssetObservations should have changed. If you can, i’d recommend executing your job using the `execute_in_process` function so that you can use a debugger to step through the job execution and make sure things are happening as you expect.