
Nitin Madhavan

12/16/2021, 7:21 AM
I recently discovered Dagster and love the way it does things. I am trying to build the ETL pipeline for a new system we are building using Dagster instead of Airflow. Though it is better in almost every way, I am stuck on an important requirement and have not been able to find a way out. The problem statement is:
1. We have a Kafka queue that is populated at different rates, depending on when data comes in.
2. Sometimes it may receive over 1,000 messages per second; at other times, no messages arrive for long periods.
3. So the job reading data from the Kafka queue has to keep checking and run regularly to process the queue.
4. When no data is coming in, it would check every minute and stop; the job would end in a few seconds.
5. But if data is arriving continuously, it would have to read and process it, which would take 2-3 minutes.
6. Once the job completes, it would run again until the queue is empty.
I think I need a schedule/sensor that runs, say, every 30 seconds. If another instance of the same job is already running, it skips this run; otherwise it runs the job. I think even scripts receiving data from streaming sources would have a similar requirement. Is there any other pattern for such long-running jobs? I would be grateful for any help.

yuhan

12/16/2021, 7:47 AM
Hi @Nitin Madhavan! Thanks for the feedback and the write-up. I would recommend having a sensor that checks the Kafka state to decide whether to start a run, and that skips the tick if the same job is already running (exactly as you described). You can get the run status/records via
context.instance.get_run_records(filters=PipelineRunsFilter(pipeline_name="your_job_name"))
inside the sensor's evaluation function.
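Roughly, a minimal sketch of that pattern could look like the following (assuming the 0.13-era APIs; kafka_etl_job, drain_kafka_queue, and queue_has_messages are placeholder names, and import paths may differ across Dagster versions):

from dagster import RunRequest, SkipReason, job, op, sensor
from dagster.core.storage.pipeline_run import PipelineRunsFilter, PipelineRunStatus

@op
def drain_kafka_queue():
    # Hypothetical op: consume and process messages from the topic,
    # returning once the queue is empty again.
    ...

@job
def kafka_etl_job():
    drain_kafka_queue()

def queue_has_messages():
    # Hypothetical helper: peek at the topic (e.g. compare committed
    # consumer offsets to end offsets with confluent_kafka) and report
    # whether there is pending work.
    return False

@sensor(job=kafka_etl_job, minimum_interval_seconds=30)
def kafka_queue_sensor(context):
    # Skip this tick if a run of the same job is still queued or running.
    in_progress = context.instance.get_run_records(
        filters=PipelineRunsFilter(
            pipeline_name="kafka_etl_job",
            statuses=[PipelineRunStatus.QUEUED, PipelineRunStatus.STARTED],
        ),
        limit=1,
    )
    if in_progress:
        return SkipReason("A run of kafka_etl_job is already in progress.")
    if not queue_has_messages():
        return SkipReason("Kafka queue is empty.")
    # No run_key, so each tick that finds pending messages can launch a
    # fresh run; successive runs keep draining the queue until it is empty.
    return RunRequest(run_key=None)

Leaving run_key unset is what lets the job re-launch on every tick while messages remain, which matches the "run again until the queue is empty" behavior you described.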

Nitin Madhavan

12/16/2021, 7:58 AM
Thanks @yuhan, I will look into this.