# ask-community

Wei Hao Quek

02/27/2023, 9:23 AM
Hi all, I am trying to use IO Manager to load from and save to a PostgreSQL database.
```python
import pandas as pd

from dagster import IOManager, InputContext, OutputContext

class MyIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        # Push it into a PostgreSQL database
        # Save its returning index to a file
        pass

    def load_input(self, context: InputContext) -> pd.DataFrame:
        # Read the index from the file
        # Pull from PostgreSQL database using the index
        pass
```
However, saving the index to a file and reading it back does not seem right to me. Can someone shed some light on whether there is a better way to do this? Thank you in advance. 🙂

Peter Davidson

02/27/2023, 9:48 AM
We have this setup:
• Source assets from Postgres using a db_io (based on the Snowflake one)
• Downstream assets do something:
  ◦ Take the source asset as input
    ▪︎ This triggers load_input in the db_io, which contains the SQL SELECT
  ◦ Write to file using fs_io
• Further downstream, store the result:
  ◦ Use a db_io again
    ▪︎ handle_output uses pandas to_sql to write the data back to Postgres
So you need two IOManagers: one for files and one for DB tables.
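The db_io half of the setup above can be sketched as a plain pandas round-trip: handle_output writes the DataFrame with to_sql, and load_input reads it back with a SELECT. This is a minimal, hypothetical sketch; it uses an in-memory SQLite connection as a stand-in for Postgres (a real setup would use something like `sqlalchemy.create_engine("postgresql://...")`), and `my_asset` is an invented table name.

```python
import sqlite3

import pandas as pd

# Stand-in for the Postgres connection; swap in a SQLAlchemy engine
# pointed at your real database.
conn = sqlite3.connect(":memory:")

df = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# handle_output side: write the processed DataFrame to a table.
df.to_sql("my_asset", conn, index=False, if_exists="replace")

# load_input side: read the same table back for the downstream asset.
loaded = pd.read_sql("SELECT * FROM my_asset", conn)
```

The fs_io side is just Dagster's filesystem IO manager; only the table-backed assets need the db_io.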

Wei Hao Quek

02/27/2023, 10:24 AM
Hi @Vinnie, the example you shared seems to use a time window in load_input. For PostgreSQL, I believe that unless we inject a timestamp into the data, we cannot pull it this way. To pull from and push to the database, I am thinking I can add a column to store the run_id. In your opinion, though, is there another way that does not require modifying the table structure?
Hi @Peter Davidson, my apologies for my confusing sentences. My problem is mainly with using an IO Manager to store the data I have processed into a PostgreSQL DB and then load back exactly that data for downstream assets. The loading part is where I am stuck: when writing my own IO Manager, I am unable to pass any information from handle_output to load_input, so I cannot get any IDs in load_input with which to retrieve the data from the database. Hopefully this clears up my question.

Vinnie

02/27/2023, 10:48 AM
Do I understand correctly that you don’t have any date/time information in your table? The example passes the partition information into the select statement, thus filtering the query; it has nothing to do with the run ID or run timestamp.

Wei Hao Quek

02/27/2023, 2:42 PM
@Vinnie Yes. The table I am using does not have any date/time information. I saw that the time_window comes from the partition information, and that this time window is then used to query the DB. So I believe that, since my table has no date/time information, I have to add some sort of information to track "what have I processed": either a DateTime column for my data, or a run_id column populated with context.get_identifier()?
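The run_id-column idea above can be sketched in a few lines: handle_output tags each row with the run's identifier before writing, and load_input filters on it. This is a hedged sketch, again using in-memory SQLite as a stand-in for Postgres, with a made-up `"run-123"` standing in for whatever the context provides (e.g. context.run_id or context.get_identifier()).

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")  # Postgres stand-in

# handle_output side: tag the rows with this run's identifier so that
# load_input can later retrieve exactly these rows.
run_id = "run-123"  # would come from the Dagster context in real code
df = pd.DataFrame({"value": [10, 20]})
df.assign(run_id=run_id).to_sql("my_asset", conn, index=False, if_exists="append")

# load_input side: pull back only the rows written by this run.
loaded = pd.read_sql(
    "SELECT value FROM my_asset WHERE run_id = ?", conn, params=(run_id,)
)
```

Note the `?` placeholder is SQLite's parameter style; psycopg2 would use `%s`.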

Daniel Gafni

02/27/2023, 9:30 PM
1. You can log your storage information (like the table name) as metadata with context.add_output_metadata in handle_output and read it in load_input. This metadata can point to the actual table.
2. You can make your own table in Postgres specifically for your IOManager and store the information there. (1) is basically a better (2).
3. Have a deterministic function that extracts this information from the context (using asset_key, etc). You can then call it in both handle_output and load_input.
It’s not straightforward how to access the metadata in load_input. This should help. By the way, @sandy , are there any plans to make this pattern more accessible?
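Option (1) boils down to a simple handoff: handle_output records where it wrote (e.g. a table name) keyed by the asset, and load_input looks that record up. Here is a toy, Dagster-free sketch of just that pattern, with a plain dict standing in for Dagster's event-log-backed metadata store and another dict standing in for the database; in real Dagster code the write side is context.add_output_metadata and the read side is the (somewhat buried) upstream output metadata mentioned above.

```python
class ToyIOManager:
    def __init__(self):
        self._metadata = {}  # stand-in for Dagster's metadata/event log
        self._tables = {}    # stand-in for the Postgres database

    def handle_output(self, asset_key, df):
        table = f"tbl_{asset_key}"
        self._tables[table] = df                # "INSERT INTO ..." in real life
        self._metadata[asset_key] = {"table": table}  # add_output_metadata

    def load_input(self, asset_key):
        meta = self._metadata[asset_key]        # read what handle_output logged
        return self._tables[meta["table"]]      # "SELECT ... FROM ..." in real life

io = ToyIOManager()
io.handle_output("daily_orders", [1, 2, 3])
```

The point is that nothing needs to round-trip through a file: the metadata store carries the table name from the output side to the input side.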

Wei Hao Quek

02/28/2023, 12:55 AM
@Daniel Gafni (1) looks really promising! I will give it a try. Thank you. 🙂