Hello, I changed my Sensor a bit (hoping it would...
# ask-community
s
Hello, I changed my Sensor a bit (hoping it would work):
@sensor(description="Used to update the partitions, which represent the s assets",
        minimum_interval_seconds=15*60, default_status=DefaultSensorStatus.RUNNING)
def s_assets_sensor(context: OpExecutionContext, asset_service: Asset_Service_Resource):
    
    asset_client = asset_service.get_client()
    
    try:
        response = asset_client.GET_assets(page=1, per_page=1)

        if response.status_code == 200:
            context.log.info("Asset Service is available with status code 200 OK")
           
            list_of_assets = get_assets_as_list_of_uuids(asset_service.get_client())
            existing_partitions = set(context.instance.get_dynamic_partitions(partitions_def_name="s_assets_dynamic"))
            incoming_partitions = set(list_of_assets)
            removed_partitions = existing_partitions.difference(incoming_partitions)
            added_partitions = incoming_partitions.difference(existing_partitions)
            add_request = _partitions.s_assets_partition_dynamic.build_add_request(partition_keys=list(added_partitions))
            delete_request = _partitions.s_assets_partition_dynamic.build_delete_request(partition_keys=list(removed_partitions)) 

            yield SensorResult(dynamic_partitions_requests=[add_request, delete_request])
    except Exception as e:
        context.log.error(f"An error occurred while connecting to the asset service: {str(e)}")
        yield SkipReason("Asset Service unavailable")
But I still get this message from the Dagster UI:
Result: Skipped
Used cursor value: None
Computed cursor value: None
Skip reason: Sensor function returned an empty result
Am I using "Test Sensor" wrong, or am I still missing something that I can't see?
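One thing worth checking in the sensor above: the `if response.status_code == 200:` branch has no `else`, so a response with any other status code raises no exception and the function yields nothing at all. Whether that is what happened here is an assumption, since the thread does not show the actual status code. A minimal plain-Python sketch of that control flow, with strings as hypothetical stand-ins for the Dagster objects:

```python
from typing import Iterator


def evaluate(status_code: int) -> Iterator[str]:
    """Mirror the branch structure of the sensor above (stand-in values only)."""
    try:
        if status_code == 200:
            yield "SensorResult"  # stand-in for the real SensorResult
        # No else branch: any other status code falls through silently.
    except Exception:
        yield "SkipReason"  # only reached if an exception is raised


print(list(evaluate(200)))  # ['SensorResult']
print(list(evaluate(503)))  # []
```

The second call shows a generator that finishes without yielding, which is one way a sensor function can end up with an empty result.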
s
Hi Sina - from what you've written, it's a little difficult to know what behavior you're expecting and how what you're observing is deviating from it. Are you able to be more specific about what you're trying to accomplish and what Dagster APIs aren't working as expected?
s
Hi Sandy, thank you for your reply! I have a custom resource API. With my sensor, I wanted to check whether a connection to my asset_service can be established. From the logs I can see whether the connection was established, thanks to
context.log.info("Asset Service is available with status code 200 OK")
. I also wanted the sensor to skip the rest of its code if a connection cannot be established, and to continue the next time it is supposed to run if there is a connection. I tried that with
yield SkipReason("Asset Service unavailable")
and
yield SensorResult
, but I'm not sure how I can see that in the Dagster UI or test it properly, apart from the logs in my terminal. I tried doing something with
@run_failure_sensor
but I'm also struggling to implement that, or to see it in the Dagster UI. Is the
@run_failure_sensor
supposed to be listed as a sensor in the UI, or how exactly does this work? In the end I just want the sensor to keep running, to tell me exactly why it failed (e.g. no connection to the asset service), and to continue once the error is gone. So I'm confused about why the sensor is only skipping, or telling me that the function returned an empty result.
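The behavior described above (skip with a stated reason when the service is unreachable, otherwise update the partitions) could be sketched so that every branch yields an explicit outcome. This is plain Python, not Dagster code: `TickOutcome` is a hypothetical stand-in for `SkipReason` and `SensorResult`, and `evaluate_tick` and its parameters are illustrative names only.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional


@dataclass
class TickOutcome:
    """Hypothetical stand-in for SkipReason / SensorResult (not a Dagster class)."""
    skip_reason: Optional[str] = None
    added: List[str] = field(default_factory=list)
    removed: List[str] = field(default_factory=list)


def evaluate_tick(
    status_code: int, incoming: List[str], existing: List[str]
) -> Iterator[TickOutcome]:
    # Branch 1: service reachable but not OK -> say so explicitly.
    if status_code != 200:
        yield TickOutcome(skip_reason=f"Asset Service returned status {status_code}")
        return
    # Same set arithmetic as the sensor: what is new, what has disappeared.
    added = sorted(set(incoming) - set(existing))
    removed = sorted(set(existing) - set(incoming))
    # Branch 2: nothing changed -> still give a reason instead of yielding nothing.
    if not added and not removed:
        yield TickOutcome(skip_reason="No partition changes")
        return
    # Branch 3: there are partition changes to apply.
    yield TickOutcome(added=added, removed=removed)


outcome = next(evaluate_tick(503, [], ["a"]))
print(outcome.skip_reason)  # Asset Service returned status 503
```

In a real sensor, the two skip branches would correspond to `yield SkipReason(...)` and the last branch to the `SensorResult(dynamic_partitions_requests=[...])` already used in the sensor above.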
s
Is the
@run_failure_sensor
supposed to be listed as a sensor in the UI
Exactly. And here's how to see logs for it: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#logging-in-sensors