Arthur
12/07/2022, 4:16 PM@asset(ins={"app_accounts": AssetIn(input_manager_key="stagecoach_io_manager")})
def ebdb_accounts(context,app_accounts):
return app_accounts
to no avail it gives this error.
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["app_accounts"]' for asset '["ebdb_accounts"]' is not produced by any of the provided asset ops and is not one of the provided sources
(edited)Zach
12/07/2022, 4:28 PMArthur
12/07/2022, 4:34 PMclass MYSQLMANAGER(IOManager):
def __init__(self, config):
self._config = config
def handle_output(self, context:OutputContext, obj):
context.add_output_metadata(
{
"preview": MetadataValue.md(obj.head().to_markdown()),
"description": MetadataValue.md(obj.describe().to_markdown()),
"num_rows": len(obj)
}
)
return None
def load_input(self, context:InputContext) -> pd.DataFrame:
table = context.asset_key.path[-1]
with warnings.catch_warnings():
warnings.simplefilter('ignore', UserWarning)
with connect_mysql(config=self._config) as conn:
sql = f"select * from {table}".format(table)
return pd.read_sql_query(sql, conn)
Ignore the handle_output I was more interested in load_input and I'd assumed i'd pass the name of the database table in the asset so it can be loaded but Im very wrong but cant work out how. If i remove the ns={"app_accounts": AssetIn(input_manager_key="stagecoach_io_manager")}
I dont see how to tell the asset what database table to loadZach
12/07/2022, 4:35 PMapp_accounts
needs to be defined either as an asset or a SourceAsset. is it already defined?Arthur
12/07/2022, 4:36 PMZach
12/07/2022, 4:38 PMArthur
12/07/2022, 4:42 PM