Natalie Rozak

05/09/2023, 2:32 PM
Hi everyone! I have a job that returns a value, and I want to create a sensor that sees this return value and responds based on the return value. I've been having trouble figuring out how to do this with Dagster, so I'd really appreciate advice on this. Thank you 🙂
def get_predictions():
    return accuracy

def predictions_pipeline():
    accuracy = get_predictions()
    return accuracy

predictions_job = predictions_pipeline.to_job()

def retrain_model():
   # get accuracy from predictions_job
   # if accuracy < 0.7, retrain the model


05/09/2023, 4:31 PM
Hi! One way would be to add metadata to the materializations and have a sensor that detects new materializations where it is possible to retrieve the metadata and compare it to 0.7 and possibly trigger a new execution. I am not an expert and maybe there are cleaner solutions

Le Yang

05/09/2023, 11:08 PM
Check out the AssetSensor topic that seems to fit this scenario.


05/10/2023, 1:32 PM
Hi Natalie, Two possibilities for you: • You can use a Run Status Sensor to take action whenever a new run of your
completes. The record for your job isn’t going to store your
value by default, so you need to make sure you’re using an IO manager that writes it somewhere you can access, then you can read the value in the sensor. • Along the lines of what Jordan is saying, you can restructure your pipeline using software defined assets.
can become an asset, and you can use an Asset sensor to respond to new materializations of the asset. As with the first solution, the
value associated with the asset isn’t directly available in the sensor, but you can either: ◦ record the value as metadata in the generated materialization ◦ store e.g. a path in the metadata that will let you read the value