Phillip Marks
02/27/2023, 3:54 PMmy_new_table
source asset. It works as follows:
1. production_cement creates a new table in snowflake called "PRODUCTION".
2. "PRODUCTION" is imported as a new SourceAsset
3. my_derived_asset reads the new SourceAsset into pandas
Is there a way of avoiding step 2 and loading the output of 1 into 3?
ngfs = SourceAsset(key="NGFS")
@asset(required_resource_keys={"snow_resource"}, non_argument_deps={"NGFS"})
def production_cement(context) -> None:
''' Create a new table on Snowflake called PRODUCTION '''
context.resources.snow_resource.execute_query("create or replace view PRODUCTION as select * from NGFS where variable = 'Production|Cement'")
my_new_table = SourceAsset(key="PRODUCTION")
@asset(io_manager_key= "pd_snowflake_manager", ins={"upstream": AssetIn("PRODUCTION")})
def my_derived_asset(upstream: pd.DataFrame):
return upstream.dropna()
sandy
02/27/2023, 6:28 PMio_manager_key= "pd_snowflake_manager"
on the production_cement
asset?Phillip Marks
02/28/2023, 10:17 AMpd_snowflake_manger
rather then snowflake_resource
I get the error
"AttributeError: 'DbIOManager' object has no attribute 'execute_query'" using context.resources.pd_snowflake_manager.execute_query("SQL")
does pd_snowflake_manger have an equivalent to `snowflake_resource`'s execute_query
?sandy
02/28/2023, 4:01 PMsnowflake_resource
with the pd_snowflake_manger
. But rather that you should keep doing what you're doing and also set the io_manager_key
Phillip Marks
02/28/2023, 4:32 PM@asset(required_resource_keys={"snow_resource"}, non_argument_deps={"NGFS"}, io_manager_key= "io_manager")
def production_cement(context) -> None:
context.resources.snow_resource.execute_query("create or replace view PRODUCTION as select * from NGFS where variable = 'Production|Cement'")
this
def production_cement(context) -> pd.DataFrame:
and this
def production_cement(context:pd.DataFrame):
don't
or can I modify my_derived_asset
to accept production_cement
as a pd Dataframesandy
02/28/2023, 4:32 PMis the idea that production_cement outputs a pandas df? If so, --> None works but doesn't output a pd Dataframeit doesn't need to output a pandas DF. putting the IO manager on the asset just tells downstream assets that depend on it how to load it
Phillip Marks
02/28/2023, 4:34 PMmy_derived_asset
to be downstream of production_cement?
@asset(io_manager_key= "pd_snowflake_manager", ins={"upstream": AssetIn("PRODUCTION")})
def my_derived_asset(upstream: pd.DataFrame):
return upstream.dropna()
sandy
02/28/2023, 4:36 PM@asset(io_manager_key= "pd_snowflake_manager")
def my_derived_asset(production_cement: pd.DataFrame):
return production_cement.dropna()
Phillip Marks
02/28/2023, 4:49 PMsandy
02/28/2023, 4:55 PMPhillip Marks
02/28/2023, 4:58 PM