# ask-community

sarah

05/04/2022, 6:54 PM
We’re looking for some help with our Dagster sensor.

Overview: We have a sensor that subscribes to a pub/sub topic (a message is published whenever something arrives in a specific GCS bucket). When messages are returned from our pull request against the pub/sub subscription, we loop through the messages and yield a RunRequest() with a unique run key for each message. We also acknowledge each message to avoid reprocessing.

Most of the time this works well. However, at certain times of day thousands of files arrive in the GCS buckets at once, and at these times we notice that some pipelines are never triggered. We currently limit our pull request to 100 messages at a time (with minimum_interval_seconds=60), and we allow a maximum of 100 concurrent jobs. We don’t see any error messages, and we find no record of the specific run keys that should exist, i.e., the corresponding files have arrived in our GCS bucket, but a pipeline doesn’t seem to have been triggered. Our initial hypothesis was that the messages never arrived, but our tests indicate this is unlikely. We are running on a GKE cluster using auto-scaling.

To summarize:
• We believe the pipelines were never initiated, because there is no record of some expected run keys in any of the log messages
• We find no sensor error messages
• We don’t see significant numbers of failed workloads in GKE (certainly nowhere near the number of missing jobs)
• We don’t see any unacknowledged pub/sub messages

This is currently affecting about 5% of our workload, which is causing significant challenges for us. We do have a backup solution (i.e. find cases where pipelines didn’t initiate and reprocess them), but it isn’t optimal. Do you have experience with this, or suggestions about how we can determine what is happening?
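For reference, a minimal sketch of the sensor pattern described above. All names here (`gcs_file_job`, `process_file`, `SUBSCRIPTION_PATH`, the `objectId` attribute lookup) are placeholders for illustration, not the thread's actual code; the pull/ack calls are the synchronous `google-cloud-pubsub` client methods.

```python
# Minimal sketch of the sensor pattern described above (placeholder names;
# not the actual production code from this thread).
from dagster import RunRequest, job, op, sensor
from google.cloud import pubsub_v1

SUBSCRIPTION_PATH = "projects/my-project/subscriptions/gcs-arrivals"  # placeholder


@op(config_schema={"path": str})
def process_file(context):
    context.log.info(f"processing {context.op_config['path']}")


@job
def gcs_file_job():
    process_file()


@sensor(job=gcs_file_job, minimum_interval_seconds=60)
def gcs_pubsub_sensor(_context):
    subscriber = pubsub_v1.SubscriberClient()
    # Pull at most 100 messages per tick, matching the limits described above.
    response = subscriber.pull(
        request={"subscription": SUBSCRIPTION_PATH, "max_messages": 100},
        timeout=30,
    )
    for received in response.received_messages:
        # GCS -> Pub/Sub notifications carry the object name in the "objectId" attribute.
        file_name = received.message.attributes["objectId"]
        yield RunRequest(
            run_key=file_name,  # unique per file, so duplicate submissions are skipped
            run_config={"ops": {"process_file": {"config": {"path": file_name}}}},
        )
        # Ack immediately to avoid reprocessing -- the side effect discussed
        # later in this thread.
        subscriber.acknowledge(
            request={"subscription": SUBSCRIPTION_PATH, "ack_ids": [received.ack_id]}
        )
```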

prha

05/04/2022, 7:05 PM
At these times, does anything interesting pop up in the dagster-daemon logs? (These aren’t captured in Dagit; I’m wondering if you have some logging service that captures the output from the daemon process.)
Also, during these times, does the sensor request any runs, or does it just skip when there should be 100 runs requested?

sarah

05/04/2022, 8:10 PM
Thanks for your quick response. We can see the dagster-daemon logs in Google Logs Explorer, but we don’t see anything interesting. I’m not sure how to easily check how many runs are requested. If I use Dagit to view the sensor tick history during high-volume times, I do see occasional grey dots (skipped), but I’m not sure whether that’s because there were no messages at that specific time. If I click on one of the blue dots, I see a number of pipelines, although far fewer than 100, but I suspect that’s just a limitation on what can be viewed. Unfortunately, this is a bit clumsy. I have set up a sink to explore the dagster-daemon logs more easily, so hopefully I can get some insight soon.

daniel

05/05/2022, 1:54 AM
One thing you can do is add a log/print call to your sensor function just as it's yielding a RunRequest - that log will show up in the logs from your user code deployment / gRPC server. That would help verify whether or not the run keys that you expect are actually creating runs. If you see a run key that is definitely getting yielded but isn't making its way into the run queue as a run, that would indicate a Dagster bug (but I haven't seen any reports of that particular issue)
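A minimal sketch of that suggestion, reusing the placeholder names from the sensor sketch earlier in the thread; the only change is the (hypothetical) print calls around each yield, so the user code deployment's logs record every run key the sensor attempted to submit.

```python
# Same placeholder sensor as in the earlier sketch, with print calls around
# each yield so the user code deployment / gRPC server logs record every run
# key the sensor attempted to submit.
@sensor(job=gcs_file_job, minimum_interval_seconds=60)
def gcs_pubsub_sensor(_context):
    subscriber = pubsub_v1.SubscriberClient()
    response = subscriber.pull(
        request={"subscription": SUBSCRIPTION_PATH, "max_messages": 100},
        timeout=30,
    )
    for received in response.received_messages:
        file_name = received.message.attributes["objectId"]
        print(f"about to yield RunRequest with run_key={file_name}")
        yield RunRequest(run_key=file_name)
        print(f"finished yielding RunRequest with run_key={file_name}")
        subscriber.acknowledge(
            request={"subscription": SUBSCRIPTION_PATH, "ack_ids": [received.ack_id]}
        )
```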

sarah

05/05/2022, 4:26 PM
Thanks for the suggestion Daniel. We are trying to do exactly that and will hopefully know the results soon (we are just working on a way to distinguish runs generated by our backup process from runs generated by the sensor).
Thanks again @daniel and @prha for your help. We added print calls to our sensor function just before and after yielding a RunRequest; they print the file name (which is the same as the run key that should be generated) along with a short message. For some files, we see the print statement output but have no record of the expected run key in our log messages. For perspective, this happens for about 5% of the expected run keys, and mostly when there is a high volume of requests. Are there any particular resource requirements for the daemon? In case it’s relevant, we limit the number of concurrent runs to 100, so we expect to have many runs queued when volumes are high.

daniel

05/06/2022, 10:59 PM
Thanks Sarah - that's starting to sound like some race condition or error in the daemon. Any chance you're able to pull logs from the daemon during a time period when run keys were skipped? I'd be curious whether there are any errors that might have caused it to not enqueue some of the runs.

sarah

05/06/2022, 11:15 PM
I think logs from the daemon are also available via Google Logs Explorer. Is there anything specific I should look for? I haven’t seen anything obvious yet.

daniel

05/06/2022, 11:18 PM
If it's possible to paste (a) the logs from your user code deployment during a sensor tick that didn't result in runs, and (b) the logs from the daemon while it was processing that tick whose runs didn't end up getting queued, we'd be happy to take a look at them
And see if anything jumps out that might explain how those runs ended up getting lost.
One property of sensors that you might not be fully taking advantage of: it sounds like your sensor function has side effects / isn't idempotent, since it acknowledges the message inside the sensor function. If there's any way to use a cursor instead, then if we do find that this is happening due to an error partway through the tick, the next tick would try again from the same starting point as the previous one, instead of those run requests getting lost.
Like in the example here: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#sensor-optimizations-using-cursors. I see how that might be tricky with a pub/sub system, though.
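For reference, a loose sketch of the cursor pattern on that docs page (the directory path is a placeholder, and the job is the placeholder from the earlier sketch): the cursor is only advanced at the end of the tick body, so a failed tick re-evaluates from the same starting point.

```python
import os

from dagster import RunRequest, sensor


@sensor(job=gcs_file_job)
def cursor_directory_sensor(context):
    last_mtime = float(context.cursor) if context.cursor else 0.0
    max_mtime = last_mtime
    for filename in os.listdir("/path/to/landing_dir"):  # placeholder directory
        filepath = os.path.join("/path/to/landing_dir", filename)
        mtime = os.stat(filepath).st_mtime
        if mtime <= last_mtime:
            continue  # already handled on a previous tick
        yield RunRequest(run_key=f"{filename}:{mtime}")
        max_mtime = max(max_mtime, mtime)
    # The cursor only takes effect once the tick finishes, so a crash mid-tick
    # means the next tick starts again from the previous cursor value.
    context.update_cursor(str(max_mtime))
```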

sarah

05/06/2022, 11:54 PM
Thanks again for your suggestions. We’re working on better understanding the log messages. Although a cursor would be tricky for us here, we do limit the number of pub/sub messages we pull at a time to 100. So each time the sensor runs, it should only yield a maximum of 100 RunRequests.

daniel

05/07/2022, 12:19 AM
Here's a tricky way that you might be able to make things more tolerant to failures while still using pub/sub:
• On each sensor run, set the cursor to the timestamp or identifier of the final event that's returned
• At the beginning of each sensor run, ack / clear any messages in the queue from before the cursor

The difference between that and what you described earlier is that nothing gets cleared out until we know that the previous sensor run processed all of its runs (since the cursor doesn't get saved until everything is submitted from the sensor). So if there is an error or crash partway through a sensor run, it'll repeat the same messages again instead of moving on and potentially skipping things (and the run keys will ensure that nothing runs twice). I'm not positive that this will fix the problem you're seeing, but it would be my best guess with the information we have now.
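A rough sketch of that approach, reusing the placeholder names from earlier and assuming the Python Pub/Sub client exposes `publish_time` as a datetime; this is a best-guess illustration rather than a tested implementation.

```python
from dagster import RunRequest, sensor
from google.cloud import pubsub_v1

SUBSCRIPTION_PATH = "projects/my-project/subscriptions/gcs-arrivals"  # placeholder


@sensor(job=gcs_file_job, minimum_interval_seconds=60)
def cursor_pubsub_sensor(context):
    cursor_ts = float(context.cursor) if context.cursor else 0.0
    subscriber = pubsub_v1.SubscriberClient()
    response = subscriber.pull(
        request={"subscription": SUBSCRIPTION_PATH, "max_messages": 100},
        timeout=30,
    )

    # 1. Ack anything published at or before the cursor: those messages were
    #    fully submitted by a previous, successful tick.
    already_done = [
        m.ack_id
        for m in response.received_messages
        if m.message.publish_time.timestamp() <= cursor_ts
    ]
    if already_done:
        subscriber.acknowledge(
            request={"subscription": SUBSCRIPTION_PATH, "ack_ids": already_done}
        )

    # 2. Yield RunRequests for the newer messages without acking them yet; the
    #    run keys ensure nothing is launched twice if a message is seen again.
    newest = cursor_ts
    for received in response.received_messages:
        ts = received.message.publish_time.timestamp()
        if ts <= cursor_ts:
            continue
        file_name = received.message.attributes["objectId"]
        yield RunRequest(run_key=file_name)
        newest = max(newest, ts)

    # 3. Advance the cursor only at the end; if the tick crashes partway
    #    through, the same messages are re-pulled and retried next tick.
    context.update_cursor(str(newest))
```

The unacked messages simply hit their ack deadline and are redelivered on a later pull, at which point step 1 clears them once the cursor has moved past them.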

sarah

05/09/2022, 7:34 PM
Thanks again for this suggestion. We might try to implement this, or possibly take a different approach and avoid using pub/sub. Based on our analysis, we believe the issue may be caused by daemon failures. This is how we understand the process:
1. The sensor requests some jobs
2. The daemon initiates the jobs one by one
3. Unexpectedly, the daemon fails
4. A new daemon is created, but it does not know anything about the non-launched jobs, because all of the previous run requests stay with the old daemon

We saw over 40 new daemons created within 24 hours, with some running for only very short periods of time. Does our understanding match how you expect things to work?

daniel

05/09/2022, 8:01 PM
That sounds overall right to me, yeah. It's not so much that "all previous run requests stay in the old daemon" as that the tick failed partway through, after it had invalidated some of the run requests but before it could actually process them. The expected behavior, if sensor functions have no side effects, is that the sensor function will run again, pick up where it left off, and every run gets launched exactly once. But because there are side effects (acking the messages), we lose idempotency: running the function again won't result in picking up where you left off. If you have any logs from the daemon failing/crashing, we'd be happy to look into those; failing 40 times in 24 hours doesn't sound expected.

sarah

05/09/2022, 8:55 PM
Thank you again for your help and for the explanation. Interestingly, we don’t see any error messages. The last messages relating to a specific daemon are just about scaling down and stopping the dagster container. Maybe there is some issue with auto-scaling. In any case, we will address the idempotency issue with our current sensor.