Looking into sensors and I am getting this error: ...
# ask-community
Looking into sensors and I am getting this error:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.DEADLINE_EXCEEDED details = "Deadline Exceeded" debug_error_string = "{"created":"@1648579523.062155830","description":"Error received from peer unix:/tmp/tmp270j3kna","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"Deadline Exceeded","grpc_status":4}"
I am trying to have a pipeline job run every time a file is added to a very large S3 bucket. My sensor functions currently look like:
def config_dagster_sensors():
    ##Dagster Sensors
    dagster_sensors = []

    def build_sensor() -> SensorDefinition:
        return SensorDefinition(

    def sensor_fn(context: SensorExecutionContext):
        new_s3_keys = get_s3_keys("bucket_name", since_key=context.cursor)
        if not new_s3_keys:
            yield SkipReason("No new s3 files found for bucket.")
        for s3_key in new_s3_keys:
            yield RunRequest(run_key=s3_key, run_config={}, pipeline_name="pipeline_name")

    return dagster_sensors
I am getting the impression the error comes from the sensor not being able to finish within the 60-second timeout. Is there any way to override this and allow it to finish no matter how long it takes?
Hi Dominick, it’s not currently possible to increase that timeout. However, you should be able to impose a limit on how many items you’ll process within a single tick, and use the cursor to pick up where you left off.
@johann Shouldn’t the cursor be updated after every run request I am calling above and start from that key next iteration?
Yes, your code could just change to something like:
for s3_key in new_s3_keys[:KEYS_PER_TICK_LIMIT]:
You’d just have to experiment with KEYS_PER_TICK_LIMIT to find a reasonable number that doesn’t go over the timeout. cc @prha in case he has any tips here
The cursor isn't actually updated until the function completely finishes (which is a bit counterintuitive due to the yields)
And similarly it waits until the function completely finishes before any RunRequests are processed - it's not actually async currently
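The limit-plus-cursor idea above can be sketched in plain Python. This is only an illustration, not Dagster's own API: `plan_tick` and the value of `KEYS_PER_TICK_LIMIT` are hypothetical, and it assumes `new_keys` is already ordered and contains only keys newer than the cursor, as the `since_key` argument in the question suggests.

```python
from typing import List, Optional, Tuple

# Assumed starting value; tune it so a single tick stays under the timeout.
KEYS_PER_TICK_LIMIT = 100


def plan_tick(
    new_keys: List[str],
    cursor: Optional[str],
    limit: int = KEYS_PER_TICK_LIMIT,
) -> Tuple[List[str], Optional[str]]:
    """Return the keys to handle in this tick and the cursor to store afterwards."""
    # Only take the first `limit` keys; the rest wait for a later tick.
    batch = new_keys[:limit]
    # Advance the cursor to the last key handled, or keep it if nothing was new.
    next_cursor = batch[-1] if batch else cursor
    return batch, next_cursor
```

Inside the sensor you would yield one RunRequest per key in the returned batch and then store the returned cursor (for example with `context.update_cursor`, assuming your Dagster version provides it), so the next tick resumes after the last key handled.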
Ok, great. I will mess around with my config. Thanks everyone!
Even when I lower the key limit to 1, it crashes my daemon entirely. I do not need a special kind of config for the daemon to work with sensors, do I? @johann @daniel
that's surprising, do you have a stack trace for the crash?
is it a memory issue possibly?
Same error as before - but both memory and CPU of my EC2 instance maxed out, so that could easily be part of it.
When I run the job I am testing on a schedule, or just manually in the UI, I do not get anywhere near that kind of usage.
Are you running the daemon locally?
If the bucket is massive, it's possible that get_s3_keys is OOMing the process
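If the memory pressure does come from listing the whole bucket on every tick, one option is to ask S3 for only a bounded page of keys. The sketch below is an assumption-laden illustration, not a replacement for get_s3_keys: `list_keys_after` is a hypothetical helper, `client` is assumed to be a boto3 S3 client, and the approach only works if new keys sort lexicographically after older ones (S3 lists keys in key order, not by upload time).

```python
from typing import Any, Dict, List, Optional


def list_keys_after(
    client: Any, bucket: str, since_key: Optional[str], limit: int
) -> List[str]:
    """Fetch at most `limit` keys that sort after `since_key`, in one request."""
    # S3 returns at most 1000 keys per list call, so cap the request there.
    kwargs: Dict[str, Any] = {"Bucket": bucket, "MaxKeys": min(limit, 1000)}
    if since_key:
        # StartAfter makes S3 begin listing after this key.
        kwargs["StartAfter"] = since_key
    response = client.list_objects_v2(**kwargs)
    # "Contents" is absent from the response when no keys match.
    return [obj["Key"] for obj in response.get("Contents", [])]
```

Because only one bounded page is ever held in memory, the cost per tick no longer grows with the size of the bucket.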
Both are on ECS containers
Is the user code server running in its own ECS task?
huh, that's surprising that the daemon would crash then. Do you mean literally crash? Like the process/task stops?
It seems to have shut down - task is still running
Any logs from the daemon task that might help explain what's going on?
I think my use case may be better suited to using AWS Lambda to trigger jobs through GraphQL
Daemon task logs just show the same error as before ^^ and that the daemon shut off
Exception: Stopping dagster-daemon process since the following threads are no longer sending heartbeats: ['SENSOR']
Hm, and what version of dagster is this? It should still be heartbeating even if a sensor is timing out
Should be latest - I just rebuilt image and it grabs latest from my requirements file
We can see if we can reproduce - if it's possible to post or DM the logs from the failed task (for 5-10 minutes or so before it crashed) that would help debug
Also a bit surprising that the task stayed up after the daemon process died - I would expect it to restart