# announcements
i
Hi guys, I ran into a dead end hitting the GraphQL API in a streaming fashion: it always freezes after a few hundred hits… Never mind. So I would like to move to / test micro-batching, but still using a framework like Dagster (so every 5 minutes would be a scheduled run). But with this approach I will have another problem: how can I run solids in parallel in a pipeline where I do not know the inputs until run time? My inputs would be a list of tuples (sensor-id). I think I can put my single-sensor pipeline into a composite solid (I have just read a little bit about this), but I still do not understand how I should emit events (device_id_1, device_id_2, …) with Output… Let's say I have 5000 sensor ids in a micro-batch, so that should be 5000 parallel composite solid runs in a pipeline (if I understand the concept). I can implement a resource coordinator, I think that is not an issue, but I don't understand how I can achieve this concurrency with the Dagster approach. If someone can help me, my questions would be:

- do you have public / example code for something similar?
- will the solids be distributed across the Celery workers? (even those which are part of a composite solid)
- can Dagit handle this? (3-5000 composite solids in a pipeline, with 7-8 steps each, so if I calculate the final task graph, that would be 5000 * 8 = 40,000 tasks 😊) Or should I move to a custom solution?

Any advice would be appreciated! Thanks, Darvi
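The fan-out shape described above can be sketched in plain Python. This is a toy stand-in, not Dagster code: `aggregate_device` and `run_micro_batch` are hypothetical names, and a real solution would map devices to Dagster runs or solids rather than thread-pool tasks.

```python
# Sketch of the fan-out problem: a micro-batch of sensor ids only known at
# run time, each needing its own multi-step aggregation. Pure Python toy;
# aggregate_device / run_micro_batch are illustrative, not Dagster APIs.
from concurrent.futures import ThreadPoolExecutor

def aggregate_device(device_id):
    # Placeholder for the per-device 7-8 step aggregation pipeline.
    return {"device_id": device_id, "agg": f"agg-for-{device_id}"}

def run_micro_batch(device_ids, max_workers=10):
    # Fan out one task per device; max_workers bounds concurrency the way
    # a resource coordinator would bound DB connections.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(aggregate_device, device_ids))

results = run_micro_batch([f"d{i:03d}" for i in range(1, 6)])
```

The open question in the thread is exactly how to express this fan-out inside a Dagster pipeline when the list of ids arrives at run time.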
a
Can you explain more about the broader task you are trying to accomplish? I have caught pieces of it from working through the issues you have helped uncover so far, but it would be good to have that context. We do have a `sensors` feature in `master`, due for release in January in `0.10.0`, that may be the key to allowing dagster to be useful for what you are trying to accomplish.
cc @prha
i
@alex yes, I am going to gather it...give me 5 mins
So @alex, I would like to deploy an aggregator framework for our sensors. I have sensor data coming from lots of cars, and I need to aggregate it. I have thousands of devices: d001, d002, … etc. The sensor data is ingested into PostgreSQL from Kafka, and every 5 minutes I would like to calculate all of the aggregates for the last 5 minutes (as telemetries arrive), put the aggregated events back onto Kafka, and ingest the calculated aggregates back into PG.

Let's say I have 1000 cars which sent me basic telemetries in the last 5 minutes, and I would like to start 1000 concurrent DAGs to calculate the aggregates for these cars. (I have blocking resource control on the PG resource, so as not to kill the database: it allows 10 DB connections with a distributed lock coordinator, so even if I start 1000 sub-pipelines (composite solids?), each needs to wait for access to the DB.)

telemetries from DB: car 1 -> agg1, agg2 | car 2 -> agg1, agg2 | car 3 -> agg1, agg2

I think the key point is here: I need to run 5000 different (per-device) DAGs every 5 minutes for aggregation. A single DAG run takes approximately 10-15 sec with Celery execution (with the current approach) and 3 sec in-process, but the computation itself is only 1 sec or less, so nothing special… the scheduler and NFS are slow, I think, but concurrency can solve this… I think 😉

I hope my current pipeline can be transformed into a sub-pipeline (composite solid?) and I can start those 5000 sub-pipelines from a single schedule. I hope that was clear; if not, tell me and I will try to explain it another way 😊 What would be the best approach?
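The "blocking resource control" idea above can be sketched with a semaphore: many aggregation tasks run concurrently, but at most 10 hold a DB slot at once. This is a minimal single-process sketch; the real setup uses a distributed lock coordinator across workers, which this does not model.

```python
# Sketch of the PG resource guard: a semaphore capping concurrent DB access
# at 10, so even 1000 parallel aggregation tasks queue for a connection.
import threading
from concurrent.futures import ThreadPoolExecutor

DB_SLOTS = threading.Semaphore(10)  # at most 10 simultaneous "connections"
peak = 0     # highest number of tasks inside the guarded section at once
active = 0
lock = threading.Lock()

def aggregate_with_db(device_id):
    global peak, active
    with DB_SLOTS:  # blocks until a DB slot is free
        with lock:
            active += 1
            peak = max(peak, active)
        # ... read telemetry, compute agg1/agg2, write results back ...
        with lock:
            active -= 1
    return device_id

with ThreadPoolExecutor(max_workers=100) as pool:
    done = list(pool.map(aggregate_with_db, range(1000)))
```

The semaphore guarantees `peak` never exceeds 10, regardless of how many tasks are launched.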
p
@Istvan Darvas I think with `0.10.0` in January, you should be able to launch multiple runs in a single schedule tick or sensor tick. Our sensor API is geared for a use-case where sensors are constantly being evaluated (polling for external changes), while the schedule API is set to run according to a pre-set cron string.
i
@prha do you have example pipeline code for this? A toy one? 😉 I just want to look at it 😉 if that is possible
and thanks for your answer
p
I don’t have an easy-access example for the schedule, but here’s one of a sensor (which has a similar API to the new schedule API): https://github.com/dagster-io/dagster/blob/master/python_modules/dagster-test/dagster_test/toys/sensors.py#L86
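A rough stand-in for what one sensor tick could do in this use case: emit one run request per device seen since the last tick, using a run key for dedup. This is plain Python to illustrate the shape only; the real version would use dagster's `@sensor` decorator and yield `RunRequest` objects, as in the linked toy example, and the `evaluate_tick` name and run-config layout here are made up.

```python
# Toy model of a sensor tick: one run request per device, deduplicated by
# run key so a re-evaluated tick does not launch duplicate runs.
def evaluate_tick(device_ids, tick_ts, already_launched):
    requests = []
    for device_id in device_ids:
        run_key = f"{device_id}-{tick_ts}"  # dedup key: device + tick time
        if run_key in already_launched:
            continue  # this run was already requested in a previous evaluation
        requests.append({
            "run_key": run_key,
            "run_config": {
                "solids": {"aggregate": {"config": {"device_id": device_id}}}
            },
        })
    return requests

reqs = evaluate_tick(["d001", "d002"], "2021-01-05T10:00", set())
```

Re-evaluating the same tick with the previously issued run keys would yield no new requests, which is how run keys keep a polling sensor idempotent.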
i
thanks I am checking it
@prha @alex I am looking forward to trying this out and reading more about it in the docs. Until then, I will try to figure something out... Thanks!