David Weber
06/29/2023, 2:19 PM
@asset(
    ...
    non_argument_deps={"asset1"},
    partitions_def=our_partitions_def
    ...
)
def asset2(context: OpExecutionContext) -> Output[None]:
    print("Asset2, I do not use an IO manager, I return None")

@asset(
    required_resource_keys={"pyspark_step_launcher"},
    partitions_def=our_partitions_def,
    io_manager_key="our_custom_io_manager",
    metadata={"key1": "value1"},
    ins={
        "asset2": AssetIn(
            key="asset2",
            input_manager_key="our_custom_io_manager",
            metadata={"key2": "value2", "key3": "value3"},
        )
    },
)
def asset3(context: OpExecutionContext, asset2):
    print("Asset3, I want to use the custom IO manager")
It feels weird that I have to specify our custom IO manager twice (in ins={AssetIn()} and in io_manager_key) to make sure that both load_input and handle_output are used in asset3. Is there a better way to use IO managers that we are not aware of? Especially when we do not intend to use an IO manager for asset2?
2. The code also shows that we use the metadata argument both in the @asset decorator and in the ins={AssetIn(asset2)} argument. We have a lot of data we want to pass to all assets and corresponding IO managers throughout the job. I can then access the metadata from ins={AssetIn(asset2)} by calling context.metadata in the custom IO manager our_custom_io_manager. However, again, this feels weird. Is "metadata" the only way to pass additional information to IO managers?
• In assets it is easy: You have one asset-instance per configuration (we use an asset-factory), so you can pass this additional meta-information directly to the asset as an input param.
• For IO managers this doesn't work: You specify your IO manager once, and then give it a configuration. But this configuration doesn't change with every asset-instance...
So let's say the DynamicPartition we work with in a job has a lot of metadata (file_name, interface, entity, version, ...) that we want to pass to every asset and every IO manager across our whole job. What is the best way to do this? Using "metadata" doesn't seem fully right.
We would be grateful for some guidance and general architecture tips that fit our setup!
David Weber
06/29/2023, 2:20 PM
David Weber
06/29/2023, 2:24 PM
David Weber
06/29/2023, 2:35 PM
sandy
06/29/2023, 2:58 PM
None, and then your custom IO manager can see the None and do a no-op. This is what the builtin IO managers do. Then you won't need to set an IO manager on the AssetIn.
David Weber
06/29/2023, 3:04 PM
custom_io_manager reads from dbfs and returns a dataframe. Something that is completely unrelated to the Unzip Asset. So why should I specify the IO manager there? Just to get rid of the AssetIn in the following asset3 - for that it seems like a workaround...
sandy
06/29/2023, 3:15 PM
non_argument_deps if you don't want to use IO managers in this situation. Ultimately, you either need to:
• opt out of IO managers for that input by using non_argument_deps
• specify somewhere how you want the input to be loaded, either on the upstream asset or the downstream AssetIn
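
A minimal sketch of the second option, where the IO manager treats None as "nothing to store", as sandy describes above. It deliberately does not import Dagster: StubContext and the in-memory storage dict are hypothetical stand-ins, and in a real project the class would subclass Dagster's IOManager and receive Dagster's own context objects.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class StubContext:
    """Hypothetical stand-in for Dagster's OutputContext / InputContext."""
    asset_name: str
    metadata: Dict[str, Any] = field(default_factory=dict)


class NoneAwareIOManager:
    """Assumption: in a real project this would subclass dagster.IOManager."""

    def __init__(self) -> None:
        # In-memory stand-in for the real storage (dbfs in this thread).
        self.storage: Dict[str, Any] = {}

    def handle_output(self, context: StubContext, obj: Any) -> None:
        if obj is None:
            # The asset handled its own persistence; do a no-op.
            return
        self.storage[context.asset_name] = obj

    def load_input(self, context: StubContext) -> Any:
        # Returns None if nothing was stored for this asset.
        return self.storage.get(context.asset_name)


manager = NoneAwareIOManager()
manager.handle_output(StubContext("asset2"), None)       # skipped
manager.handle_output(StubContext("asset3"), [1, 2, 3])  # stored
```

With a manager shaped like this set as the default, an asset that returns None stores nothing, so whether the downstream AssetIn still needs its own input_manager_key depends only on how that input should be loaded.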
David Weber
06/30/2023, 12:37 PM
AssetIn and io_manager_key. But generally, do you see any major things we could improve here?
. But generally, do you see any major things we could improve here?
And do you have some input for (2) on how we could handle this topic?
Thanks, David
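
On (2), the metadata-based approach described earlier in the thread can be sketched roughly as follows. This is only an illustration under stated assumptions: StubInputContext is a hypothetical stand-in for the context Dagster passes to load_input (the real attribute names depend on the Dagster version), the base directory is made up, and the keys interface and entity are taken from the partition metadata listed above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class StubInputContext:
    """Hypothetical stand-in for Dagster's InputContext."""
    partition_key: str
    # Stand-in for metadata set on the downstream AssetIn(metadata=...).
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Stand-in for metadata set on the upstream @asset(metadata=...).
    upstream_metadata: Dict[str, Any] = field(default_factory=dict)


def resolve_load_path(context: StubInputContext, base_dir: str = "/dbfs/landing") -> str:
    """Build the path a load_input implementation might read from."""
    # Input-level metadata overrides upstream asset metadata on key clashes.
    merged = {**context.upstream_metadata, **context.metadata}
    return f"{base_dir}/{merged['interface']}/{merged['entity']}/{context.partition_key}"


ctx = StubInputContext(
    partition_key="2023-06-29",
    metadata={"entity": "orders"},
    upstream_metadata={"interface": "sap", "entity": "default"},
)
path = resolve_load_path(ctx)
```

The point of the merge is that one IO manager instance can stay generic while each asset, or each AssetIn, supplies the values that differ.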