Nick
05/24/2023, 9:07 AMrdw_org_loc_dm = SourceAsset(io_manager_key="rdw_io_manager", key=AssetKey("rdw_org_loc_dm"))
@asset(io_manager_key="sql_server_io_manager", key_prefix=["ref", str(orgSys + "_" + orgTable)])
def rdw_org_loc_dm_to_sql(rdw_org_loc_dm: pd.DataFrame):
df = rdw_org_loc_dm
return Output( # The return value is updated to wrap it in `Output` class
value=df, # The original df is passed in with the `value` parameter
metadata={
"num_records": len(df), # Metadata can be any key-value pair
"preview": MetadataValue.md(df.head().to_markdown()),
# The `MetadataValue` class has useful static methods to build Metadata
}
)
The defs is as follows....
defs = Definitions(
assets=all_assets,
jobs=[stock_job],
resources={
"sql_server_io_manager": sql_server_io_manager.SQLServerIOManager(
user=EnvVar("SQL_USER"),
password=EnvVar("SQL_PASSWORD"),
database="db_Staging",
table="dim_date"
),
"rdw_io_manager": rdw_io_manager.RDWIOManager(
table= "rdw_org_loc_dm"
)
},
)
The error I get when I run the job is the screenshot below. No extra errors in the console. It doesn't try to use the io manager as far as I can tell.
I am really stumped, any help would be appreciated. Or a working example?Aaron T
05/24/2023, 11:16 AM