Bennett Norman
06/29/2022, 8:10 PMdwall
06/29/2022, 8:13 PMdef create_ping_cronitor_monitor_sensor(
name: str, monitor_key: str, api_key: str, env: str, state: CronitorState, status: PipelineRunStatus, **kwargs
) -> RunStatusSensorDefinition:
"""Factory for creating Dagster run status sensors that ping Cronitor Monitors with state changes."""
@run_status_sensor(
pipeline_run_status=status,
name=name,
description=f"A Dagster run status sensor for pinging a Cronitor Monitor with a '{state}' state change.",
**kwargs,
)
def _ping_cronitor_monitor_sensor(context: RunStatusSensorContext):
monitor = Monitor(
key=monitor_key,
api_key=api_key,
env=env,
)
monitor.ping(state=state, env=env, series=context.dagster_run.run_id)
return _ping_cronitor_monitor_sensor
dwall
06/29/2022, 8:14 PMCRONITOR_SENSORS: list[tuple[str, JobDefinition]] = [
(DBT_HOURLY_CRONITOR_MONITOR_KEY, dbt_hourly_job),
(DBT_DAILY_CRONITOR_MONITOR_KEY, dbt_daily_job),
(DBT_DAILY_SNAPSHOT_CRONITOR_MONITOR_KEY, dbt_daily_snapshot_job),
(DBT_ORDERS_REFRESH_CRONITOR_MONITOR_KEY, dbt_orders_refresh_job),
(DBT_FRESHNESS_CRONITOR_MONITOR_KEY, dbt_source_freshness_job),
]
dbt_sensors: list[RunStatusSensorDefinition] = []
for key, job in CRONITOR_SENSORS:
sensor = create_ping_cronitor_monitor_sensor(
name=f"{job.name}_fail",
monitor_key=key,
api_key=os.getenv("CRONITOR_PING_API_KEY"),
env=os.getenv("CRONITOR_ENVIRONMENT"),
state="fail",
status=PipelineRunStatus.FAILURE,
job_selection=[job],
)
dbt_sensors.append(sensor)
rex
06/29/2022, 8:57 PM