Philip Orlando
05/03/2023, 10:01 PM
@asset
def medi_cal_ffs_source_data(context):
    """Retrieve the DHCS Medi-Cal fee-for-service provider listing CSV.

    Args:
        context: Execution context passed in by the caller; the body does
            not read it.

    Returns:
        An ``Output`` whose ``value`` is the DataFrame parsed from the CSV.
    """
    # The pasted snippet wrapped the URL in "<...>" (chat link markup), which
    # is not a valid URL for ``pd.read_csv``; the bare URL is used here.
    url = "https://geohub-cadhcs.hub.arcgis.com/datasets/CADHCS::medi-cal-ffs-provider-listing.csv"
    df = pd.read_csv(url)
    return Output(value=df)
Tim Castillo
05/03/2023, 10:06 PM
Philip Orlando
05/03/2023, 10:08 PM
Tim Castillo
05/03/2023, 10:10 PM
Tim Castillo
05/03/2023, 10:13 PM
from dagster import asset, ResourceParam, ConfigurableResource, asset
class PostgresResource(ConfigurableResource):
    """Resource that carries Postgres connection settings.

    The three settings are declared as string fields; ``get_client`` passes
    them to a client constructor.
    """

    username: str
    password: str
    host: str

    def get_client(self):
        """Build a client from the configured username, password, and host."""
        # NOTE(review): ``SQLAlchemyGeneratorwahtever`` is a placeholder name
        # carried over from the original snippet and is not defined here;
        # replace it with the real engine/connection factory. The stray
        # ``new`` keyword (not valid Python) was removed so the call parses.
        return SQLAlchemyGeneratorwahtever(self.username, self.password, self.host)
@asset(
    required_resource_keys={"postgres"}
)
def medi_cal_ffs_source_data(conn: PostgresResource):
    """Retrieve the DHCS Medi-Cal fee-for-service CSV and write it to SQL.

    Args:
        conn: Resource whose ``get_client()`` result is handed to
            ``DataFrame.to_sql`` as the connection.
    """
    # NOTE(review): the decorator requests the "postgres" resource key while
    # the resource parameter is named ``conn`` - confirm these line up with
    # how the resource is registered.
    # The pasted snippet wrapped the URL in "<...>" (chat link markup), which
    # is not a valid URL for ``pd.read_csv``; the bare URL is used here.
    url = "https://geohub-cadhcs.hub.arcgis.com/datasets/CADHCS::medi-cal-ffs-provider-listing.csv"
    df = pd.read_csv(url)
    df.to_sql("table_name", conn.get_client())
Philip Orlando
05/03/2023, 10:14 PM
Ethan Leifer
05/09/2023, 1:02 PM
class PostgresResource(ConfigurableResource):
_pool: Pool
_schema: str
Ethan Leifer
05/09/2023, 1:02 PM
Ethan Leifer
05/09/2023, 1:02 PM
async with postgres._pool.acquire() as conn:
conn.execute(...)