Arsenii Poriadin
03/14/2023, 6:41 PM
Our sensor keeps yielding RunRequests and then failing after some time anyway because of, I guess, some internal Dagster gRPC timeout error (full error example in the thread below).
I suppose this happens because the sensor has to wait until the yielded RunRequests
succeed, right? But such runs might take a long time, and the sensor eventually fails essentially because it couldn't wait any longer. Does this mean the design on our side, with jobs triggered by a sensor, is bad, or can it actually be tackled by adjusting the configs?
Here is roughly how we are doing it:
import typing
from datetime import datetime

from dagster import RunRequest, SensorEvaluationContext, build_resources, sensor

@sensor(...)
def blockstream_sensor(context: SensorEvaluationContext):
    with build_resources(
        resources={ ... },
        resource_config={ ... }
    ) as resources:
        start_time = datetime.now()
        stream = typing.cast(
            FirehoseGrpcResource, resources.firehose_connection
        ).get_stream_by_cursor(context.cursor)
        for response in stream:
            # some preps (this is where `block` is derived from `response`)
            yield RunRequest(
                run_key=response.cursor,
                run_config={
                    "ops": {
                        "decode_block_data": {
                            "config": {
                                "block_num": block.number,
                                "timestamp": str(datetime.now())
                            }
                        }
                    }
                },
            )
            context.update_cursor(response.cursor)
            if (datetime.now() - start_time).seconds > 50:
                resources.firehose_connection.close()
                break
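For reference, the time-budgeted streaming pattern used in the loop above can be sketched in plain Python with no Dagster dependency; `drain_stream` and the dict-shaped items are hypothetical stand-ins for the Firehose responses, not part of the actual code:

```python
from datetime import datetime, timedelta


def drain_stream(stream, budget: timedelta):
    """Consume items from `stream` until the time budget is spent.

    Returns the collected items plus the cursor of the last item seen,
    so the caller can persist the cursor and resume on the next tick.
    """
    start = datetime.now()
    collected = []
    cursor = None
    for item in stream:
        collected.append(item)
        cursor = item["cursor"]  # hypothetical item shape
        # total_seconds() avoids the day-wrapping pitfall of timedelta.seconds
        if (datetime.now() - start).total_seconds() > budget.total_seconds():
            break
    return collected, cursor


# Usage with a hypothetical in-memory stream:
fake_stream = iter({"cursor": str(i)} for i in range(3))
items, cursor = drain_stream(fake_stream, timedelta(seconds=50))
```

Breaking out of the loop well before the evaluation deadline, and persisting the cursor as you go, keeps each sensor tick short regardless of how long the triggered runs themselves take.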
Arsenii Poriadin
03/14/2023, 6:41 PM
dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server. gRPC Error code: UNAVAILABLE
...
env/lib/python3.9/site-packages/dagster/_grpc/client.py", line 142, in _raise_grpc_exception
raise DagsterUserCodeUnreachableError(
The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "{"created":"@1678818811.823113180","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1678818811.823112330","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>
...
/env/lib/python3.9/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)