Hello! I’m having a hard time wrapping my head aro...
# ask-community
b
Hello! I’m having a hard time wrapping my head around op factories. Can someone point me towards a fully implemented example of an op factory in practice please?
dagster bot responded by community 1
d
don’t have an Op factory example specifically but here is a sensor factory that we use which is basically the same idea:
Copy code
def create_ping_cronitor_monitor_sensor(
    name: str, monitor_key: str, api_key: str, env: str, state: CronitorState, status: PipelineRunStatus, **kwargs
) -> RunStatusSensorDefinition:
    """Factory for creating Dagster run status sensors that ping Cronitor Monitors with state changes."""

    @run_status_sensor(
        pipeline_run_status=status,
        name=name,
        description=f"A Dagster run status sensor for pinging a Cronitor Monitor with a '{state}' state change.",
        **kwargs,
    )
    def _ping_cronitor_monitor_sensor(context: RunStatusSensorContext):
        monitor = Monitor(
            key=monitor_key,
            api_key=api_key,
            env=env,
        )
        monitor.ping(state=state, env=env, series=context.dagster_run.run_id)

    return _ping_cronitor_monitor_sensor
dagster bot responded by community 1
and then using it practice looks like:
Copy code
CRONITOR_SENSORS: list[tuple[str, JobDefinition]] = [
    (DBT_HOURLY_CRONITOR_MONITOR_KEY, dbt_hourly_job),
    (DBT_DAILY_CRONITOR_MONITOR_KEY, dbt_daily_job),
    (DBT_DAILY_SNAPSHOT_CRONITOR_MONITOR_KEY, dbt_daily_snapshot_job),
    (DBT_ORDERS_REFRESH_CRONITOR_MONITOR_KEY, dbt_orders_refresh_job),
    (DBT_FRESHNESS_CRONITOR_MONITOR_KEY, dbt_source_freshness_job),
]

dbt_sensors: list[RunStatusSensorDefinition] = []
for key, job in CRONITOR_SENSORS:
    sensor = create_ping_cronitor_monitor_sensor(
        name=f"{job.name}_fail",
        monitor_key=key,
        api_key=os.getenv("CRONITOR_PING_API_KEY"),
        env=os.getenv("CRONITOR_ENVIRONMENT"),
        state="fail",
        status=PipelineRunStatus.FAILURE,
        job_selection=[job],
    )
    dbt_sensors.append(sensor)
r
There’s also a section at the bottom with examples in projects: https://docs.dagster.io/concepts/ops-jobs-graphs/ops#see-it-in-action