# ask-community

Thomas Tran

04/28/2023, 10:30 AM
Hey, I just started my Dagster journey. In my first project, I ran into some problems and am out of ideas. Any feedback is appreciated 😊

**Context** We are trying to read and copy a large file using PyArrow. Copying the whole file takes an unreasonable amount of time. Fortunately, we can split the dataset into batches and filter them on the fly, which makes the operation feasible.

**Problem** The filter depends on a previously materialized asset.

**Question** What is the best way to approach this?

**Our Approach** We thought it would be a good idea to use a `ConfigurableIOManager`. That way, we don't have to replicate the code for every asset and can keep the IO operations in a centralized place. The filter can be reduced to a simple tuple of strings that can be set in the configuration. We then use an `AssetSensor` to observe our first asset and use the values in the materialized asset to configure our IOManager via a `RunConfig`. This led to several problems that we tackled and seemingly solved (cyclic dependencies caused by loading an asset inside a sensor; `UPathIOManager` and `ConfigurableIOManager` not being compatible even though both init methods were needed; `RunConfig` only accepting simple Python types). We got a demo running locally, but when deploying, we ran into two issues that we don't understand.

**Issue 1** When the sensor tries to load the asset (via `definitions.load_asset_value(AssetKey(...))` or manually using the path), we receive a gRPC error (see below). It seems to randomly get either "Socket closed" or "Connection reset by peer". This is surprising to us, as loading the asset in other places works just fine.
```
dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server. gRPC Error code: UNAVAILABLE
  File "/usr/local/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 517, in _process_tick_generator
    sensor_debug_crash_flags,
  File "/usr/local/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 581, in _evaluate_sensor
    instigator_data.cursor if instigator_data else None,
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/host_representation/code_location.py", line 861, in get_external_sensor_execution_data
    cursor,
  File "/usr/local/lib/python3.7/site-packages/dagster/_api/snapshot_sensor.py", line 72, in sync_get_external_sensor_execution_data_grpc
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 394, in external_sensor_execution
    custom_timeout_message=custom_timeout_message,
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 185, in _streaming_query
    e, timeout=timeout, custom_timeout_message=custom_timeout_message
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 142, in _raise_grpc_exception
    ) from e
The above exception was caused by the following exception:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Socket closed"
	debug_error_string = "{"created":"@1682675853.096652746","description":"Error received from peer ipv4:10.0.119.68:3030","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Socket closed","grpc_status":14}"
>
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 181, in _streaming_query
    method, request=request_type(**kwargs), timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 169, in _get_streaming_response
    yield from getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in _next
    raise self
```
**Issue 2** This is likely directly connected to Issue 1. The whole Dagster process seems to shut down when evaluating the sensor and then restart. Sometimes this leads to a crash loop that Dagster cannot recover from on its own, and the process has to be killed manually. Afterwards, it runs fine again (until the sensor is evaluated next).
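The batched, filtered copy described under **Context** would use PyArrow's dataset API (e.g. `Dataset.to_batches(filter=...)`) in the real project. As a dependency-free illustration of the streaming pattern, here is a schematic version in plain Python; the row layout and the filter tuple are hypothetical stand-ins:

```python
from typing import Iterator, List, Tuple

Row = Tuple[str, int]  # (key, value) -- stand-in for a PyArrow record

def iter_batches(rows: List[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield the dataset in fixed-size batches instead of loading it whole."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

def copy_filtered(rows: List[Row], keys: Tuple[str, ...],
                  batch_size: int = 2) -> List[Row]:
    """Stream batches and keep only rows whose key is in the filter tuple.

    In the real setup, `keys` would come from a previously materialized
    asset, and the filtering would happen via PyArrow's dataset filters
    rather than a Python comprehension.
    """
    out: List[Row] = []
    for batch in iter_batches(rows, batch_size):
        out.extend(row for row in batch if row[0] in keys)
    return out

data = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
print(copy_filtered(data, keys=("a", "b")))
# [('a', 1), ('b', 2), ('a', 3), ('b', 5)]
```

Because only one batch is held in memory at a time, the copy stays feasible even when the full file does not fit or takes too long to read whole.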

chris

04/28/2023, 4:54 PM
Sensor evaluations time out after about 30 seconds and aren't suitable for serious computation; they're mainly meant to observe and kick other things off. From the above stack trace, I think you're hitting the timeout window. When you say loading the asset in the sensor, what exactly does that involve here?
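The division of labor chris describes would, in Dagster terms, be a sensor that only yields a `RunRequest` (carrying the filter values as run config) while the expensive read happens inside the triggered run. A library-free schematic of that split, with all names hypothetical:

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class RunRequest:
    """Schematic stand-in for Dagster's RunRequest: just enough state
    (the filter tuple) for the launched run to do the heavy work itself."""
    run_key: str
    filter_keys: Tuple[str, ...]

def sensor_tick(upstream_ready: bool) -> Optional[RunRequest]:
    """Cheap check only: decide whether to launch, never load data here.
    Per the reply above, a sensor evaluation has roughly a 30-second budget."""
    if not upstream_ready:
        return None
    # Pass the filter through config instead of loading the asset in the sensor.
    return RunRequest(run_key="copy-filtered-1", filter_keys=("a", "b"))

def launched_run(request: RunRequest, rows: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """The triggered run does the expensive batched, filtered copy."""
    return [row for row in rows if row[0] in request.filter_keys]

req = sensor_tick(upstream_ready=True)
if req is not None:
    print(launched_run(req, [("a", 1), ("c", 2), ("b", 3)]))
    # [('a', 1), ('b', 3)]
```

The key design point is that everything the run needs crosses the sensor boundary as plain config values, so the sensor never touches the IO manager or the user-code gRPC server for data.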

Thomas Tran

05/03/2023, 11:14 AM
Hey, thanks a lot for your answer! I rewrote my code and put everything into assets instead, and it's now doing everything it should. To satisfy your curiosity: the error seemed to pop up immediately. The long computations are part of the triggered run, not the sensor, but I could be wrong. I tried both `Definitions.load_asset_value()` and loading the asset directly from its path via `polars.read_parquet(path)`. It was always that line that raised the error; everything after it was never executed.
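The refactor described here amounts to letting the upstream asset materialize the filter values and having the downstream asset take them as a normal input, so Dagster's machinery loads the value instead of a sensor calling `load_asset_value()`. A dependency-free schematic of that dataflow (all names hypothetical):

```python
from typing import List, Tuple

def filter_keys_asset() -> Tuple[str, ...]:
    """Upstream asset: materializes the filter values.
    In Dagster this would be an @asset returning the tuple."""
    return ("a", "b")

def filtered_copy_asset(filter_keys: Tuple[str, ...]) -> List[Tuple[str, int]]:
    """Downstream asset: receives the upstream value as a regular input,
    so no sensor ever has to load the asset over gRPC."""
    rows = [("a", 1), ("c", 2), ("b", 3), ("d", 4)]
    return [row for row in rows if row[0] in filter_keys]

# Dagster would wire this dependency automatically; by hand it is just:
print(filtered_copy_asset(filter_keys_asset()))
# [('a', 1), ('b', 3)]
```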

chris

05/05/2023, 3:56 PM
Ah I see - glad everything ended up working. Yea now that things are refactored into assets, I’d be curious to see what the time to complete was on that line
but in any case, sounds like you were just hitting some timing constraints