# ask-community
p
I have the following data asset that I'd like to load into a Postgres database. What would be the easiest way to do this? I've been searching for similar posts here, and it seems like a custom I/O manager and/or TypeHandler may be required. I'm hoping there is a more straightforward way to achieve this, though.
import pandas as pd
from dagster import Output, asset


@asset
def medi_cal_ffs_source_data(context):
    """Retrieve the latest DHCS Medi-Cal fee-for-service CSV and return it as a DataFrame."""
    url = "https://geohub-cadhcs.hub.arcgis.com/datasets/CADHCS::medi-cal-ffs-provider-listing.csv"
    df = pd.read_csv(url)
    return Output(value=df)
t
Hi! I saw the issue you raised and wanted to say thank you! Issues are an important way to collect user feedback. Now, re: your question: you can create a super lightweight resource that wraps SQLAlchemy (or another library that lets you write to your Postgres database) and use that to write to the database directly, rather than using an I/O manager.
p
Thank you for the help! I'll review this resource doc and see if I can piece things together.
t
Oh, hmm, might've sent you the wrong link, I'd recommend looking at this one instead: https://docs.dagster.io/concepts/resources#defining-resources-which-require-state
Here's an incomplete code snippet to help give some direction:
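import pandas as pd
from dagster import ConfigurableResource, asset
from sqlalchemy import create_engine
from sqlalchemy.engine import URL


class PostgresResource(ConfigurableResource):
    username: str
    password: str
    host: str

    def get_client(self):
        # Build a SQLAlchemy engine. No database name is configured here, so
        # Postgres falls back to the database named after the user.
        url = URL.create("postgresql", username=self.username, password=self.password, host=self.host)
        return create_engine(url)


# The parameter name must match the key the resource is registered under
# ("postgres" here); the type annotation replaces required_resource_keys.
@asset
def medi_cal_ffs_source_data(postgres: PostgresResource) -> None:
    """Retrieve the latest DHCS Medi-Cal fee-for-service CSV and load it into Postgres."""
    url = "https://geohub-cadhcs.hub.arcgis.com/datasets/CADHCS::medi-cal-ffs-provider-listing.csv"
    df = pd.read_csv(url)
    # "table_name" is a placeholder; if_exists="replace" is an assumption so reruns don't fail.
    df.to_sql("table_name", postgres.get_client(), if_exists="replace", index=False)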
p
Thanks for sharing this!
e
Would it be possible to do something like this?
class PostgresResource(ConfigurableResource):
    _pool: Pool
    _schema: str
where _pool is the database client, so that in my asset I can do:
async with postgres._pool.acquire() as conn:
    await conn.execute(...)