https://dagster.io/ logo
#ask-community
Title
# ask-community
r

Rahul Dave

02/12/2023, 7:27 PM
Folks, I'd like to override the default fs_iomanager for some outputs (and corresponding inputs). I notice that the canonical way the pickles are stored are in the
tmp_xxx/run/step
formation. I'd like my own path (which i plan to version using artifact storage such as mlflow). It seems I can pass this into the fs_iomanager definition. However, I am not sure how to stop it from using the canonical run/step format. In my case I want the old files to be rewritten in a new run, versioning is handled elsewhere. Is this doable in fs_iomanager, or should I be inheriting from the
UPathIoManager
as shown here: https://docs.dagster.io/concepts/io-management/io-managers#custom-filesystem-based-io-manager . And will implementing
load_from_path
and
dump_to_path
be enough to get the path completely custom? Or must I do something more to eliminate the run/step hierarchy...?
o

owen

02/13/2023, 5:52 PM
hi @Rahul Dave! To do this, you would want to inherit from the UPathIoManager. Implementing just those two methods would work fine for your purposes.
r

Rahul Dave

02/13/2023, 6:03 PM
Thanks! Will go try!
@owen I tried this but find that it still wants to save files with run-id and step id. Using the PandasParquetInputManager from the docs i get my parquet file at:
warehouse/badd02d4-f664-48ff-b636-d3e356657d08/encoder_op/encoders.parquet
(warehouse is the base-path config i provided:
Copy code
resources:
  model_io_manager:
    config:
      base_path: warehouse
The code from the docs:
Copy code
class PandasParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.to_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        with path.open("rb") as file:
            return pd.read_parquet(file)

@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
    init_context: InitResourceContext,
) -> PandasParquetIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return PandasParquetIOManager(base_path=base_path)
Now clearly i could strip in both
dump_to_path
and
load_to_path
the run-id and step-id before saving and reading...but is this the right way to do it? Wont it cause a mismatch between what dagit reports and where the file is? Is there a "proper" way to do this?
o

owen

02/16/2023, 12:23 AM
ah I see what you mean -- the manual stripping was what I had in mind but you're correct that this would cause conflicts with the metadata attached to the outputs. I don't think there's currently a proper way to customize the output path to this degree. any other solution would end up needing to override private properties of the class (specifically
_get_path
), which would probably be fine but would not be guaranteed to remain compatible with the base definition over time.
I think providing a non-private hook into the path-generating behavior would make a ton of sense, but I think overriding
_get_path
would be the way to go for now (sorry for leading you in the wrong direction!)
r

Rahul Dave

02/16/2023, 12:27 AM
@owen One thing I am missing is how it constructs the filename. I am guessing this comes from variables in the init_context? Here, in
encodrs.parquet
the "encoders" is obtained from the output variable of the op. But how do i access this from the context?
A perusal of the source code also seems to indicate that i could use
CustomPathPickledObjectFilesystemIOManager
but I am no closer to understanding how the "run-is/step-id/variable" path is constructed
Aha, there is a
get_identifier
. And the
encoder
is coming from the result name, default result. I used
encoders
. Still having trouble understanding all the properties of the context though: specificlly how to access the config from them!
Ok, I'm almost there but something very bizarre is happening, perhaps because the op producing the output is inside a graph, which is made into a job. My iomanager:
Copy code
class FixedPathIOManager(UPathIOManager):
    extension: str = ".joblib"

    def _get_path(self, context) -> str:
        <http://context.log.info|context.log.info>(context.resource_config)
        <http://context.log.info|context.log.info>(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")
    
    def dump_to_path(self, context: OutputContext, obj, path: UPath):
        <http://context.log.info|context.log.info>("dump")
        <http://context.log.info|context.log.info>(context.resource_config)
        with path.open("wb") as file:
            joblib.dump(obj, file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        <http://context.log.info|context.log.info>("load")
        <http://context.log.info|context.log.info>(context.resource_config)
        with path.open("rb") as file:
            return joblib.load(file)



@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_io_manager(
    init_context: InitResourceContext,
) -> FixedPathIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathIOManager(base_path=base_path)
In
_get_path
the logging tells me that the
resource_config
has the key
base_path
but when it tries to return the UPath at the end of the function I ket a key-not-present-error:
Copy code
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "encoder_op":
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 265, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 382, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 94, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 177, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 145, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 459, in iterate_with_context
    return
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 85, in op_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
KeyError: 'base_path'
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 457, in iterate_with_context
    next_output = next(iterator)
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagstermill/factory.py", line 313, in _t_fn
    value = io_manager.load_input(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 137, in load_input
    path = self._get_path(context)
  File "/Users/rahul/Websites/mlops2_with_dagster/mlops2_with_dagster/participants.py", line 47, in _get_path
    return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")
Even more strangely this seems to be in a
load_input
. The file
encoders.joblib
is actually written! So i get the feeling that because the op is inside a graph "something" is trying to reload it from this file and failing...Here is the graph and corresponding job definition:
Copy code
@graph(out = {'daencoders': GraphOut()},
)
def encoder_graph():
    df_train = read_train_data()
    df_test = read_test_data()
    daencoders, _ = encoder_op(df_test=df_test, df_train=df_train)
    return daencoders
Copy code
local_encoder_job = encoder_graph.to_job(
    name="local_encoder_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "testing_data": current_testing_data,
        "model_io_manager": local_model_fixedpath_io_manager,
    }
)
And the config i am using in dagit launchpad to run the job:
Copy code
resources:
  model_io_manager:
    config:
      base_path: warehouse
Anyone understand what is going on? @owen? It seems very bizarre that the config would just dissapear. And the fact that the file is actually saved and there is a load_input involved makes me think that this error is happening somehow between the completion of the op and the completion of the graph, which is just a return: but perhaps return from graphs create io?
If i add a
Copy code
def load_input(self, context):
        <http://context.log.info|context.log.info>("in load input")
        path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return joblib.load(file)
then things work! And i get two "dumps" printed out. Which tells me that the data is getting written twice, once at op finish, and once at graph/job finish. Is that avoidable?
o

owen

02/16/2023, 5:46 PM
hi again! I think you're seeing the two "dump" messages because the encoder_op has two outputs -- even though one output is ignored in your graph code, it'll still get written. So I think things may actually be working as expected (might be worth printing out the path in that "dump" message to make sure). One thing to note though is that your current scheme only varies the path based on the output name, which is pretty hard to keep unique. Unless you specify an output name, the
context.name
value will always be
result
, so basically any op without an explicit output name will end up writing to the same location. It might make sense to include the
context.step_key
in the path as well to make it easier to distinguish. The step key will stay static between runs so I think this still would work fine for your use case.
r

Rahul Dave

02/16/2023, 5:54 PM
So the other output is a notebook: should it not be handled via the notebook io manager (built into dagstermill). And why then would the system be calling
load_input
as evidenced by the error message? Or is there some kind of loop that gets called? I can confirm the writing is happening two times: when i got the error i still got one write happening. I feel i am missing something in my mental model of the software...
o

owen

02/16/2023, 5:58 PM
hm do you mind sharing the definition of encoder_op?
r

Rahul Dave

02/16/2023, 5:59 PM
Copy code
encoder_op = define_dagstermill_op(
    name="encoder_op",
    notebook_path=file_relative_path(__file__, "../notebooks/encoder.ipynb"),
    output_notebook_name="output_encoder",
    outs={"encoders": Out(dict, io_manager_key="model_io_manager")},
    ins={"df_train": In(pd.DataFrame), "df_test": In(pd.DataFrame)}
)
Copy code
@graph(out = {'daencoders': GraphOut()},
)
def encoder_graph():
    df_train = read_train_data()
    df_test = read_test_data()
    daencoders, _ = encoder_op(df_test=df_test, df_train=df_train)
    return daencoders
Copy code
local_encoder_job = encoder_graph.to_job(
    name="local_encoder_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "testing_data": current_testing_data,
        "model_io_manager": local_model_fixedpath_io_manager,
    }
)
Thats the op-graph-job
and the io manager:
Copy code
class FixedPathInputManager(InputManager):
    extension: str = ".joblib"

    def _get_path(self, context) -> str:
        <http://context.log.info|context.log.info>(context.resource_config)
        <http://context.log.info|context.log.info>(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")
    

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        <http://context.log.info|context.log.info>("load")
        <http://context.log.info|context.log.info>(context.resource_config)
        with path.open("rb") as file:
            return joblib.load(file)

    def load_input(self, context):
        <http://context.log.info|context.log.info>("in load input")
        if context.upstream_output is None: # input manager
            path = self._get_path(context)
        else:
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return joblib.load(file)

@input_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_input_manager(
    init_context: InitResourceContext,
) -> FixedPathInputManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathInputManager(base_path=base_path)
o

owen

02/16/2023, 6:02 PM
ah i see -- you're correct in that case to expect your custom IOManager to be invoked a single time (I didn't realize that encoder_op was a dagstermill op)
r

Rahul Dave

02/16/2023, 6:03 PM
It did not work until i put in my own load input, which makes me think something is trying to re-read the output saved once. And it didnt work because i had not provided a
path = self._get_path(context.upstream_output)
in
o

owen

02/16/2023, 6:05 PM
looks like that's an input_manager not an IOManager (might have caught you in an in-between place) -- in general, the IOManager solution will probably be simpler
r

Rahul Dave

02/16/2023, 6:06 PM
sorry wrong code i am playing with input managers and have my own problems there, here you go:
Copy code
class FixedPathIOManager(UPathIOManager):
    extension: str = ".joblib"

    def _get_path(self, context) -> str:
        <http://context.log.info|context.log.info>(context.resource_config)
        <http://context.log.info|context.log.info>(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")
    
    def dump_to_path(self, context: OutputContext, obj, path: UPath):
        <http://context.log.info|context.log.info>("dump")
        <http://context.log.info|context.log.info>(context.resource_config)
        with path.open("wb") as file:
            joblib.dump(obj, file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        <http://context.log.info|context.log.info>("load")
        <http://context.log.info|context.log.info>(context.resource_config)
        with path.open("rb") as file:
            return joblib.load(file)

    def load_input(self, context):
        <http://context.log.info|context.log.info>("in load input")
        if context.upstream_output is None: # input manager
            path = self._get_path(context)
        else:
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return joblib.load(file)



@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_io_manager(
    init_context: InitResourceContext,
) -> FixedPathIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathIOManager(base_path=base_path)
I will use the step-id idea,! otherwise non-named ops will get messed up!! In the future i want this to work with an artifact management system such as weights and biases or mlflow so can derive an id from there
o

owen

02/16/2023, 6:10 PM
so just to confirm, if you run this code, the run will succeed but you'll see the "dump" message twice in the logs? and which step will that message show up in?
does the "Writing file at ..." message also show up twice?
r

Rahul Dave

02/16/2023, 6:17 PM
One more data point. My defining a load_input seems to prevent upathiomanager from using its own load_input which kills the error i had, but mu load_input, i just confirmed, is actually not called!
Here is the output: i think you are write that the double dump is being called because of the dagstermill notebook output...but it should not be happening. It is also while my file got saved once. In my notebook the last cell is a yield as adviced by the dagstermill_op docs.
when i turn the debug on in the dagit UI there are definitely two "writing file at"s
Screenshot 2023-02-16 at 1.22.27 PM.png
o

owen

02/16/2023, 6:31 PM
ah ok -- I did some digging and it looks like this is a dagstermill-specific quirk that I was not aware of (which explains why things were so confusing for you -- sorry about that). It looks like the dagstermill code actually manually invokes the iomanager for the output in order to pass it across the notebook boundary. Specifically, the handle_output code is invoked inside the notebook process, and the load_input code is invoked in the host dagster process. This is non-standard, and I'm not aware of other integrations working this way, but it's basically a way to get the python object across the process boundary into the host process. After this happens, the regular framework code invokes this iomanager's handle_output function again (this time in the host process, with the full python object).
I can imagine how confusing this process would be to divine coming at this with a blank slate (it was surprising even to me!) so again sorry about hitting that rough edge
so for now, the gist of it is that this is working as intended, but I'm not sure if that manual serialization/deserialization stuff is actually necessary. I'll see if someone else on the team has more in-depth knowledge of this part of the codebase
r

Rahul Dave

02/16/2023, 6:35 PM
Ok this makes ton of sense!
I had thought that it might be a in memory communication from the notebook to the op (because of the yield) but i guess not!
So this then leaves us with one more mystery, why the initial failure if load_input was not defined: but that is perhaps explained by the upath io manager's load_input being called by the notebook process, and the contexts not being identical?
I do not specify required_resource_keys in my op: so i suspect the requisite dictionary is not available in the in-notebook context
not sure it is actually: this is the error from removing
load_input
from my iomanager
Copy code
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "target_extractor_op":

  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 265, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 382, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 94, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 177, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 145, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 471, in iterate_with_context
    return
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 85, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
KeyError: 'base_path'

  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 469, in iterate_with_context
    next_output = next(iterator)
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagstermill/factory.py", line 313, in _t_fn
    value = io_manager.load_input(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 137, in load_input
    path = self._get_path(context)
  File "/Users/rahul/Websites/mlops2_with_dagster/mlops2_with_dagster/participants.py", line 130, in _get_path
    return UPath(f"{context.resource_config['base_path']}/{context.name}{PandasParquetIOManager.extension}")
Note that the error is in upath io-managers load_input.
and here is the dagit screen on this removal
Screenshot 2023-02-16 at 1.53.53 PM.png
You will notice that the file is saved once. Now perhaps the inter-process communication across notebook/op-container bounbdaries requires the op-container to now read the file, and this is where my
load_input
is needed. But as i pointed out earlier, it does not seem to get called! just prevents the upathiomanager's
load_input
from being called. Very bizarre!
Actually i take that back: the structured output log view in dagit is buggy
on seeing the raw logs
Screenshot 2023-02-16 at 1.59.45 PM.png
i see the two log lines i put in with lots of equal tos before the output that shows that my
load_input
is called
Copy code
def load_input(self, context):
        <http://context.log.info|context.log.info>("=============================in load input")
        if context.upstream_output is None: # input manager
            path = self._get_path(context)
        else:
            <http://context.log.info|context.log.info>("===========================upstream path")
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return pd.read_parquet(file)
What it also tells you is that the context needs the upstream context, ie the notebook context: and so the process boundary is perhaps not haring the context, which comes as a surprise. And indeed just for dagstermill my hypothesis of an "op" downstream of the notebook, the container op is perhaps correct
(in the mwanwhile i must figure why my context.log.infos did not show up in the structured output)
32 Views