Nitin Madhavan

12/23/2021, 4:59 AM
Hi all, I want to use Dagster to orchestrate my data receivers. These may include Kafka, a CSV file, or any other source. The active sources will be listed in a config file.

My planned deployment strategy: suppose my config file has one Kafka source and one file-folder source. My schedule will read the config file and generate two jobs, one for reading from Kafka and one for reading from files. Before launching a job, the schedule checks with context.instance.get_run_records whether that job is already running, and if it is, it does not initiate a new run.

The problem I am facing is with the files. When the schedule runs, it sees the "file" source in the config and initiates a job run for reading the files. In the op, it loops through all the files and processes them. However, the next time the job runs, it reads all the same files again.

One solution from the docs is to use the filename as the run_key when creating a job run. But in that case, the loop over the filenames has to be outside the job run (in a sensor). I do not know how to achieve this, because I am generating the job runs from a schedule that loops through all active sources. This would require the schedule to generate a normal job run if the source is Kafka, and to generate/activate a sensor (which creates one job run per file) if the source is a file.

I know this sounds complicated. I would be grateful if anyone can give some pointers.
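A minimal, dependency-free sketch of the per-tick planning logic described above, assuming the config is a list of dicts such as {"type": "kafka", "topic": ...} or {"type": "file", "folder": ...}. The names PlannedRun, plan_runs and list_folder are hypothetical and are not Dagster APIs; PlannedRun only mimics the shape of a RunRequest (job, run_key, run config). Kafka sources get one run that is skipped while a run of that job is in progress (the set of running jobs is what the get_run_records check would supply), while file sources get one run per file keyed by its path, so re-listing the folder on the next tick does not relaunch files that were already seen.

```python
import os
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set


@dataclass
class PlannedRun:
    """Hypothetical stand-in for a Dagster RunRequest (not the real class)."""
    job_name: str
    run_key: Optional[str]
    run_config: Dict[str, str] = field(default_factory=dict)


def list_folder(folder: str) -> List[str]:
    """List regular files in a folder, sorted so runs are planned in a stable order."""
    return sorted(
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if os.path.isfile(os.path.join(folder, name))
    )


def plan_runs(
    sources: List[Dict[str, str]],
    running_jobs: Set[str],
    seen_run_keys: Set[str],
    list_files: Callable[[str], List[str]] = list_folder,
) -> List[PlannedRun]:
    """Return the runs to launch on this tick.

    running_jobs: names of jobs with an in-progress run (assumed to come
        from a get_run_records query in the real setup).
    seen_run_keys: run keys planned on earlier ticks; updated in place.
    """
    planned: List[PlannedRun] = []
    for source in sources:
        kind = source["type"]
        if kind == "kafka":
            # Long-running consumer: launch only if no run of this job is active.
            job_name = f"kafka_{source['topic']}"
            if job_name not in running_jobs:
                planned.append(PlannedRun(job_name, None, {"topic": source["topic"]}))
        elif kind == "file":
            # One run per file, keyed by its path, so each file is planned once.
            for path in list_files(source["folder"]):
                if path in seen_run_keys:
                    continue
                seen_run_keys.add(path)
                planned.append(PlannedRun("read_file", path, {"path": path}))
        else:
            raise ValueError(f"unknown source type: {kind!r}")
    return planned
```

In Dagster itself, a sensor skips any RunRequest whose run_key it has already used, so the seen_run_keys bookkeeping above would be handled by the framework rather than by hand; the sketch only makes that de-duplication explicit so the Kafka and file branches can be compared side by side.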