https://dagster.io/ logo
#ask-community
Title
# ask-community
n

Nicolas Parot Alvarez

01/11/2023, 5:10 PM
I'm trying to have a multi_asset_sensor that will start a job if any asset was materialized and if the job is not already queued/running. That's because the job is long and doing some full-refresh, so there's little point in adding a new run in the queue it if the last run is not done yet. Is it possible to get the status of the job during the sensor evaluation? I'm trying to hack something based on
context.instance.get_event_records()
as seen in the
run_status_sensor
, but it's getting overcomplicated.
s

sandy

01/11/2023, 7:25 PM
Hi @Nicolas Parot Alvarez - you could use
instance.get_runs
and filter based on
job_name
and
statuses
n

Nicolas Parot Alvarez

01/12/2023, 1:49 PM
Hey @sandy, awesome! That's exactly what I needed! Here's an example for the next one. I'm requesting a job2 run on assets updated only if there's no job updating the assets and if job2 is not already running:
Copy code
@multi_asset_sensor(
    asset_selection=AssetSelection.assets(*source_tables),
    job=job2,
    minimum_interval_seconds=10 * 60,
)
def updated_source_tables_sensor(
    context: MultiAssetSensorEvaluationContext,
):
    asset_events = context.latest_materialization_records_by_key()
    existing_runs_statuses = [
        DagsterRunStatus.QUEUED,
        DagsterRunStatus.NOT_STARTED,
        DagsterRunStatus.STARTING,
        DagsterRunStatus.STARTED,
    ]
    existing_update_source_tables_job_runs = context.instance.get_runs(
        RunsFilter(
            job_name=update_source_tables_job.name,
            statuses=existing_runs_statuses,
        ),
    )
    existing_job2_runs = context.instance.get_runs(
        RunsFilter(
            job_name=job2.name,
            statuses=existing_runs_statuses,
        ),
    )
    # Check asset update
    if any(asset_events.values()):
        # Check if update_source_tables_job finished running and job2 is not already running
        if not existing_update_source_tables_job_runs and not existing_job2_runs:
            context.advance_all_cursors()
            return RunRequest()
        else:
            materialized_asset_keys = [
                key.to_user_string() for key, value in asset_events.items() if value
            ]
            running_job_names = (
                f"{update_source_tables_job.name} and {job2.name}"
                if existing_update_source_tables_job_runs and existing_job2_runs
                else (
                    update_source_tables_job.name
                    if existing_update_source_tables_job_runs
                    else job2.name
                )
            )
            return SkipReason(
                f"Observed materializations for {materialized_asset_keys}, "
                f"but runs have started for {running_job_names}."
            )
    else:
        return SkipReason(
            f"No observed materialization for {[asset_key.to_user_string() for asset_key in asset_events]}"
        )
3 Views