#ask-community

Arsenii Poriadin

03/14/2023, 6:41 PM
Hello! The amount of data (received from an external endpoint) that triggers our sensor is far more than can be processed in a single sensor run. We use cursors to optimize the sensor, saving the cursor for subsequent runs. But I'm trying to understand why sensor runs can still go well beyond the maximum time window of 1 minute while trying to yield a huge number of RunRequests, and then fail after some time anyway because of, I guess, some internal Dagster gRPC timeout error (full error example in the thread below). I suppose this happens because the sensor has to wait until the yielded RunRequests succeed, right? But such runs might take a long time, so the sensor ends up failing essentially because it cannot wait any longer. Does this mean that triggering the jobs from a sensor is a bad design on our side, or can it be tackled by adjusting the configs? Here is roughly how we are doing it:
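import typing
from datetime import datetime

from dagster import RunRequest, SensorEvaluationContext, build_resources, sensor

# FirehoseGrpcResource is our own resource class (its import is omitted here).


@sensor(...)
def blockstream_sensor(context: SensorEvaluationContext):
    with build_resources(
        resources={ ... },
        resource_config={ ... }
    ) as resources:

        start_time = datetime.now()
        stream = typing.cast(FirehoseGrpcResource, resources.firehose_connection).get_stream_by_cursor(context.cursor)

        for response in stream:
            # some preps (elided; `block` is presumably derived from `response` here)

            yield RunRequest(
                run_key=response.cursor,
                run_config={
                    "ops": {
                        "decode_block_data": {
                            "config": {
                                "block_num": block.number,
                                "timestamp": str(datetime.now())
                            }
                        }
                    }
                },
            )

            context.update_cursor(response.cursor)

            # Stop after ~50 seconds to stay inside the 1-minute window.
            if (datetime.now() - start_time).total_seconds() > 50:
                resources.firehose_connection.close()
                break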
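
For reference, the time-budget idea in the sensor above can be sketched without Dagster at all. This is only a minimal illustration, not the code we actually run: `take_batch`, `fake_stream`, and the `max_items` cap are hypothetical names introduced here, and the stream is assumed to yield `(cursor, block_num)` pairs. It stops on whichever limit is hit first, an item count or elapsed time, and returns the last cursor so it could be saved once at the end.

```python
import time
from typing import Callable, Iterable, Iterator, List, Optional, Tuple


def take_batch(
    stream: Iterable[Tuple[str, int]],
    max_items: int,
    max_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[List[int], Optional[str]]:
    """Collect items until max_items are taken or max_seconds have elapsed.

    Returns the collected block numbers and the cursor of the last item
    taken (None if the stream was empty).
    """
    started = clock()
    blocks: List[int] = []
    last_cursor: Optional[str] = None
    for cursor, block_num in stream:
        blocks.append(block_num)
        last_cursor = cursor
        # Stop on whichever budget is exhausted first.
        if len(blocks) >= max_items or clock() - started >= max_seconds:
            break
    return blocks, last_cursor


def fake_stream(start: int) -> Iterator[Tuple[str, int]]:
    """Hypothetical stand-in for the endless external stream."""
    n = start
    while True:
        yield f"cursor-{n}", n
        n += 1


blocks, cursor = take_batch(fake_stream(100), max_items=3, max_seconds=50.0)
```

In a sensor, each collected block would become one RunRequest and the returned cursor would be passed to `context.update_cursor` once. Whether a count cap alone keeps the evaluation inside the time window depends on how slow the stream is, which is why the time check is kept as well.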
Full error example:
dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server. gRPC Error code: UNAVAILABLE

...
env/lib/python3.9/site-packages/dagster/_grpc/client.py", line 142, in _raise_grpc_exception
    raise DagsterUserCodeUnreachableError(

The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "{"created":"@1678818811.823113180","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1678818811.823112330","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>

...
/env/lib/python3.9/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)