Andrei Duralei
11/30/2022, 2:30 PM
Sensor daemon caught an error for sensor my_sensor : dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server.
I’m not sure why this happens, but I think the problem is that my sensor makes a lot of requests to another service via a REST API. What is the best practice for doing that inside a sensor? (Right now I’m yielding data from a Python generator and putting it inside a RunRequest in the sensor.) Maybe there are other ways to do it?
2022-11-30 15:21:42 +0100 - dagster.daemon.SensorDaemon - ERROR - Sensor daemon caught an error for sensor my_sensor : dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server
Stack Trace:
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 483, in _process_tick_generator
sensor_debug_crash_flags,
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 549, in _evaluate_sensor
instigator_data.cursor if instigator_data else None,
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_core/host_representation/repository_location.py", line 826, in get_external_sensor_execution_data
cursor,
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_api/snapshot_sensor.py", line 66, in sync_get_external_sensor_execution_data_grpc
cursor=cursor,
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_grpc/client.py", line 375, in external_sensor_execution
sensor_execution_args
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_grpc/client.py", line 166, in _streaming_query
raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
The above exception was caused by the following exception:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "{"created":"@1669818102.393917000","description":"Error received from peer unix:/var/folders/z6/58cn7h592ys9j6rfzk8hnmt00000gn/T/tmpxxa20eld","file":"src/core/lib/surface/call.cc","file_line":967,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>
Stack Trace:
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_grpc/client.py", line 163, in _streaming_query
method, request=request_type(**kwargs), timeout=timeout
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_grpc/client.py", line 152, in _get_streaming_response
yield from getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/grpc/_channel.py", line 426, in __next__
return self._next()
File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/grpc/_channel.py", line 826, in _next
raise self
chris
11/30/2022, 3:18 PM
"Right now I’m yielding data from a Python generator and putting it inside a RunRequest in the sensor."
Do you mind sending a code snippet of this?
Andrei Duralei
11/30/2022, 3:32 PM
def get_all_from_outside():
    list_of_data = ['ids', 'ids']  # request to external API with python requests to get list of data (for ex. 400 elements)
    all_data = []
    for data in list_of_data:
        request_result = requests.get(data["url"])  # another request
        all_data.append(request_result)
    return all_data

def get_changes():
    data = get_all_from_outside()
    our_data = database.get_all_procesed_ids()
    to_process = []
    for element in data:
        if element["id"] not in our_data:
            to_process.append(element)
    return to_process

def sensor():
    to_process = get_changes()
    if len(to_process) > 0:
        for el in to_process:
            yield RunRequest(run_key="some_id", run_config=some_run_config)
    else:
        yield SkipReason("No updates")
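One way to shorten each evaluation, sketched under assumptions: the per-item requests in get_all_from_outside run one after another, so around 400 sequential calls can easily take longer than the sensor is allowed. The sketch below runs them concurrently with a thread pool from the standard library. The names fetch_all, fetch_one and max_workers are hypothetical and are not part of Dagster or of the snippet above; in real use fetch_one might wrap something like requests.get(url, timeout=...).

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def fetch_all(
    urls: Iterable[str],
    fetch_one: Callable[[str], T],
    max_workers: int = 16,
) -> List[T]:
    """Call fetch_one on every URL concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order and re-raises a worker's
        # exception when that worker's result is read.
        return list(pool.map(fetch_one, urls))


if __name__ == "__main__":
    # Stand-in for a real HTTP call (hypothetical), so the sketch runs offline.
    print(fetch_all(["a", "b", "c"], str.upper, max_workers=2))
```

Separately, note that the sensor above passes the same run_key for every element. Dagster launches at most one run per run_key for a given sensor, so a per-element key (for example the element's id) is probably what is intended.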
dagster - INFO - Shutting down Dagster code server for package infrastructure.dagster in process 87114
After this, a Dagster code server starts in another process. Maybe Dagster is trying to ping the old process, which was shut down while the sensor was still being evaluated.
chris
11/30/2022, 5:15 PM
Andrei Duralei
11/30/2022, 5:27 PM
2022-11-30 18:30:51 +0100 - dagster - INFO - Started Dagster code server for package infrastructure.dagster in process 88279
2022-11-30 18:31:58 +0100 - dagster - INFO - Started Dagster code server for package infrastructure.dagster in process 88288
If sensor evaluation takes more than a minute, it causes an error even if I increase DAGSTER_GRPC_TIMEOUT_SECONDS. The same error is raised from gRPC, but with a different status:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
I think this is a bug, or at least undocumented behaviour. What do you think?
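A possible workaround for the limit described above, as a minimal sketch rather than a confirmed fix: do only a bounded amount of work per sensor evaluation and remember where to resume. The helper below is plain Python; process_within_budget, handle, budget_seconds and clock are hypothetical names, and it assumes the ids sort in a stable order so that the last handled id can serve as a cursor.

```python
import time
from typing import Callable, List, Optional, Sequence, Tuple


def process_within_budget(
    ids: Sequence[str],
    cursor: Optional[str],
    handle: Callable[[str], None],
    budget_seconds: float = 45.0,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[List[str], Optional[str]]:
    """Handle ids that sort after `cursor` until the time budget is spent.

    Returns the ids handled in this call and the cursor to store for the next call.
    """
    deadline = clock() + budget_seconds
    # Assumption: ids are comparable strings and new ids sort after old ones.
    pending = sorted(i for i in ids if cursor is None or i > cursor)
    done: List[str] = []
    for item_id in pending:
        if clock() >= deadline:
            break  # leave the remaining ids for the next evaluation
        handle(item_id)
        done.append(item_id)
    new_cursor = done[-1] if done else cursor
    return done, new_cursor
```

Inside a Dagster sensor, the cursor would typically be read from context.cursor and saved with context.update_cursor, and each handled id could become a RunRequest with that id as its run_key. The 45-second default is only an illustrative margin below the one-minute mark mentioned in the thread.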