How is the `@asset` decorator’s `io_manager_key` p...
# ask-community
s
How is the `@asset` decorator’s `io_manager_key` parameter supposed to work? I can’t seem to get it to recognize a custom IOManager even with a very minimal case (code in thread).
This is the bare minimum, taken from examples:
from dagster import IOManager, asset, io_manager


class MyIOManager(IOManager):
    def _get_path(self, context) -> str:
        return "/".join(context.asset_key.path)

    def handle_output(self, context, obj):
        pass

    def load_input(self, context):
        return 1


@io_manager
def my_io_manager(init_context):
    return MyIOManager()


@asset(io_manager_key="my_io_manager")
def my_asset(context):
    pass
When I try to load this, I get an error:
dagster._core.errors.DagsterInvalidDefinitionError: io manager with key 'my_io_manager' required by output 'result' of op 'my_asset'' was not provided. Please provide a <class 'dagster._core.storage.io_manager.IOManagerDefinition'> to key 'my_io_manager', or change the required key to one of the following keys which points to an <class 'dagster._core.storage.io_manager.IOManagerDefinition'>: ['io_manager']
I will note that using `io_manager_def` works, for example:
@asset(io_manager_def=my_io_manager)
def my_asset(context):
    pass
But that parameter is marked as “experimental”, so I won’t use it until it’s released.
z
Are you mapping your IO manager to your asset in your definitions? Otherwise I don't think Dagster knows which IO manager to use. The `io_manager_key` argument is just a string that Dagster uses as the resource key under which it looks up the IO manager. See this example
s
Adding
defs = Definitions(
    assets=[my_asset],
    resources={
        "my_io_manager": my_io_manager
    },
)
at the end of the file has no effect.
Here is the complete file, which errors with the same error I posted above:
from dagster import asset, IOManager, io_manager, Definitions

class MyIOManager(IOManager):
    def _get_path(self, context) -> str:
        return "/".join(context.asset_key.path)

    def handle_output(self, context, obj):
        pass

    def load_input(self, context):
        return 1


@io_manager
def my_io_manager(init_context):
    return MyIOManager()


@asset(io_manager_key="my_io_manager")
def my_asset(context):
    pass

defs = Definitions(
    assets=[my_asset],
    resources={
        "my_io_manager": my_io_manager
    },
)
This remains a problem for me. Should I open a GitHub issue, since this is more about the Python API?
a
@Spencer Nelson Howdy friend! Just FYI I think we have this working properly now. The thing that breaks is if you try to create a `@job`: it will not properly load the resource definitions. However, if you use `define_asset_job`, it loads in the global resource definitions just fine.