
Andrei Duralei

11/30/2022, 2:30 PM
Hi! Thanks for the project, I really appreciate it! I have a problem. The error says something like this:
Sensor daemon caught an error for sensor my_sensor : dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server.
I’m not sure why it happens, but I think the problem is that my sensor makes a lot of requests to another service via a REST API. What is the best practice for doing that inside a sensor? (Right now I’m yielding data from a Python generator and putting it inside a RunRequest in the sensor. Maybe there are other ways to do it?)
I’m testing it locally on my macOS laptop, with dagster version 1.1.3.
StackTrace:
2022-11-30 15:21:42 +0100 - dagster.daemon.SensorDaemon - ERROR - Sensor daemon caught an error for sensor my_sensor : dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server

Stack Trace:
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 483, in _process_tick_generator
    sensor_debug_crash_flags,
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 549, in _evaluate_sensor
    instigator_data.cursor if instigator_data else None,
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_core/host_representation/repository_location.py", line 826, in get_external_sensor_execution_data
    cursor,
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_api/snapshot_sensor.py", line 66, in sync_get_external_sensor_execution_data_grpc
    cursor=cursor,
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_grpc/client.py", line 375, in external_sensor_execution
    sensor_execution_args
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_grpc/client.py", line 166, in _streaming_query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e

The above exception was caused by the following exception:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "{"created":"@1669818102.393917000","description":"Error received from peer unix:/var/folders/z6/58cn7h592ys9j6rfzk8hnmt00000gn/T/tmpxxa20eld","file":"src/core/lib/surface/call.cc","file_line":967,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>

Stack Trace:
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_grpc/client.py", line 163, in _streaming_query
    method, request=request_type(**kwargs), timeout=timeout
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/dagster/_grpc/client.py", line 152, in _get_streaming_response
    yield from getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/Users/user/anaconda3/envs/env/lib/python3.7/site-packages/grpc/_channel.py", line 826, in _next
    raise self

chris

11/30/2022, 3:18 PM
right now I’m yielding data from a Python generator and putting it inside a RunRequest in the sensor.
Do you mind sending a code snippet of this?

Andrei Duralei

11/30/2022, 3:32 PM
I thought that yielding data instead of processing everything synchronously would help Dagster communicate with the gRPC server. But I returned to my previous code, where I process everything synchronously. Unfortunately, I can’t share the exact code, but I can pseudocode it if that helps.
# pseudocode - names like database, my_job, some_run_config stand in for real project code
import requests
from dagster import RunRequest, SkipReason, sensor


def get_all_from_outside():
    list_of_data = ...  # request to external API with python requests to get list of data (for ex. 400 {"id", "url"} elements)
    all_data = []
    for data in list_of_data:
        request_result = requests.get(data["url"]).json()  # one more request per element
        all_data.append(request_result)
    return all_data


def get_changes():
    # keep only the elements we have not processed yet
    data = get_all_from_outside()
    our_ids = database.get_all_processed_ids()
    to_process = []
    for element in data:
        if element["id"] not in our_ids:
            to_process.append(element)
    return to_process


@sensor(job=my_job)
def my_sensor():
    to_process = get_changes()
    if len(to_process) > 0:
        for el in to_process:
            # one run_key per element, so repeated requests get deduplicated
            yield RunRequest(run_key=el["id"], run_config=some_run_config)
    else:
        yield SkipReason("No updates")
What I have found is that the default gRPC timeout is 60 seconds, and this error happens exactly 60 seconds after the sensor evaluation starts. Since my sensor takes about 3-4 minutes to evaluate, I think that’s what causes the problem. But I’m not sure how to fix it in the right way.
I also noticed this log line:
dagster - INFO - Shutting down Dagster code server for package infrastructure.dagster in process 87114
And after this, Dagster starts a code server in another process. Maybe Dagster is trying to ping the old process that was shut down while the sensor is still evaluating.

chris

11/30/2022, 5:15 PM
yup, it definitely looks like it's happening because of timeouts. You could maybe use a cursor to decrease the runtime per sensor evaluation - basically splitting the evaluation of changes across multiple sensor evaluations
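A minimal sketch of that cursor idea (hedged: the paging helper below is hypothetical, not from the original code; in Dagster you would read `context.cursor` and call `context.update_cursor` around it). A sensor cursor is just a string, so one option is to persist the already-handled ids as JSON in the cursor and process a fixed-size page per evaluation, which keeps each tick well under the gRPC timeout:

```python
import json

PAGE_SIZE = 100  # how many items to handle per sensor tick

def evaluate_page(all_items, cursor):
    """Return (items to process this tick, new cursor string).

    `cursor` is None on the first tick. Works even when ids are not
    sequential, since the cursor stores the explicit set of seen ids.
    """
    seen = set(json.loads(cursor)) if cursor else set()
    new_items = [item for item in all_items if item["id"] not in seen]
    page = new_items[:PAGE_SIZE]
    seen.update(item["id"] for item in page)
    return page, json.dumps(sorted(seen))
```

Inside the sensor body this would look roughly like: `page, new_cursor = evaluate_page(get_all_from_outside(), context.cursor)`, yield one `RunRequest` per item in `page`, then `context.update_cursor(new_cursor)`. The trade-off is an unbounded cursor size if the id set keeps growing.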

Andrei Duralei

11/30/2022, 5:27 PM
The data I’m receiving doesn’t let me store something simple in the cursor (the ids are complicated, not sequential). But I will try to do something with it, or will try to switch to another API endpoint that aggregates everything in one request. But in any case, is this working as expected? Should it behave like this whenever a sensor takes a long time to evaluate?
I’m not sure, but it seems to me that Dagster restarts the code server every ~minute:
2022-11-30 18:30:51 +0100 - dagster - INFO - Started Dagster code server for package infrastructure.dagster in process 88279
2022-11-30 18:31:58 +0100 - dagster - INFO - Started Dagster code server for package infrastructure.dagster in process 88288
If sensor evaluation takes more than a minute, it causes an error even if I extend DAGSTER_GRPC_TIMEOUT_SECONDS. The same error comes from gRPC, but with a different status:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
I think this is buggy behaviour, or at least unspecified behaviour. What do you think?
Finally I found where it is specified. Thanks for the help anyway :)
:saluting_face: 1
I will leave the link here for anyone who faces the same issue: https://docs.dagster.io/deployment/dagster-instance#code-servers
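For reference, that docs section describes timeouts for locally launched code servers configured in `dagster.yaml`. A sketch of the relevant instance setting (assuming the `code_servers.local_startup_timeout` key from that page; the value is in seconds, default 60):

```yaml
# dagster.yaml
code_servers:
  local_startup_timeout: 300  # allow slow-starting / long-evaluating code servers
```

Combined with raising DAGSTER_GRPC_TIMEOUT_SECONDS for the daemon, this should stop the ~60-second restarts, though keeping sensor evaluations short (e.g. via a cursor) is still the more robust fix.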