Rahul Dave
02/16/2023, 8:40 AM@input_manager
decorator needs to decorate the function that will instantiate the internal io-manager?dagster._core.errors.DagsterInvalidDefinitionError: input manager with key 'model_input_manager' required by input 'encoders' of op 'transformer_op' was not provided. Please provide a <class 'dagster._core.storage.input_manager.IInputManagerDefinition'> to key 'model_input_manager', or change the required key to one of the following keys which points to an <class 'dagster._core.storage.input_manager.IInputManagerDefinition'>: ['io_manager', 'output_notebook_io_manager']
But the two share a common interface, so instead of duplicating code, what ought i do?# in this example, TableIOManager is defined elsewhere and we just want to override load_input
class Table1IOManager(TableIOManager):
def load_input(self, context):
return read_dataframe_from_table(name="table_1")
@io_manager
def table_1_io_manager():
return Table1IOManager()
@job(resource_defs={"load_input_manager": table_1_io_manager})
def io_load_table_job():
my_op()
In all of these examples, setting the input_manager_key on an In controls how that input is loaded.
But passing an io manager as an inout manager gives the error aboveclass FixedPathIOManager(UPathIOManager):
class FixedPathInputManager(InputManager):
extension: str = ".joblib"
def _get_path(self, context) -> str:
<http://context.log.info|context.log.info>(context.resource_config)
<http://context.log.info|context.log.info>(type(context))
return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")
def load_input(self, context):
<http://context.log.info|context.log.info>("in load input")
if context.upstream_output is None: # input manager
path = self._get_path(context)
else:
path = self._get_path(context.upstream_output)
with path.open("rb") as file:
return joblib.load(file)
@input_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_input_manager(
init_context: InitResourceContext,
) -> FixedPathInputManager:
assert init_context.instance is not None # to please mypy
base_path = UPath(
init_context.resource_config.get(
"base_path", init_context.instance.storage_directory()
)
)
return FixedPathInputManager(base_path=base_path)
results in the same error: it is as if inheriting from InputManager is not enough