# dagster-feedback
m
As I already posted in the support channel (but I guess it fits better here), the @root_input_manager and @io_manager decorators IMHO should work the same way, especially as RootInputManager and IOManager behave so similarly. At the moment, @root_input_manager, in contrast to @io_manager, creates a new RootInputManager itself. That's why you can't simply write
class DatabaseManager(RootInputManager, IOManager):
    def handle_output(self, context, obj):
        ...

    def load_input(self, context):
        ...

@io_manager(required_resource_keys={"database_client"})
def database_io_manager():
    return DatabaseManager()

@root_input_manager(required_resource_keys={"database_client"})
def database_root_manager():
    return DatabaseManager()
but you have to write
@root_input_manager(required_resource_keys={"database_client"})
def database_root_manager(context: InputContext):
    manager = DatabaseManager()
    return manager.load_input(context)
I know that it is not a big deal, but I just want to leave some feedback while @root_input_manager is still experimental.
s
hey @medihack - have you tried this out? I believe at one point what you're suggesting actually worked
m
Yes, I tried it out, and it doesn't seem to work (v0.13.11). I get a type check error (even though load_input returns a DataFrame and the input type of the op is a DataFrame).
s
would it be easy for you to share the error you hit?
m
Sure, when using the decorator without a context parameter (which would be most similar to @io_manager):
@root_input_manager(required_resource_keys={"database_client"})
def database_root_manager():
    return DatabaseManager()
I get this error
dagster.core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "snp_mapdata" of step "load_snp_metadata":
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/execute_plan.py", line 195, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/execute_step.py", line 287, in core_dagster_event_sequence_for_step
    for event_or_input_value in ensure_gen(step_input.source.load_input_object(step_context)):
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/inputs.py", line 161, in load_input_object
    yield _load_input_with_input_manager(loader, load_input_context)
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/inputs.py", line 574, in _load_input_with_input_manager
    with solid_execution_error_boundary(
  File "/home/zardoz/.pyenv/versions/3.10.1/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/utils.py", line 73, in solid_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
TypeError: database_root_manager() takes 0 positional arguments but 1 was given
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/inputs.py", line 584, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/storage/root_input_manager.py", line 161, in load_input
    return self._load_fn(context)
And with the context parameter
@root_input_manager(required_resource_keys={"database_client"})
def database_root_manager(_context):
    return DatabaseManager()
I get this error
dagster.core.errors.DagsterTypeCheckDidNotPass: Type check failed for step input "snp_mapdata" - expected type "DataFrame". Description: Value of type <class 'snp_collector.resources.database_manager.DatabaseManager'> failed type check for Dagster type DataFrame, expected value to be of Python type pandas.core.frame.DataFrame.
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/execute_plan.py", line 195, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/execute_step.py", line 295, in core_dagster_event_sequence_for_step
    for evt in check.generator(
  File "/home/zardoz/.cache/pypoetry/virtualenvs/snp-5C0jmVAp-py3.10/lib/python3.10/site-packages/dagster/core/execution/plan/execute_step.py", line 193, in _type_checked_event_sequence_for_input
    raise DagsterTypeCheckDidNotPass(
(also strange that it ends with an opening bracket even on the console)
Also just upgraded to Dagster v0.13.12. Hope this helps.
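Both errors fit the last frame of the first traceback (`return self._load_fn(context)`): the decorated function seems to be used as the load function itself, not as a factory for a manager. The toy model below is only a sketch of that reading. `ToyRootInputManager`, `toy_root_input_manager`, and the list standing in for a DataFrame are hypothetical names, not Dagster's implementation.

```python
# Toy model only: it mimics the behaviour suggested by the traceback,
# it is not Dagster's actual code.


class ToyRootInputManager:
    """Wraps a user function and calls it on every load."""

    def __init__(self, load_fn):
        self._load_fn = load_fn

    def load_input(self, context):
        # Mirrors the traceback frame `return self._load_fn(context)`.
        return self._load_fn(context)


def toy_root_input_manager(fn):
    """Hypothetical stand-in for @root_input_manager: fn becomes the load function."""
    return ToyRootInputManager(fn)


class DatabaseManager:
    def load_input(self, context):
        return ["row-1", "row-2"]  # placeholder for the real DataFrame


@toy_root_input_manager
def no_context_manager():
    return DatabaseManager()


@toy_root_input_manager
def returns_manager(_context):
    return DatabaseManager()


@toy_root_input_manager
def delegates_to_manager(context):
    return DatabaseManager().load_input(context)


context = object()  # placeholder for an InputContext

try:
    no_context_manager.load_input(context)
    zero_arg_error = None
except TypeError as exc:
    # The wrapper always passes the context, so a zero-argument function fails.
    zero_arg_error = exc

# The function's return value is handed on as the input value unchanged.
wrong_value = returns_manager.load_input(context)       # a DatabaseManager
right_value = delegates_to_manager.load_input(context)  # the loaded rows
```

In this model the zero-argument function raises a `TypeError`, the second variant hands a `DatabaseManager` to the type check, and only the delegating variant returns the data.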
s
ooh I see. what if you do this:
class DatabaseManager(RootInputManager, IOManager):
    def handle_output(self, context, obj):
        ...

    def load_input(self, context):
        ...

@io_manager(required_resource_keys={"database_client"})
def database_io_manager():
    return DatabaseManager()

@job(resource_defs={"my_root_input_manager": database_io_manager, "io_manager": database_io_manager})
def my_job():  # placeholder job name
    ...
m
@sandy I already have them in there like this:
resource_defs={
        ...
        "database_io_manager": database_io_manager,
        "database_root_manager": database_root_manager,
        ...
    },
I also specify them completely in the op:
@op(
    config_schema={"batch_size": int},
    ins={
        "snp_mapdata": In(
            root_manager_key="database_root_manager",
            dagster_type=DataFrame,
        )
    },
    out={
        "snp_metadata": Out(
            io_manager_key="database_io_manager",
            dagster_type=DataFrame,
        )
    },
)
But from looking at the Dagster source code, I am unsure whether this scenario is really implemented. It seems to me that a new RootInputManager is always created implicitly and you can't just return your own. It's easy to work around this (as in my example above), but IMHO it is a bit cumbersome from an API design perspective (especially as the RootInputManager is so similar to the IOManager). I can absolutely live with it, just wanna give some feedback ;-)
s
My suggestion is to try supplying database_io_manager as the value for your "database_root_manager" key, instead of database_root_manager.
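One way to picture why this can work: assume the run only looks up whatever resource sits under the root manager key and calls `load_input(context)` on it, as the traceback frame `value = input_manager.load_input(context)` suggests. The sketch below is a toy model under that assumption. `FAKE_DATABASE`, `init_resources`, `load_root_input`, and the dict-based context are hypothetical and not part of Dagster.

```python
# Toy model only; not Dagster's real resource or execution machinery.

FAKE_DATABASE = {"snp_mapdata": ["row-1", "row-2"]}  # stand-in for the real database


class DatabaseManager:
    """Stand-in for the combined RootInputManager/IOManager from the thread."""

    def handle_output(self, context, obj):
        FAKE_DATABASE[context["name"]] = obj

    def load_input(self, context):
        return FAKE_DATABASE[context["name"]]


def database_io_manager():
    """Plays the role of the @io_manager-decorated factory."""
    return DatabaseManager()


def init_resources(resource_defs):
    # Hypothetical helper: call each factory once to build the resource objects.
    return {key: factory() for key, factory in resource_defs.items()}


def load_root_input(resources, root_manager_key, context):
    # The object under the key only needs to offer a load_input method.
    return resources[root_manager_key].load_input(context)


# The same factory is registered under both keys, as suggested above.
resources = init_resources(
    {
        "database_io_manager": database_io_manager,
        "database_root_manager": database_io_manager,
    }
)

resources["database_io_manager"].handle_output({"name": "snp_metadata"}, ["meta-1"])
loaded = load_root_input(resources, "database_root_manager", {"name": "snp_mapdata"})
```

Because the lookup is duck-typed in this model, the object built by the IO manager factory serves both roles and no separate root manager function is needed.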
m
@sandy Ah sure, now I get it. I should have read it more carefully. This makes total sense. How could I not think of that?! Thanks a lot for your help. Really love Dagster!
s
I hope it works out! Definitely wouldn't expect it to be an obvious thing to try
m
Yeah, but I don't think it is such a rare use case to reuse the IO manager for loading data when running just a part of a job (especially when debugging complex and long-running ones).
s
I agree with you. This is something that is currently more difficult in Dagster than it ideally should be