Was wondering if I could get some support on how to best structure a Dagster project. My goal is to have a pipeline that takes data from an API, applies some transformations using pandas, and saves the result to a PostgreSQL DB. I want to apply this pipeline to about 50 different tickers (e.g. SPY, TLT). Ideally, I'd like to avoid looping over the tickers within the pipeline, so that if one ticker fails I can manually re-run the pipeline for just that one ticker.
I have two questions:
a) once I’ve built the pipeline, how can I feed it N ticker values, so that it runs identically for each? Also, could I do this in parallel? That’d be nice!
b) I'm a bit confused about how to add Postgres for actually saving the data to the DB. Do I add another asset? Use an I/O manager? An example / link to GitHub would be great.
04/20/2023, 1:32 AM
A: You can use static partitions to have a partition for each ticker. By using partitions, you can run/update/backfill an individual partition at a time, or multiple in parallel.
B: We don't have a native I/O manager for Postgres, as far as I know. You can have your asset write to Postgres directly by running the insert query yourself or using something like SQLAlchemy to insert the data for you.
As you scale up, it might be worth making a resource or I/O manager for Postgres. OLTP I/O manager support isn't the main priority for Dagster right now, but we'll gladly take the feedback and a GitHub issue if you'd like to express interest in having it built natively.
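Writing from the asset directly could look like this. Note this is a sketch: sqlite3 stands in for Postgres so it runs anywhere, and the table/column names are made up. With Postgres you'd pass a SQLAlchemy engine (e.g. `create_engine("postgresql://...")`) instead of the sqlite3 connection.

```python
# Sketch: persisting a DataFrame at the end of an asset's logic.
# sqlite3 is a stand-in for Postgres; swap in a SQLAlchemy engine there.
import sqlite3
import pandas as pd

def save_prices(df: pd.DataFrame, conn) -> None:
    # Append rows to the target table; to_sql creates it if missing.
    df.to_sql("prices", conn, if_exists="append", index=False)

conn = sqlite3.connect(":memory:")
df = pd.DataFrame({"ticker": ["SPY", "SPY"], "close": [410.0, 412.5]})
save_prices(df, conn)
print(pd.read_sql("SELECT COUNT(*) AS n FROM prices", conn)["n"][0])  # prints 2
```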
Let me know if this helped!
04/20/2023, 11:55 AM
Hi 👋 @Tim Castillo.
1. The static partitions feature is very cool - just tried it and it works. Thanks!
2. Two questions regarding Postgres / db:
a. Do you guys have any documentation / examples on how to use any database to store the results of a pipeline? Still a bit confused on how to best set this up.
b. Can I have “save to db” as a step in a pipeline, and then continue with the next materialization, do some more processing, “save to db” again, etc.? Basically, I want to materialize some of the intermediary steps of the pipeline into a table.
04/21/2023, 4:56 PM
A) We have a lot of docs on the databases we natively integrate with via resources/I/O managers, but not for OLTP databases. Here's an example of how you'd use resources to manage the connection, and if you want to make your own resource around SQLAlchemy/psycopg2/etc., it's pretty straightforward with that example.
B) Yeah! The easiest way would be to write to the database at the end of every asset function.
One disclaimer: a more robust approach than writing your own SQL statements every time is to use I/O managers, so you can just return the intermediate data and Dagster will hand it to you at the next step. But it's more complicated.