# ask-community
w
When creating multiple assets that use the same output storage (MongoDB) but store to different paths (e.g. different database / collection names), can we use a single IO manager that can parse or accept different storage-path params, so that it runs `handle_output` properly for each asset? I was thinking of assigning a dedicated IO manager (with a different io-manager key) to each of these assets, but I want to be clever and use just a single IO manager if possible. It seems `load_input` and `handle_output` receive a context in their parameters, but I wonder how we can provide these params through configuration for assets (not jobs).
c
Hi Wonjae. I think it's possible to write your own IO manager that accepts config containing the different possible paths. Then, in your `load_input`/`handle_output` methods you could call `context.asset_key` to determine which path you'd like the asset to write to.
Though, I think it might just be easier to create two different IO managers so the IO managers don't need to be asset-aware.
❤️ 1
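To make the single-IO-manager idea concrete, here is a framework-free sketch of the routing logic Claire describes: one manager holds a config mapping from asset key to MongoDB database/collection, and both `handle_output` and `load_input` resolve the path from the asset key. In real Dagster code this logic would live inside a custom `IOManager` subclass reading `context.asset_key`; here a plain string stands in for the asset key and a dict stands in for MongoDB. All names (`PathRoutingIOManager`, the example keys) are illustrative, not part of any real API.

```python
class PathRoutingIOManager:
    """Sketch: one manager, many storage paths, chosen per asset key."""

    def __init__(self, paths):
        # paths: {asset_key: (database, collection)}, supplied via config
        self.paths = paths
        self.fake_mongo = {}  # {(database, collection): value}; stands in for MongoDB

    def _resolve(self, asset_key):
        try:
            return self.paths[asset_key]
        except KeyError:
            raise KeyError(f"No storage path configured for asset {asset_key!r}")

    def handle_output(self, asset_key, obj):
        # Real code would insert into a pymongo collection instead.
        db, coll = self._resolve(asset_key)
        self.fake_mongo[(db, coll)] = obj

    def load_input(self, asset_key):
        db, coll = self._resolve(asset_key)
        return self.fake_mongo[(db, coll)]


manager = PathRoutingIOManager({
    "users": ("analytics", "users_collection"),
    "events": ("tracking", "events_collection"),
})
manager.handle_output("users", [{"id": 1}])
print(manager.load_input("users"))  # → [{'id': 1}]
```

The trade-off Claire mentions is visible here: the manager must know about every asset's path, i.e. it is asset-aware, whereas two separate managers would each be path-specific and simpler.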
w
Thanks @claire for the prompt response! I want to ask one more question 🙂 When I try to materialize my asset using a branching IO manager (read from BQ, write to MongoDB), I get the error: "FileNotFoundError: [Errno 2] No such file or directory". The code ran with `upath_io_manager` (`fs_io_manager`) for the read when I set `io_manager_key` to the BranchingIOManager like this:

@asset(
    name=table_name + "_asset",
    key_prefix=["bigquery"],
    group_name="BigQuery",
    partitions_def=PER_COMPANY_PARTITION_DEF,
    ins={"table": AssetIn(table_name)},
    io_manager_key="name_of_branching_io_manager",
)
def _bigquery_asset(table: pd.DataFrame) -> pd.DataFrame:
    return table
More full error logs FYI:

File "/Users/wonjaelee/Library/Python/3.10/lib/python/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
File "/Users/wonjaelee/Library/Python/3.10/lib/python/site-packages/dagster/_core/execution/plan/inputs.py", line 826, in _load_input_with_input_manager
    value = input_manager.load_input(context)
File "/Users/wonjaelee/Library/Python/3.10/lib/python/site-packages/dagster/_core/storage/upath_io_manager.py", line 210, in load_input
    return self._load_single_input(path, context, backcompat_path)
File "/Users/wonjaelee/Library/Python/3.10/lib/python/site-packages/dagster/_core/storage/upath_io_manager.py", line 144, in _load_single_input
    raise e
File "/Users/wonjaelee/Library/Python/3.10/lib/python/site-packages/dagster/_core/storage/upath_io_manager.py", line 134, in _load_single_input
    obj = self.load_from_path(context=context, path=path)
File "/Users/wonjaelee/Library/Python/3.10/lib/python/site-packages/dagster/_core/storage/fs_io_manager.py", line 172, in load_from_path
    with path.open("rb") as file:
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/pathlib.py", line 1119, in open
    return self._accessor.open(self, mode, buffering, encoding, errors,

The above exception occurred during handling of the following exception:

FileNotFoundError: [Errno 2] No such file or directory: '/Users/wonjaelee/Documents/GitHub/kelp/tmpv08j6_u2/storage/dim_company_events_active/staging|2023-04-01'

File "/Users/wonjaelee/Library/Python/3.10/lib/python/site-packages/dagster/_core/storage/upath_io_manager.py", line 138, in _load_single_input
    obj = self.load_from_path(context=context, path=backcompat_path)
File "/Users/wonjaelee/Library/Python/3.10/lib/python/site-packages/dagster/_core/storage/fs_io_manager.py", line 172, in load_from_path
    with path.open("rb") as file:
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/pathlib.py", line 1119, in open
    return self._accessor.open(self, mode, buffering, encoding, errors,
I'm using `IOManagerDefinition.hardcoded_io_manager` to convert my IOManager to an IOManagerDefinition. Can this be the reason??
c
Hi Wonjae. Likely not related to using the `hardcoded_io_manager` function. I'm assuming the upstream asset is `dim_company_events_active` with partition `staging|2023-04-01`? Can you double-check that that partition was materialized using the `fs_io_manager`? If so, I'm wondering if either of these files also exists for the upstream asset:
• staging/2023-04-01
• 2023-04-01/staging
We recently changed the path that multipartitioned asset partitions are stored in by the `fs_io_manager`, so I'm wondering if that could also be interfering.
❤️ 1
The run logs for the upstream asset should show where it was persisted, if using the `fs_io_manager`.
w
Thanks for the response! I think you are right. The upstream asset seems to be handled with the default io_manager (attached img). Our upstream asset is materialized through `load_assets_from_dbt_project` with the resource `dbt_cli_resource.configured`, and our config contains outputs - type - bigquery. This is why I was able to check the materialized table in BigQuery. But in this case I wonder how the downstream IO manager can load the upstream asset from BQ, not from the filesystem.
I think providing `ins={"table": AssetIn(io_manager_key=…)}` for the asset forces it to use my IO manager when trying to load the input. I'm going to check if there is a built-in IO manager for BQ (not an IOManagerDefinition like `bigquery_pandas_io_manager`), but please let me know if you have an idea already 🙂 But if I provide an explicit input manager, then I also need to provide the configs for querying (e.g. table name, conditions to be used in WHERE, etc.), right? (When going from dbt asset to dbt asset, they were able to infer that info through partitions, I believe.)
c
Ah yeah, seems like the upstream asset output was written to your filesystem using the `fs_io_manager`, and your downstream asset loads the upstream asset using the `fs_io_manager`. I think if you want to load the upstream asset from BigQuery instead, you'll need to swap out the IO manager for a BigQuery IO manager. I think you should be able to customize the BigQuery IO manager and provide the desired config following the docs here: https://docs.dagster.io/integrations/bigquery/reference#using-pandas-and-pyspark-dataframes-with-bigquery
❤️ 1
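A configuration sketch of the swap Claire suggests, based on the linked docs: bind the BigQuery pandas IO manager (the `bigquery_pandas_io_manager` definition the thread already mentions) as a resource, so downstream loads read from BigQuery instead of the filesystem. The `project`/`dataset` values and the `"io_manager"` resource key below are placeholders; bind it under a custom key and reference that key from `AssetIn`/`io_manager_key` if only some assets should use it.

```python
from dagster import Definitions
from dagster_gcp_pandas import bigquery_pandas_io_manager

defs = Definitions(
    assets=[...],  # your asset list
    resources={
        # Placeholder project/dataset values -- substitute your own.
        "io_manager": bigquery_pandas_io_manager.configured(
            {"project": "my-gcp-project", "dataset": "my_dataset"}
        )
    },
)
```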
w
Quick follow-up question: if I use

@asset(
    ins={"table": AssetIn(input_manager_key="bq_custom_manager")},
    io_manager_key="different_io_manager",
)

will it load data through `bq_custom_manager` and store the output via `different_io_manager`??
c
Yes, I believe so. The input manager is only responsible for loading the input and the IO manager handles the output
❤️ 1
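The division of labor Claire confirms can be mimicked without the framework: the input manager is consulted only to load the input, and the IO manager only to persist the output. A toy sketch under that assumption; the class and table names are invented stand-ins (a dict for BigQuery, another for MongoDB), not real Dagster or client APIs.

```python
class FakeBigQueryReader:
    """Stands in for the input manager: only load_input is used."""

    def __init__(self, tables):
        self.tables = tables  # {table_name: rows}; pretend this is BigQuery

    def load_input(self, table_name):
        return self.tables[table_name]


class FakeMongoWriter:
    """Stands in for the IO manager: only handle_output is exercised."""

    def __init__(self):
        self.collections = {}

    def handle_output(self, name, obj):
        self.collections[name] = obj


reader = FakeBigQueryReader({"dim_company": [{"company": "acme"}]})
writer = FakeMongoWriter()

# Downstream asset body: input arrives via the reader (the
# "bq_custom_manager" role), output leaves via the writer (the
# "different_io_manager" role) -- two managers, one responsibility each.
rows = reader.load_input("dim_company")
writer.handle_output("dim_company_asset", rows)
```

Note the reader never writes and the writer never reads, which is exactly why `input_manager_key` and `io_manager_key` can point at different resources on the same asset.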
w
That works! Thanks a lot for your support :)