Phillip Marks

02/27/2023, 3:54 PM
Hey, I'm having an issue linking the output of a newly created Snowflake table to pandas. All the code below works; however, I want to find a solution that avoids creating a `my_new_table` source asset. It works as follows:
1. `production_cement` creates a new table in Snowflake called "PRODUCTION".
2. "PRODUCTION" is imported as a new SourceAsset.
3. `my_derived_asset` reads the new SourceAsset into pandas.
Is there a way of avoiding step 2 and loading the output of 1 into 3?
import pandas as pd
from dagster import AssetIn, SourceAsset, asset

ngfs = SourceAsset(key="NGFS")

@asset(required_resource_keys={"snow_resource"}, non_argument_deps={"NGFS"})
def production_cement(context) -> None:
    ''' Create a new table on Snowflake called PRODUCTION '''
    context.resources.snow_resource.execute_query("create or replace view PRODUCTION as select * from NGFS where variable = 'Production|Cement'")


my_new_table = SourceAsset(key="PRODUCTION")


@asset(io_manager_key= "pd_snowflake_manager", ins={"upstream": AssetIn("PRODUCTION")})
def my_derived_asset(upstream: pd.DataFrame):
    return upstream.dropna()

sandy

02/27/2023, 6:28 PM
Hi @Phillip Marks - have you tried setting `io_manager_key="pd_snowflake_manager"` on the `production_cement` asset?

Phillip Marks

02/28/2023, 10:17 AM
Hey @sandy - switching to `pd_snowflake_manager` rather than `snowflake_resource`, I get the error "AttributeError: 'DbIOManager' object has no attribute 'execute_query'" when using `context.resources.pd_snowflake_manager.execute_query("SQL")`. Does `pd_snowflake_manager` have an equivalent to `snowflake_resource`'s `execute_query`?

sandy

02/28/2023, 4:01 PM
I didn't mean to suggest that you should replace the `snowflake_resource` with the `pd_snowflake_manager`, but rather that you should keep doing what you're doing and also set the `io_manager_key`.

Phillip Marks

02/28/2023, 4:32 PM
Is the idea that `production_cement` outputs a pandas DataFrame? If so, `-> None` works but doesn't output a pandas DataFrame:
@asset(required_resource_keys={"snow_resource"}, non_argument_deps={"NGFS"}, io_manager_key= "io_manager")
def production_cement(context) -> None:
    context.resources.snow_resource.execute_query("create or replace view PRODUCTION as select * from NGFS where variable = 'Production|Cement'")
This
def production_cement(context) -> pd.DataFrame:
and this
def production_cement(context: pd.DataFrame):
don't work. Or can I modify `my_derived_asset` to accept `production_cement` as a pandas DataFrame?

sandy

02/28/2023, 4:32 PM
> is the idea that production_cement outputs a pandas df? If so, --> None works but doesn't output a pd Dataframe
It doesn't need to output a pandas DataFrame. Putting the IO manager on the asset just tells downstream assets that depend on it how to load it.

Phillip Marks

02/28/2023, 4:34 PM
Righto, how would I change the decorator of `my_derived_asset` to be downstream of `production_cement`?
@asset(io_manager_key= "pd_snowflake_manager", ins={"upstream": AssetIn("PRODUCTION")})
def my_derived_asset(upstream: pd.DataFrame):
    return upstream.dropna()

sandy

02/28/2023, 4:36 PM
@asset(io_manager_key= "pd_snowflake_manager")
def my_derived_asset(production_cement: pd.DataFrame):
    return production_cement.dropna()
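
For context, Dagster takes the upstream asset from the parameter name unless `ins` overrides it, which is why renaming the argument to `production_cement` is enough here. The following is a library-free sketch of that idea only, not Dagster's implementation; `upstream_names` and the plain-dict `ins` mapping are hypothetical stand-ins for what the decorator does.

```python
import inspect


def upstream_names(fn, ins=None):
    """Map each parameter of fn to the upstream asset name it would load.

    `ins` is a hypothetical override mapping that plays the role of
    ins={"upstream": AssetIn("PRODUCTION")} from earlier in the thread.
    """
    ins = ins or {}
    return {
        param: ins.get(param, param)  # default: parameter name == asset name
        for param in inspect.signature(fn).parameters
        if param != "context"  # the context argument is not an upstream asset
    }


def my_derived_asset(production_cement):
    return production_cement


def my_derived_asset_explicit(upstream):
    return upstream


# Implicit form: the argument name picks the upstream asset.
print(upstream_names(my_derived_asset))
# Explicit form: the override picks the upstream asset.
print(upstream_names(my_derived_asset_explicit, {"upstream": "PRODUCTION"}))
```

The first call resolves the input to `production_cement`, the second to `PRODUCTION`, matching the two decorator styles used in this thread.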

Phillip Marks

02/28/2023, 4:49 PM
Thanks, I tried that but get the error '"DATABASE.SCHEMA.PRODUCTION_CEMENT" does not exist', because `production_cement` creates the table PRODUCTION. How can I pass "PRODUCTION" to `my_derived_asset`?

sandy

02/28/2023, 4:55 PM
So you want the table to have a different name than the asset? Right now, the built-in Snowflake pandas IO manager uses the asset name as the table name, so you'd need to either change the name of the asset or write an IO manager that does the translation.
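
A rough sketch of the "translation" option, under stated assumptions: it shows only the name-resolution step a custom IO manager would need, and it does not touch Dagster or Snowflake. `TABLE_NAME_OVERRIDES` and `table_for_asset` are made-up names for illustration, and the upper-casing mirrors how Snowflake resolves unquoted identifiers.

```python
# Hypothetical mapping from asset names to the Snowflake objects they create.
TABLE_NAME_OVERRIDES = {"production_cement": "PRODUCTION"}


def table_for_asset(asset_name: str, database: str, schema: str) -> str:
    """Return the fully qualified table to read for the given asset.

    Falls back to the asset name itself when no override is registered,
    which is the default behaviour described in the thread.
    """
    table = TABLE_NAME_OVERRIDES.get(asset_name, asset_name).upper()
    return f"{database}.{schema}.{table}"


# With the override, the lookup targets the table that actually exists.
print(table_for_asset("production_cement", "DATABASE", "SCHEMA"))
# Without an override, the asset name is used as the table name.
print(table_for_asset("my_derived_asset", "DATABASE", "SCHEMA"))
```

A custom IO manager could call a helper like this when loading an input, so that `production_cement` resolves to `DATABASE.SCHEMA.PRODUCTION` instead of the missing `DATABASE.SCHEMA.PRODUCTION_CEMENT`. Renaming the asset to match the table avoids the extra code entirely.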

Phillip Marks

02/28/2023, 4:58 PM
thank you so much for your help @sandy