# ask-community
d
Hi team, we made huge progress over the last weeks in our Dagster implementation, as we are currently doing a whole architecture rework. One concept we still struggle with is IO managers and context, especially passing job information through all the assets and IO managers. Let me share our rough setup to make it more explainable:
• We use DynamicPartitions, as we receive partitioned data deliveries in ADLS.
• We use resources whenever we interact with "external" systems, like ADLS.
• We write a custom IO manager whenever we...
◦ interact with an external system and load data into memory for further transformation (basically when we want to transform this data in the asset)
◦ and we want to offload some of the "IO" logic inside the asset to an IO manager.
• We don't use IO managers when we...
◦ do not load data into memory and can use resources to do the work, like copying a file in ADLS.
Detailed info is in the diagram I quickly came up with 🙂

Now to the questions:
1. As you see in the diagram, some of our assets use IO managers, some don't. This is how I currently make sure that `asset3` uses the custom IO manager we defined, although it depends on `asset2`, which doesn't have an IO manager:
```python
from dagster import AssetIn, OpExecutionContext, Output, asset


@asset(
    # ...
    non_argument_deps={"asset1"},
    partitions_def=our_partitions_def,
    # ...
)
def asset2(context: OpExecutionContext) -> Output[None]:
    print("Asset2, I do not use an IO manager, I return None")
    return Output(None)


@asset(
    required_resource_keys={"pyspark_step_launcher"},
    partitions_def=our_partitions_def,
    io_manager_key="our_custom_io_manager",
    metadata={"key1": "value1"},
    ins={
        "asset2": AssetIn(
            input_manager_key="our_custom_io_manager",
            metadata={"key2": "value2", "key3": "value3"},
        )
    },
)
def asset3(context: OpExecutionContext, asset2):
    print("Asset3, I want to use the custom IO manager")
```
It feels weird that I have to specify our custom IO manager twice (in `ins={AssetIn()}` and in `io_manager_key`) to make sure that both `load_input` and `handle_output` are used in `asset3`. Is there a better option to use IO managers that we are not aware of? Especially when we do not intend to use an IO manager for `asset2`?

2. The code also shows that we use the `metadata` argument both in the `@asset` decorator and in the `ins={"asset2": AssetIn(...)}` argument. We have a lot of data we want to pass to all assets and corresponding IO managers throughout the job. I can access the metadata from the `AssetIn` by calling `context.metadata` in the custom IO manager `our_custom_io_manager` (see the sketch below). However, again, this feels weird. Is `metadata` the only way to pass additional information to IO managers?
• In assets it is easy: you have one asset instance per configuration (we use an asset factory), so you can pass this additional meta-information directly to the asset as an input param.
• For IO managers this doesn't work: you specify your IO manager once and then give it a configuration, but this configuration doesn't change with every asset instance...
So let's say the `DynamicPartition` we work with in a job has a lot of metadata (file_name, interface, entity, version, ...) that we want to pass to every asset and every IO manager across our whole job. What is the best way to do this? Using `metadata` doesn't seem fully right. We would be grateful for some guidance and general architecture tips that fit our setup!
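To make (2) concrete, here is roughly how that metadata reaches our custom IO manager today. This is a minimal sketch under the definitions from the snippet above (the class and resource names are illustrative, and the prints are only there to show what is available on the input context):

```python
from dagster import IOManager, io_manager


class OurCustomIOManager(IOManager):
    def load_input(self, context):
        # metadata passed via the AssetIn on asset3: {"key2": ..., "key3": ...}
        print(context.metadata)
        # metadata from the upstream asset's @asset decorator: {"key1": ...}
        print(context.upstream_output.metadata)
        # the DynamicPartitions key for this run (e.g. a delivery identifier)
        print(context.asset_partition_key)
        # ... read the partition's data from dbfs and return a DataFrame ...

    def handle_output(self, context, obj):
        # ... write `obj` for this partition ...
        ...


@io_manager
def our_custom_io_manager(_):
    return OurCustomIOManager()
```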
@jamie I hope it is fine if I tag you here - you were able to help us last time with the "metadata" stuff. 🙂
CC: @Döme Lőrinczy @Sascha Kellner
@sandy Maybe you can have a look as well (especially for 1.)
s
For (1), asset2 can have an IO manager and still return `None`, and then your custom IO manager can see the `None` and do a no-op. This is what the builtin IO managers do. Then you won't need to set an IO manager on the `AssetIn`.
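For illustration, a minimal sketch of that no-op pattern (hypothetical class name, not the actual built-in implementation):

```python
from dagster import IOManager


class NoneAwareIOManager(IOManager):
    def handle_output(self, context, obj):
        # when the upstream asset returned None, skip the write entirely, so
        # the asset effectively opts out of persistence while still having
        # an IO manager attached
        if obj is None:
            return
        # ... otherwise persist `obj` (e.g. write the DataFrame to dbfs) ...

    def load_input(self, context):
        # ... load and return the persisted object ...
        ...
```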
d
Thanks @sandy. Would love to see an example of that, in case you guys have one (or show me where it is implemented so I can copy it into my custom manager). But generally, I would say it doesn't really solve the issue. The `custom_io_manager` reads from dbfs and returns a dataframe - something that is completely unrelated to the Unzip asset. So why should I specify the IO manager there? Just to get rid of the `AssetIn` in the following `asset3` - for that it seems like a workaround...
s
you could also just use `non_argument_deps` if you don't want to use IO managers in this situation. ultimately, you either need to:
• opt out of IO managers for that input by using `non_argument_deps`
• specify somewhere how you want the input to be loaded, either on the upstream asset or the downstream `AssetIn`
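A sketch of the first option, assuming `our_partitions_def` and `our_custom_io_manager` from the earlier snippet; `asset3` would then fetch whatever it needs via resources instead of a loaded input:

```python
from dagster import OpExecutionContext, asset


@asset(
    non_argument_deps={"asset2"},  # ordering-only dependency; no input is loaded
    io_manager_key="our_custom_io_manager",  # still handles asset3's own output
    partitions_def=our_partitions_def,
)
def asset3(context: OpExecutionContext):
    print("Asset3, asset2 is an ordering dependency only, no input manager runs")
```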
d
Thanks @sandy. From a user perspective it still feels weird though: I have data in `asset3` that I want to be loaded and saved with my custom IO manager, and to achieve this I have to specify it twice - in `AssetIn` and in `io_manager_key`. But generally, do you see any major things we could improve here? And do you have some input for (2) on how we could handle this topic? Thanks, David