Hi, could someone please explain what is meant by ...
# deployment-kubernetes
o
Hi, could someone please explain what is meant by this line in the sensors docs
The Dagster Daemon runs each sensor evaluation function on a tight loop
Does it mean only one sensor is evaulated at a time?
d
Hi Oliver - this is correct, although we’ve discussed allowing multiple daemons to run for horizontally scaling in the future if this becomes a bottleneck, so definitely let us know if that’s the case for you. (Crucially the sensors are evaluated one at a time to determine which runs to launch, but the runs that they create are launched separately - so sensors are blocked on other sensors’ evaluation functions, but not on the pipelines)
The reason for this is relevant to your previous question about the uses for async syntax for sensors actually - the thinking was that sensors would be relatively quick and lightweight vs being long-running streaming functions. But definitely open to feedback on that thinking.
o
For my use case I am consuming data from kinesis streams, each sensor reads from a different shard of the kinesis endpoint the approach I am using to enable sensors for this is return if either a) the function has run too long, b) there was nothing new data in the last request. I will need to do more testing but I imaging this will result in an avalanche type effect in that if one stream last so long that it times out then all streams will start doing so and it will be difficult to catch up. I guess it might make more sense to have a single sensor that uses some form of multiprocessing over all streams. ah no, this wont work due to the run no
Copy code
2021-05-17 06:21:08 - SensorDaemon - INFO - Checking for new runs for sensor: kinesis007
2021-05-17 06:21:12 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:13 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:14 - SensorDaemon - INFO - Completed launch of run d299d55a-96e4-46de-a055-ce75d8b7b60a for kinesis007
2021-05-17 06:21:14 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:15 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:16 - SensorDaemon - INFO - Completed launch of run d02b9da4-e06d-4632-8afb-0894e211a4f6 for kinesis007
2021-05-17 06:21:17 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:17 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:18 - SensorDaemon - INFO - Completed launch of run a313bcf4-daf1-4ca5-91f0-403f75e05878 for kinesis007
2021-05-17 06:21:18 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:19 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:20 - SensorDaemon - INFO - Completed launch of run 98f32b52-6173-4ce4-973a-86add0f6de90 for kinesis007
2021-05-17 06:21:20 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:21 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:23 - SensorDaemon - INFO - Completed launch of run 578c78a0-24ac-4829-ab4d-e5e451917d2c for kinesis007
2021-05-17 06:21:23 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:25 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:26 - BackfillDaemon - INFO - No backfill jobs requested.
2021-05-17 06:21:26 - SensorDaemon - INFO - Completed launch of run 80ca17ad-0c42-41bd-8dfd-800842fc8321 for kinesis007
2021-05-17 06:21:26 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:28 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:28 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-05-17 06:21:29 - SensorDaemon - INFO - Completed launch of run 4d737c1e-7e7f-476a-b675-92f7de8e8479 for kinesis007
2021-05-17 06:21:29 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:29 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:31 - SensorDaemon - INFO - Completed launch of run 49ee27a0-0048-4f8d-81dd-603872a52f65 for kinesis007
2021-05-17 06:21:31 - SensorDaemon - INFO - Checking for new runs for sensor: kinesis001
2021-05-17 06:21:34 - SensorDaemon - INFO - Creating new run for kinesis001
the three main issues in the way of making this use case work 1. slow to actually create the jobs eg
Copy code
2021-05-17 06:21:12 - SensorDaemon - INFO - Creating new run for kinesis007
2021-05-17 06:21:13 - SensorDaemon - INFO - Launching run for kinesis007
2021-05-17 06:21:14 - SensorDaemon - INFO - Completed launch of run d299d55a-96e4-46de-a055-ce75d8b7b60a for kinesis007
2. Sensors are not evaluated in parallel. 3. job not executed on yield. #1 - probably just inherent in the system and would be hard to reduce #2 - I think this is most detrimental issue for the use case since it ties the latency of the system to the number of shards. #3 - would incidentally fix #1 but I think this might cross a grpc boundary so probably quite involved.
Setup: data-proxy-service- this esentially takes single json items, batches them together and pushes them to aws kinesis. it sends a batch to kinesis every 5 seconds or if the size of the batch is >1mb. (these are tunable) sensor- looks something like this, each sensor is reading its own shard in the kinesis stream. (kinesis works very much like kafka, let me know if you want more explanation)
Copy code
def get_jobs(sensor):
    now = time.now()
    while True:
        if time.now()-start>2:
            return
        records = get_records()
        if len(records)==0:
            return
        job = make_job(record)
        yield job
situtation: the data-proxy-service is fed with ~10mb/s of data. each kinesis shard can handle 2mb/s and so we need 5 streams to handle the load which results in 5 sensors. through experimentation it was found that each senor iteration can yield ~3 jobs in the 2 seconds it is allocated. these 3 jobs are returned to the calling function. The calling function then iterates over the list and will launch about 1 job every 2 seconds. so for the 3 jobs to be launched and the collection function itself this results in around 8 seconds per sensor evaluation. We have 5 sensors so in total one round of sensor evaluations is taking ~40 seconds. In 40 seconds the data-queue-proxy is going to push 8 records to the kinesis stream. so for every round of sensor evaluations we are losing 5 places from the head of the kinesis stream in each shard.
Hope this elaborates well on the issue im running into. please let me know if you need any more info/if i should create or add to an gh issue.
d
Thanks for the detailed explanation! One thing I would try for issue #1 is enabling our run queue feature: https://docs.dagster.io/deployment/run-coordinator That will cause the run launching part to happen on a different thread which would give you some parallelism here. (Instead the sensor daemon will now just be adding the run to a queue which should be faster than 2 seconds per run) The ability to run multiple sensor daemons in parallel is also a very reasonable feature request though for high-volume sensors, but that might give you some more breathing room in the short term. I also think that rather than having the sensor return if it has run too long, you might be able to get away with only doing one check for items per sensor iteration? If you put each sensor on a short interval, say 1 second (via https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#evaluation-interval) the sensor daemon may be able to handle calling each sensor in succession in a loop for you
o
The run coordinator works great, thanks The run interval has always been low, but this doesn't make a difference since the frequency is dependent on the other sensors. I was however able to modify the fetch call to be able to pull more than one record at a time and so only have to do one fetch total. these two modification have increased my throughput to be able to handle what my laptop can throw at it, so out of hot water for now. Although ideally in the long run sensor will all run in parallel as this will help to reduce latency of the system. Thanks for the help
condagster 1