Arun Kumar
07/13/2022, 1:24 AM@sensor_tracker
which can be added to all the sensors on top of the @sensor
decorator to do metrics tracking. However, I might have to look into the sensor return result after calling the function to check if a sensor tick has yielded a run request or not. Looks like sensor is currently returning a generator which should be iterated in order to check for run request. Is there anyway I can do this safely without affecting the Dagster flow?yuhan
07/13/2022, 8:22 PMsensor_tracker
as a generator.Arun Kumar
07/14/2022, 12:35 AMdef sensor_tracker(sensor_type: SensorType, sensor_name: str):
def call_sensor_and_track(func):
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = list(func(*args, **kwargs))
gen, gen_return = tee(result)
print("track success metric")
return gen_return
except Exception as e:
print("track error metric")
return wrapper
return call_sensor_and_track
@sensor_tracker(sensor_type=SensorType.EVENT_SOURCE, sensor_name="name")
@sensor(
name="name",
job=job,
minimum_interval_seconds=180,
)
def check_sensor():
yield SkipReason("Skipped")
Error:
File "/usr/local/lib/python3.7/site-packages/dagster/grpc/server.py", line 474, in StreamingExternalRepository
serialized_external_repository_data = self._get_serialized_external_repository_data(request)
File "/usr/local/lib/python3.7/site-packages/dagster/grpc/server.py", line 464, in _get_serialized_external_repository_data
external_repository_data_from_def(recon_repo.get_definition())
File "/usr/local/lib/python3.7/site-packages/dagster/core/host_representation/external_data.py", line 764, in external_repository_data_from_def
list(map(external_sensor_data_from_def, repository_def.sensor_defs)),
File "/usr/local/lib/python3.7/site-packages/dagster/core/host_representation/external_data.py", line 976, in external_sensor_data_from_def
first_target = sensor_def.targets[0] if sensor_def.targets else None
tributeError: 'function' object has no attribute 'targets'