Bernardo Cortez
05/18/2023, 10:53 AM
from dagster import build_sensor_context, instance_for_test, validate_run_config
import os
# NOTE(review): the three values below are literal `...` placeholders in this snippet;
# presumably the real values were elided from the post and must be filled in before running.
# Passed as job_path to core_execute_job, which hands it to `dagster job execute -f`.
PRIOR_JOB_PATH = ...
# Job that each yielded RunRequest's run_config is validated against.
POSTERIOR_JOB = ...
# Sensor under test; called directly with the sensor context.
SENSOR_IN_TEST = ...
def core_execute_job(job_path):
    """Execute a job using the Dagster CLI.

    To be revised, since the execute_job function from dagster currently isn't working.
    This creates a job config, saves it into local storage in
    config.generated_job_config_path and runs the job with that config.

    Args:
        job_path: Path of the Python file defining the job; passed to ``-f``.

    Raises:
        subprocess.CalledProcessError: If the ``dagster`` command exits with a
            non-zero status, so a failed run is not silently ignored.
    """
    import subprocess  # local import: only this helper needs it

    run_config = ...
    run_config.save_run_config_in_memory()

    # We need to use a different DAGSTER_HOME, so that the tests aren't logged in Dagit.
    # NOTE(review): the run therefore records its events in whatever instance lives at
    # this DAGSTER_HOME -- presumably not the same instance as one created elsewhere
    # (e.g. by instance_for_test()); confirm when reading the events back.
    env = {**os.environ, "DAGSTER_HOME": "/opt/dagster/dagster_home_test"}

    # Argument list instead of a shell string: paths with spaces or shell
    # metacharacters are passed through verbatim.
    command = [
        "dagster",
        "job",
        "execute",
        "-f",
        str(job_path),
        "--config",
        str(run_config.generated_job_config_path),
    ]
    subprocess.run(command, env=env, check=True)
def get_and_assert_run_requests(context):
    """Evaluate SENSOR_IN_TEST and validate every run request it yields.

    Each item produced by ``SENSOR_IN_TEST(context)`` has its run key printed and
    its ``run_config`` checked against POSTERIOR_JOB with ``validate_run_config``.

    Note that if the sensor yields nothing, the loop body never runs and no
    assertion is made.

    Args:
        context: Sensor context handed to SENSOR_IN_TEST.
    """
    yielded = SENSOR_IN_TEST(context)
    for run_request in yielded:
        print(f'Yielding run request {run_request.run_key}')
        validated_config = validate_run_config(POSTERIOR_JOB, run_request.run_config)
        assert validated_config
def test_multiple_detections(context):
    """Run the prior job several times, then evaluate the sensor once.

    Args:
        context: Sensor context forwarded to get_and_assert_run_requests.
    """
    print('\nTesting the detection of multiple RunRequests')

    runs_wanted = 2  # number of RunRequests to detect
    runs_done = 0
    while runs_done < runs_wanted:
        # Run prior job
        core_execute_job(job_path=PRIOR_JOB_PATH)
        runs_done += 1

    # Single sensor evaluation after all prior-job runs have been launched.
    get_and_assert_run_requests(context)
# Evaluate the sensor against a throwaway Dagster instance.
# NOTE(review): the context below is bound to the instance created by
# instance_for_test(), while core_execute_job launches the prior job through the CLI
# with DAGSTER_HOME=/opt/dagster/dagster_home_test. Presumably those are two different
# instances, so events recorded by the CLI runs would not be visible to the sensor
# here -- confirm, and point both at the same instance if so.
with instance_for_test() as instance:
    context = build_sensor_context(instance=instance, cursor='0')
    # test_multiple_detections has no return statement, so unpacking its result into
    # two names raised "TypeError: cannot unpack non-iterable NoneType object".
    test_multiple_detections(context)
When running this, SENSOR_IN_TEST(context)
returns no RunRequests, even though core_execute_job
has run twice, as if its AssetObservations are being stored somewhere else. Can you help me understand what is going on?
We are using dagster 1.3.1.
jamie
05/18/2023, 2:33 PM
execute_in_process
function so that you can use a debugger to step through the job execution and make sure things are happening as you expect.