
assaf

06/06/2021, 7:34 AM
👋 Hi all! 👋 I'd like your help in figuring out this scenario; please let me know if it's been addressed before, as I couldn't find it by searching the Slack history. I have a Dagster pipeline that archives S3 objects into Glacier. The input data for it is generated by a Databricks job that operates on S3 inventory data and writes its output (references to S3 objects eligible for archiving) as a set of parquet files in S3. Each of those files is the input for a single run of the Dagster pipeline, which does the actual archiving. So the topology goes roughly like this (this isn't actual code):
archiving_inventory_parquet_files = databricks_process_inventory()
for pq in archiving_inventory_parquet_files:
  run_archiving_pipeline_dagster(pq)
To better track the progress of the actual archiving, I wanted to use partition sets, where each partition represents a single parquet file. What I did was output the list of parquet files as a CSV, save that file next to my pipeline code, and generate a PartitionSetDefinition from it. However, I need to add a new CSV to my code every time I rerun the Databricks job and generate a new partition set. Ideally, I would like to wrap that Databricks job in a Dagster pipeline and have it add a partition set to the other pipeline when it's done. My question is: is there a way to dynamically update the partition sets for pipeline B (namely, add a new one) using output from pipeline A? Maintaining that state in Dagster is not a trivial ask, I understand, but has anybody else come across a similar challenge?
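For reference, this is roughly what my current setup looks like, using the legacy PartitionSetDefinition API (if I remember the signature right) — the pipeline name, solid name, and config keys below are placeholders, not my real code:

import csv

from dagster import Partition, PartitionSetDefinition


def get_partitions():
    # One row per parquet file produced by the Databricks job;
    # each row becomes one partition of the archiving pipeline.
    with open("archiving_partitions.csv") as f:
        return [Partition(row[0]) for row in csv.reader(f) if row]


def run_config_for_partition(partition):
    # Hand the parquet file's S3 key to the archiving pipeline as config.
    return {
        "solids": {
            "archive_objects": {"config": {"inventory_key": partition.value}}
        }
    }


archiving_partition_set = PartitionSetDefinition(
    name="archiving_inventory_partitions",
    pipeline_name="archiving_pipeline",
    partition_fn=get_partitions,
    run_config_fn_for_partition=run_config_for_partition,
)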
My best idea so far would be to write partition metadata from pipeline A into a Postgres table and dynamically generate partition sets for pipeline B by reading that table.
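Concretely, the only piece that would change is the partition function: instead of reading a checked-in CSV, it would query the table that pipeline A writes to. A rough sketch of what I have in mind (table, column, and database names are made up, and I'm assuming psycopg2 for the connection):

import psycopg2

from dagster import Partition


def get_partitions():
    # Pipeline A inserts one row per parquet file it produces; the partition
    # set for pipeline B is rebuilt from this table every time it's read.
    conn = psycopg2.connect("dbname=dagster_partitions")  # placeholder DSN
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT parquet_key FROM archiving_partitions ORDER BY parquet_key"
            )
            return [Partition(row[0]) for row in cur.fetchall()]
    finally:
        conn.close()

Everything else in the PartitionSetDefinition above would stay the same. As far as I understand, partition_fn is re-evaluated whenever the partitions are listed, so new rows written by pipeline A would show up as new partitions without redeploying code.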

alex

06/07/2021, 3:37 PM
I can't think of a better approach than externalizing the state in a DB like you described.