
Rahul Dave

02/16/2023, 4:34 PM
I wanted to bump this: my InputManager is not acting as expected, and I get this error:
dagster._core.errors.DagsterInvalidDefinitionError: input manager with key 'model_input_manager' required by input 'encoders' of op 'transformer_op' was not provided. Please provide a <class 'dagster._core.storage.input_manager.IInputManagerDefinition'> to key 'model_input_manager', or change the required key to one of the following keys which points to an <class 'dagster._core.storage.input_manager.IInputManagerDefinition'>: ['io_manager', 'output_notebook_io_manager']
It comes from the following definitions:
import joblib
from dagster import Field, InitResourceContext, InputManager, input_manager
from upath import UPath


class FixedPathInputManager(InputManager):
    # Every object is read from <base_path>/<name>.joblib
    extension: str = ".joblib"

    def __init__(self, base_path: UPath):
        # InputManager defines no __init__, so the constructor must accept base_path itself
        self.base_path = base_path

    def _get_path(self, context) -> UPath:
        # context is the InputContext (no upstream op) or the upstream OutputContext
        return self.base_path / f"{context.name}{self.extension}"

    def load_input(self, context):
        if context.upstream_output is None:  # input is not fed by an upstream op
            path = self._get_path(context)
        else:
            path = self._get_path(context.upstream_output)
        context.log.info(f"loading input from {path}")
        with path.open("rb") as file:
            return joblib.load(file)


@input_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_input_manager(
    init_context: InitResourceContext,
) -> FixedPathInputManager:
    assert init_context.instance is not None  # to please mypy
    # Fall back to the instance storage directory when base_path is not configured
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathInputManager(base_path=base_path)
It is used like this:
local_train_transformer_job = transformer_graph.to_job(
    name="train_transformer_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "data_file": current_training_data, 
        "data_type": train_type,
        "encoder_file": encoder_file,
        "model_input_manager": local_model_fixedpath_input_manager,
        "lake_io_manager": local_pandas_parquet_io_manager,
    }
)
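For reference, here is a stdlib-only sketch of the path logic in the input manager, so the file that `load_input` ends up reading can be checked without running Dagster. `FakeInputContext`, `FakeOutputContext`, `resolve_base_path` and `resolve_fixed_path` are hypothetical stand-ins, not Dagster APIs; only the `name` and `upstream_output` attributes mirror the real contexts.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Optional


@dataclass
class FakeOutputContext:
    # Hypothetical stand-in for the upstream OutputContext; only `name` is used
    name: str


@dataclass
class FakeInputContext:
    # Hypothetical stand-in for Dagster's InputContext
    name: str
    upstream_output: Optional[FakeOutputContext] = None


def resolve_base_path(resource_config: dict, storage_directory: str) -> str:
    # Same fallback as the factory: use base_path if configured, else the storage dir
    return resource_config.get("base_path", storage_directory)


def resolve_fixed_path(
    base_path: str, context: FakeInputContext, extension: str = ".joblib"
) -> PurePosixPath:
    # No upstream op: name the file after the input itself;
    # otherwise name it after the upstream output that produced the value
    named = context if context.upstream_output is None else context.upstream_output
    return PurePosixPath(base_path) / f"{named.name}{extension}"


print(resolve_fixed_path("/tmp/models", FakeInputContext("encoders")))
```

With no upstream op, the input `encoders` resolves to `/tmp/models/encoders.joblib`; when it is fed by an upstream output, the upstream output's name is used instead.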
According to the docs, this definition of an InputManager should be enough, right? Am I being really stupid and missing something super obvious?
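The error message itself states the rule being enforced: the key named by the input ('model_input_manager') has to point at an IInputManagerDefinition among the resources being checked. Below is a toy model of that rule, assuming IO managers also count as input managers (which the message implies by listing 'io_manager'). All classes and functions here are hypothetical and are not Dagster's implementation.

```python
from dataclasses import dataclass
from typing import Dict, List


class ResourceDef:
    """Hypothetical: a plain resource, e.g. a config value like "data_file"."""


class InputManagerDef(ResourceDef):
    """Hypothetical: anything able to load op inputs."""


class IOManagerDef(InputManagerDef):
    """Hypothetical: IO managers can also load inputs, so they count here."""


@dataclass
class OpInput:
    op_name: str
    input_name: str
    input_manager_key: str


def input_manager_keys(resource_defs: Dict[str, ResourceDef]) -> List[str]:
    # Keys whose definition can load inputs, sorted for a stable message
    return sorted(k for k, v in resource_defs.items() if isinstance(v, InputManagerDef))


def check_input_managers(inputs: List[OpInput], resource_defs: Dict[str, ResourceDef]) -> None:
    valid = input_manager_keys(resource_defs)
    for inp in inputs:
        if inp.input_manager_key not in valid:
            raise ValueError(
                f"input manager with key '{inp.input_manager_key}' required by input "
                f"'{inp.input_name}' of op '{inp.op_name}' was not provided; "
                f"valid keys: {valid}"
            )


inputs = [OpInput("transformer_op", "encoders", "model_input_manager")]
resources = {
    "io_manager": IOManagerDef(),
    "output_notebook_io_manager": IOManagerDef(),
    "model_input_manager": InputManagerDef(),
}
check_input_managers(inputs, resources)  # passes: the key points at an input manager
```

In this model, the keys listed in the error are exactly the input-manager keys of the resource dict being checked.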