https://dagster.io/ logo
#ask-community
Title
# ask-community
r

Rahul Dave

02/16/2023, 8:40 AM
It seems an IO-manager can be an input manager at the same time? But it seems the
@input_manager
decorator needs to decorate the function that will instantiate the internal io-manager?
I ask this as i get an error like so:
Copy code
dagster._core.errors.DagsterInvalidDefinitionError: input manager with key 'model_input_manager' required by input 'encoders' of op 'transformer_op' was not provided. Please provide a <class 'dagster._core.storage.input_manager.IInputManagerDefinition'> to key 'model_input_manager', or change the required key to one of the following keys which points to an <class 'dagster._core.storage.input_manager.IInputManagerDefinition'>: ['io_manager', 'output_notebook_io_manager']
But the two share a common interface, so instead of duplicating code, what ought i do?
The docs seem to indicate this use case:
Copy code
# in this example, TableIOManager is defined elsewhere and we just want to override load_input
class Table1IOManager(TableIOManager):
    def load_input(self, context):
        return read_dataframe_from_table(name="table_1")


@io_manager
def table_1_io_manager():
    return Table1IOManager()


@job(resource_defs={"load_input_manager": table_1_io_manager})
def io_load_table_job():
    my_op()
In all of these examples, setting the input_manager_key on an In controls how that input is loaded.
But passing an io manager as an inout manager gives the error above
I am inheriting off the UPathIoManager, perhaps it needs a mixin? (
class FixedPathIOManager(UPathIOManager):
surprisingly, even simplifying the structure:
Copy code
class FixedPathInputManager(InputManager):
    extension: str = ".joblib"

    def _get_path(self, context) -> str:
        <http://context.log.info|context.log.info>(context.resource_config)
        <http://context.log.info|context.log.info>(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")
    

    def load_input(self, context):
        <http://context.log.info|context.log.info>("in load input")
        if context.upstream_output is None: # input manager
            path = self._get_path(context)
        else:
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return joblib.load(file)

@input_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_input_manager(
    init_context: InitResourceContext,
) -> FixedPathInputManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathInputManager(base_path=base_path)
results in the same error: it is as if inheriting from InputManager is not enough
3 Views