Sina
08/28/2023, 10:49 AM@sensor(description="Used to update the partitions, which represent the s assets",
minimum_interval_seconds=15*60, default_status=DefaultSensorStatus.RUNNING)
def s_assets_sensor(context: OpExecutionContext, asset_service: Asset_Service_Resource):
asset_client = asset_service.get_client()
try:
response = asset_client.GET_assets(page=1, per_page=1)
if response.status_code == 200:
<http://context.log.info|context.log.info>("Asset Service is available with status code 200 OK")
list_of_assets = get_assets_as_list_of_uuids(asset_service.get_client())
existing_partitions = set(context.instance.get_dynamic_partitions(partitions_def_name="s_assets_dynamic"))
incoming_partitions = set(list_of_assets)
removed_partitions = existing_partitions.difference(incoming_partitions)
added_partitions = incoming_partitions.difference(existing_partitions)
add_request = _partitions.s_assets_partition_dynamic.build_add_request(partition_keys=list(added_partitions))
delete_request = _partitions.s_assets_partition_dynamic.build_delete_request(partition_keys=list(removed_partitions))
yield SensorResult(dynamic_partitions_requests=[add_request, delete_request])
except Exception as e:
context.log.error(f"An error occurred while connecting to the asset service: {str(e)}")
yield SkipReason("Asset Service unavailable")
But I still get this message from the Dagster UI:
Result
Skipped
Used cursor value
None
Computed cursor value
None
Skip reason
Sensor function returned an empty result
Am I using the "Test Sensor" wrong, or am i still missing something, that I can't see?sandy
08/28/2023, 3:07 PMSina
08/29/2023, 2:31 PM<http://context.log.info|context.log.info>("Asset Service is available with status code 200 OK")
.
Also I wanted the Sensor to skip the execution of the rest of the code in the Sensor if a connection cannot be established, and continue the next time when it's supposed to run if there is a connection. (I tried that with yield SkipReason("Asset Service unavailable"
and yield SensorResult
. But I'm not sure how I can see that in the Dagster UI or test it properly, apart from the logs in my Terminal. I tried doing something with @run_failure_sensor
but i'm also struggling to implement that, or to see that in the Dagster UI. Is the @run_failure_sensor
supposed to be listed as a sensor in the UI, or how is this exactly working?
In the end I just want the sensor running, and to tell me why exactly it failed (e.g. no connection to asset service) and to continue running when the error isn't there anymore. So i'm confused, why the sensor is only Skipping, or telling me that the function returned an empty result?sandy
08/29/2023, 9:00 PMtheExactly. And here's how to see logs for it: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#logging-in-sensorssupposed to be listed as a sensor in the UI@run_failure_sensor