Hi Community! I am currently defining my custom IO...
# ask-community
d
Hi Community! I am currently defining a custom IO manager for a software-defined asset using `ConfigurableIOManager`. Many of this IO manager's parameters depend on the partition that the asset is working with. For example, the custom IO manager will create a new table whose name should be parameterized by the partition name and other metadata of the partition. How can I pass this kind of information to the custom IO manager? I read about run configuration (`Config`), which you can use in assets. But I guess I cannot use it in an IO manager? Or can I? The documentation only shows it with assets... The other idea I have is to pass these parameters to the @asset decorator and then set the context metadata, which I can access in the IO manager:
@asset(
out=Out(metadata={"table_schema": "my_table_schema"},
        io_manager_key="my_custom_io_manager")
)
But this of course only works for the output of the asset, which is handled by the `handle_output` function of my custom IO manager. Is there also a way to give metadata to the input (`load_input`)? What is the right way to go? Any other options? Any best practices?
j
Hey @David Weber, you can definitely achieve all of this! To start with, here is a simple example of adding configuration to a ConfigurableIOManager: https://docs.dagster.io/concepts/io-management/io-managers#defining-an-io-manager As for getting partition information and metadata in the IO manager, you will find all of that in the `context` argument that Dagster automatically supplies to the `handle_output` and `load_input` functions. An important thing to know is that in the `load_input` function, you can get the context from the corresponding `handle_output` for the asset using `context.upstream_output`. So for example, if you had
@asset(
out=Out(metadata={"table_schema": "my_table_schema"},
        io_manager_key="my_custom_io_manager")
)
in `handle_output` you could get the metadata like this
def handle_output(self, context, obj):
    metadata = context.metadata
and in `load_input` you would get it like this
def load_input(self, context):
    metadata = context.upstream_output.metadata
I recommend looking at the InputContext and OutputContext classes to see what is available to you.
We have quite a few built-in IO managers that do some pretty complex stuff, so if you’re looking for inspiration I can point you to some relevant examples!
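To make the two access paths concrete, here is a minimal sketch in plain Python. It does not import Dagster: `TableIOManagerSketch`, `table_name_for`, the naming rule, and the in-memory `tables` dict are all hypothetical, and the fake contexts only mimic the attributes mentioned above (`metadata`, `upstream_output`) plus a `partition_key` attribute that I am assuming your contexts expose. A real implementation would subclass `ConfigurableIOManager` and receive the contexts from Dagster.

```python
from types import SimpleNamespace
from typing import Any, Dict


def table_name_for(metadata: Dict[str, Any], partition_key: str) -> str:
    """Hypothetical naming rule: <table_schema>.part_<sanitized partition key>."""
    safe_key = partition_key.replace("-", "_")
    return f"{metadata['table_schema']}.part_{safe_key}"


class TableIOManagerSketch:
    """Plain-Python stand-in; a real one would subclass ConfigurableIOManager."""

    def __init__(self) -> None:
        # In-memory "database" so the sketch runs without any external system.
        self.tables: Dict[str, Any] = {}

    def handle_output(self, context, obj) -> None:
        # Output side: metadata is read directly from the output context.
        name = table_name_for(context.metadata, context.partition_key)
        self.tables[name] = obj

    def load_input(self, context):
        # Input side: metadata is read from the upstream output's context.
        name = table_name_for(
            context.upstream_output.metadata, context.partition_key
        )
        return self.tables[name]


# Fake contexts that expose only the attributes used above (assumption).
out_ctx = SimpleNamespace(
    metadata={"table_schema": "my_table_schema"}, partition_key="2023-05-01"
)
in_ctx = SimpleNamespace(upstream_output=out_ctx, partition_key="2023-05-01")

manager = TableIOManagerSketch()
manager.handle_output(out_ctx, [1, 2, 3])
loaded = manager.load_input(in_ctx)
```

Keeping the naming rule in one helper means the write side and the read side cannot drift apart, which is the main thing to get right when the table name depends on the partition.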
d
Hi @jamie! Thanks for your help - I will try to build it that way and report back. The "complication" in our case is that the upstream asset is actually not using an IO manager, as it doesn't make sense (it copies the partition from one bucket to the other). Which means we have to build the dependency like this: `non_argument_deps={"asset1"}` Does this change anything about the approach? I understand your comment as follows: in asset 1 (which doesn't have an IO manager) I define the metadata I need later:
@asset(
    out=Out(metadata={"table_schema": "my_table_schema"})
)
def asset1() -> None:
    pass
Then I have my dependency to asset 2, which uses my custom IO manager:
@asset(
    required_resource_keys={"my_custom_io_manager"},
    non_argument_deps={"asset1"}
)
def asset2():
    pass
And in `my_custom_io_manager`, which is used in asset2, I should be able to do the following:
...
def load_input(self, context: InputContext) -> Any:
    metadata = context.upstream_output.metadata

def handle_output(self, context: OutputContext, obj) -> None:
    metadata = context.metadata
Maybe you can give me some feedback on that! Thanks, really appreciated!
j
Ah ok, you are on the right track but there’s one thing to change. When you have `non_argument_deps` specified, there is actually no IO manager invoked. The idea for non-argument deps is actually the opposite of what you want: it’s for the case when the upstream asset used an IO manager but the downstream asset doesn't need to load any data, and you just want to make sure the downstream asset only materializes after the upstream one. What you’ll want is to set up the asset dependency like normal and then set a specific IO manager for the downstream asset. Here’s a code sample that shows this more clearly:
@asset(
    out=Out(metadata={"table_schema": "my_table_schema"})
)
def asset1() -> None:
    pass

@asset(
required_resource_keys={"my_custom_io_manager"},
ins={"asset1": AssetIn(input_manager_key="my_custom_io_manager")}
)
def asset2(asset1):
    # when asset1 is loaded, the io manager specified by input_manager_key in AssetIn will be used 
    pass
otherwise, I think everything else you wrote is correct. I’d definitely recommend setting up a super simple example set of assets to test it out and confirm everything is working as expected
d
Thanks a lot @jamie! I already ran into the first challenge: `asset() got an unexpected keyword argument 'out'`. I am using Dagster version 1.3.4.
j
Oh sorry, my morning brain totally missed that. You can specify metadata directly on the `@asset` decorator. Since each asset only has one output, we know that the metadata is associated with that output (unlike the op world, where each op may have multiple outputs):
@asset(
metadata={"table_schema": "my_table_schema"}
)
def asset1() -> None:
    pass
d
Got it, thank you so much! 🙂
j
no problem! let me know if anything else comes up