Rahul Dave
02/12/2023, 7:27 PM
… `tmp_xxx/run/step` formation. I'd like my own path (which I plan to version using artifact storage such as MLflow). It seems I can pass this into the `fs_io_manager` definition. However, I am not sure how to stop it from using the canonical run/step format. In my case I want the old files to be overwritten in a new run; versioning is handled elsewhere. Is this doable with `fs_io_manager`, or should I be inheriting from `UPathIOManager` (as shown here: https://docs.dagster.io/concepts/io-management/io-managers#custom-filesystem-based-io-manager)? And will implementing `load_from_path` and `dump_to_path` be enough to get a completely custom path, or must I do something more to eliminate the run/step hierarchy?
owen
02/13/2023, 5:52 PM
Rahul Dave
02/13/2023, 6:03 PM
Rahul Dave
02/16/2023, 12:17 AM
… `warehouse/badd02d4-f664-48ff-b636-d3e356657d08/encoder_op/encoders.parquet`
(`warehouse` is the base-path config I provided):
```yaml
resources:
  model_io_manager:
    config:
      base_path: warehouse
```
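The example path suggests the default layout is `<base_path>/<run ID>/<step key>/<output name><extension>`. Here those parts are `warehouse`, `badd02d4-f664-48ff-b636-d3e356657d08`, `encoder_op`, `encoders` and `.parquet`. The following is a minimal stdlib-only sketch that places that decomposition next to the run-independent layout asked about above. It assumes the decomposition is right. `default_layout` and `fixed_layout` are hypothetical helpers, not Dagster APIs.

```python
from pathlib import PurePosixPath


def default_layout(
    base_path: str, run_id: str, step_key: str, output_name: str, extension: str
) -> PurePosixPath:
    # Per-run layout: every run ID gets its own directory.
    return PurePosixPath(base_path) / run_id / step_key / f"{output_name}{extension}"


def fixed_layout(base_path: str, output_name: str, extension: str) -> PurePosixPath:
    # Run-independent layout: the same output always maps to the same file.
    return PurePosixPath(base_path) / f"{output_name}{extension}"


run_id = "badd02d4-f664-48ff-b636-d3e356657d08"
print(default_layout("warehouse", run_id, "encoder_op", "encoders", ".parquet"))
# warehouse/badd02d4-f664-48ff-b636-d3e356657d08/encoder_op/encoders.parquet
print(fixed_layout("warehouse", "encoders", ".parquet"))
# warehouse/encoders.parquet
```

`fixed_layout` has no run ID component, so every run resolves to the same file and a new run overwrites the old one. That is the behavior wanted here when versioning is handled elsewhere.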
Rahul Dave
02/16/2023, 12:21 AM
```python
import pandas as pd
from dagster import (
    Field,
    InitResourceContext,
    InputContext,
    OutputContext,
    UPathIOManager,
    io_manager,
)
from upath import UPath


class PandasParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.to_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        with path.open("rb") as file:
            return pd.read_parquet(file)


@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
    init_context: InitResourceContext,
) -> PandasParquetIOManager:
    assert init_context.instance is not None  # to please mypy
    # Fall back to the instance's storage directory when base_path is not configured.
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return PandasParquetIOManager(base_path=base_path)
```
Now, clearly I could strip the run ID and step ID in both `dump_to_path` and `load_from_path` before saving and reading... but is this the right way to do it? Won't it cause a mismatch between what Dagit reports and where the file actually is? Is there a "proper" way to do this?
owen
02/16/2023, 12:23 AM
… `_get_path`), which would probably be fine but would not be guaranteed to remain compatible with the base definition over time.
owen
02/16/2023, 12:25 AM
… `_get_path` would be the way to go for now (sorry for leading you in the wrong direction!)
Rahul Dave
02/16/2023, 12:27 AM
Rahul Dave
02/16/2023, 12:59 AM
… `encoders.parquet`: the "encoders" part is obtained from the output variable of the op. But how do I access this from the context?
Rahul Dave
02/16/2023, 1:08 AM
… `CustomPathPickledObjectFilesystemIOManager`, but I am no closer to understanding how the "run-id/step-id/variable" path is constructed.
Rahul Dave
02/16/2023, 1:41 AM
… `get_identifier`. And the `encoders` name comes from the output name (the default is `result`); I used `encoders`. I'm still having trouble understanding all the properties of the context, though: specifically, how to access the config from them!
Rahul Dave
02/16/2023, 3:14 AM
```python
import joblib
from dagster import (
    Field,
    InitResourceContext,
    InputContext,
    OutputContext,
    UPathIOManager,
    io_manager,
)
from upath import UPath


class FixedPathIOManager(UPathIOManager):
    extension: str = ".joblib"

    def _get_path(self, context) -> UPath:
        context.log.info(context.resource_config)
        context.log.info(type(context))
        # Run- and step-independent path: <base_path>/<output name><extension>
        return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")

    def dump_to_path(self, context: OutputContext, obj, path: UPath):
        context.log.info("dump")
        context.log.info(context.resource_config)
        with path.open("wb") as file:
            joblib.dump(obj, file)

    def load_from_path(self, context: InputContext, path: UPath):
        context.log.info("load")
        context.log.info(context.resource_config)
        with path.open("rb") as file:
            return joblib.load(file)


@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_io_manager(
    init_context: InitResourceContext,
) -> FixedPathIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathIOManager(base_path=base_path)
```
In `_get_path`, the logging tells me that `resource_config` has the key `base_path`, but when it tries to return the `UPath` at the end of the function I get a key-not-present error:
```
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "encoder_op":
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 265, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 382, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 94, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 177, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 145, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 459, in iterate_with_context
    return
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 85, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
KeyError: 'base_path'
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 457, in iterate_with_context
    next_output = next(iterator)
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagstermill/factory.py", line 313, in _t_fn
    value = io_manager.load_input(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 137, in load_input
    path = self._get_path(context)
  File "/Users/rahul/Websites/mlops2_with_dagster/mlops2_with_dagster/participants.py", line 47, in _get_path
    return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")
```
Even more strangely, this seems to happen in a `load_input`. The file `encoders.joblib` is actually written! So I get the feeling that, because the op is inside a graph, "something" is trying to reload it from this file and failing... Here is the graph and corresponding job definition:
```python
from dagster import GraphOut, graph
from dagstermill import local_output_notebook_io_manager

# read_train_data, read_test_data, encoder_op and the data resources are defined elsewhere.


@graph(out={'daencoders': GraphOut()})
def encoder_graph():
    df_train = read_train_data()
    df_test = read_test_data()
    daencoders, _ = encoder_op(df_test=df_test, df_train=df_train)
    return daencoders


local_encoder_job = encoder_graph.to_job(
    name="local_encoder_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "testing_data": current_testing_data,
        "model_io_manager": local_model_fixedpath_io_manager,
    },
)
```
And the config I am using in the Dagit launchpad to run the job:
```yaml
resources:
  model_io_manager:
    config:
      base_path: warehouse
```
Does anyone understand what is going on? @owen? It seems very bizarre that the config would just disappear. And the fact that the file is actually saved and there is a `load_input` involved makes me think that this error is happening somehow between the completion of the op and the completion of the graph, which is just a return. But perhaps returns from graphs create I/O?
Rahul Dave
02/16/2023, 3:31 AM
…
```python
def load_input(self, context):
    context.log.info("in load input")
    # Compute the path from the context of the output that produced this input.
    path = self._get_path(context.upstream_output)
    with path.open("rb") as file:
        return joblib.load(file)
```
then things work! And I get two "dump" messages logged, which tells me that the data is getting written twice: once at op finish, and once at graph/job finish. Is that avoidable?
owen
02/16/2023, 5:46 PM
… `context.name` value will always be `result`, so basically any op without an explicit output name will end up writing to the same location. It might make sense to include `context.step_key` in the path as well, to make outputs easier to distinguish. The step key will stay static between runs, so I think this would still work fine for your use case.
Rahul Dave
02/16/2023, 5:54 PM
… `load_input`, as evidenced by the error message? Or is there some kind of loop that gets called? I can confirm the writing is happening two times: even when I got the error, one write still happened. I feel I am missing something in my mental model of the software...
owen
02/16/2023, 5:58 PM
Rahul Dave
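02/16/2023, 5:59 PM
```python
import pandas as pd
from dagster import In, Out, file_relative_path
from dagstermill import define_dagstermill_op

encoder_op = define_dagstermill_op(
    name="encoder_op",
    notebook_path=file_relative_path(__file__, "../notebooks/encoder.ipynb"),
    output_notebook_name="output_encoder",
    # The output name "encoders" is what the IO manager sees as context.name.
    outs={"encoders": Out(dict, io_manager_key="model_io_manager")},
    ins={"df_train": In(pd.DataFrame), "df_test": In(pd.DataFrame)},
)
```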
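owen's point about `context.name` can be sketched without Dagster. The idea is a run-independent path that still includes the step key, so two ops that both use the default output name `result` do not overwrite each other. `step_scoped_path`, `name_only_path` and the step key `another_op` are hypothetical. In a `UPathIOManager` subclass the arguments would presumably come from `context.step_key` and `context.name` on the output context.

```python
from pathlib import PurePosixPath


def step_scoped_path(
    base_path: str, step_key: str, output_name: str, extension: str
) -> PurePosixPath:
    # No run ID, so reruns overwrite; the step key keeps ops that share an
    # output name (such as the default "result") in separate files.
    return PurePosixPath(base_path) / step_key / f"{output_name}{extension}"


def name_only_path(base_path: str, output_name: str, extension: str) -> PurePosixPath:
    # Output name only: unnamed outputs of different ops all land on one file.
    return PurePosixPath(base_path) / f"{output_name}{extension}"


# Two ops that both rely on the default output name "result".
print(name_only_path("warehouse", "result", ".joblib"))  # warehouse/result.joblib (shared)
print(step_scoped_path("warehouse", "encoder_op", "result", ".joblib"))
# warehouse/encoder_op/result.joblib
print(step_scoped_path("warehouse", "another_op", "result", ".joblib"))
# warehouse/another_op/result.joblib
```

As owen notes, the step key is stable across runs, so this layout keeps the "overwrite on rerun" property.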
Rahul Dave
02/16/2023, 5:59 PM
```python
@graph(out={'daencoders': GraphOut()})
def encoder_graph():
    df_train = read_train_data()
    df_test = read_test_data()
    daencoders, _ = encoder_op(df_test=df_test, df_train=df_train)
    return daencoders
```
Rahul Dave
02/16/2023, 6:00 PM
```python
local_encoder_job = encoder_graph.to_job(
    name="local_encoder_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "testing_data": current_testing_data,
        "model_io_manager": local_model_fixedpath_io_manager,
    },
)
```
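When the job is launched from Python (for example in a test) instead of the Dagit launchpad, the launchpad YAML maps onto a nested dict with the same keys. This is a sketch. The `execute_in_process` call is left as a comment so the snippet runs without Dagster installed, and it assumes `local_encoder_job` as defined above.

```python
import json

# Same nesting as the launchpad YAML: resources -> resource key -> config -> fields.
run_config = {
    "resources": {
        "model_io_manager": {
            "config": {"base_path": "warehouse"},
        }
    }
}

# With Dagster installed and local_encoder_job imported, something like:
# result = local_encoder_job.execute_in_process(run_config=run_config)
print(json.dumps(run_config, indent=2))
```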
Rahul Dave
02/16/2023, 6:01 PM
Rahul Dave
02/16/2023, 6:01 PM
```python
import joblib
from dagster import Field, InitResourceContext, InputContext, InputManager, input_manager
from upath import UPath


class FixedPathInputManager(InputManager):
    extension: str = ".joblib"

    def __init__(self, base_path: UPath):
        # InputManager has no constructor that accepts base_path, so store it here;
        # otherwise FixedPathInputManager(base_path=...) raises a TypeError.
        self._base_path = base_path

    def _get_path(self, context) -> UPath:
        context.log.info(context.resource_config)
        context.log.info(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{self.extension}")

    def load_from_path(self, context: InputContext, path: UPath):
        context.log.info("load")
        context.log.info(context.resource_config)
        with path.open("rb") as file:
            return joblib.load(file)

    def load_input(self, context):
        context.log.info("in load input")
        if context.upstream_output is None:  # used as a root input manager
            path = self._get_path(context)
        else:
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return joblib.load(file)


@input_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_input_manager(
    init_context: InitResourceContext,
) -> FixedPathInputManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathInputManager(base_path=base_path)
```
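The branch on `context.upstream_output` matters because an object is stored under the name of the output that produced it. An `InputContext`'s own `name` is the name of the input, and the two need not match. The following is a stdlib-only sketch of that resolution. It uses `SimpleNamespace` objects as stand-ins for Dagster's contexts. `resolve_load_path` and the input name `encoders_in` are hypothetical.

```python
from pathlib import PurePosixPath
from types import SimpleNamespace
from typing import Any


def resolve_load_path(context: Any, base_path: str, extension: str) -> PurePosixPath:
    # `context` stands in for an InputContext: it has `name` (the input's name)
    # and `upstream_output` (None when no upstream op produced the input).
    if context.upstream_output is None:
        # Root input: only the input's own name is available.
        source_name = context.name
    else:
        # Wired input: use the name the upstream output was stored under.
        source_name = context.upstream_output.name
    return PurePosixPath(base_path) / f"{source_name}{extension}"


wired = SimpleNamespace(name="encoders_in", upstream_output=SimpleNamespace(name="encoders"))
root = SimpleNamespace(name="df_train", upstream_output=None)
print(resolve_load_path(wired, "warehouse", ".joblib"))  # warehouse/encoders.joblib
print(resolve_load_path(root, "warehouse", ".parquet"))  # warehouse/df_train.parquet
```

Using the input's own name for the wired case would look for `warehouse/encoders_in.joblib`, a file that was never written.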
owen
02/16/2023, 6:02 PM
Rahul Dave
02/16/2023, 6:03 PM
… `path = self._get_path(context.upstream_output)` in …
owen
02/16/2023, 6:05 PM
Rahul Dave
02/16/2023, 6:06 PM
Rahul Dave
02/16/2023, 6:06 PM
```python
import joblib
from dagster import (
    Field,
    InitResourceContext,
    InputContext,
    OutputContext,
    UPathIOManager,
    io_manager,
)
from upath import UPath


class FixedPathIOManager(UPathIOManager):
    extension: str = ".joblib"

    def _get_path(self, context) -> UPath:
        context.log.info(context.resource_config)
        context.log.info(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{FixedPathIOManager.extension}")

    def dump_to_path(self, context: OutputContext, obj, path: UPath):
        context.log.info("dump")
        context.log.info(context.resource_config)
        with path.open("wb") as file:
            joblib.dump(obj, file)

    def load_from_path(self, context: InputContext, path: UPath):
        context.log.info("load")
        context.log.info(context.resource_config)
        with path.open("rb") as file:
            return joblib.load(file)

    def load_input(self, context):
        # Overrides UPathIOManager.load_input so the path is computed from the
        # upstream output's context whenever one exists.
        context.log.info("in load input")
        if context.upstream_output is None:  # used as a root input manager
            path = self._get_path(context)
        else:
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return joblib.load(file)


@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_model_fixedpath_io_manager(
    init_context: InitResourceContext,
) -> FixedPathIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return FixedPathIOManager(base_path=base_path)
```
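One way to sidestep the `KeyError: 'base_path'` is to stop reading `base_path` from the per-call context at all. The factory function already resolves it from `init_context.resource_config` and passes it to `FixedPathIOManager(base_path=...)`. The following is a stdlib-only sketch of that idea with a hypothetical stand-in class, `FixedPathSketch`. In the real `UPathIOManager` the constructor argument is kept on the instance. This sketch assumes it is the private attribute `_base_path`, which should be checked against the installed Dagster version. Under that assumption, `_get_path` could build the path from `self._base_path` and `context.name` instead of indexing `context.resource_config`.

```python
from pathlib import PurePosixPath
from types import SimpleNamespace
from typing import Any


class FixedPathSketch:
    """Hypothetical stand-in for a UPathIOManager subclass (no Dagster import)."""

    extension = ".joblib"

    def __init__(self, base_path: str) -> None:
        # Resolved once, at resource initialization, from the resource config.
        self._base_path = PurePosixPath(base_path)

    def _get_path(self, context: Any) -> PurePosixPath:
        # Uses only context.name; never touches context.resource_config.
        return self._base_path / f"{context.name}{self.extension}"


manager = FixedPathSketch("warehouse")
# Like the failing context: it has a name but no base_path in resource_config.
bare_context = SimpleNamespace(name="encoders", resource_config={})
print(manager._get_path(bare_context))  # warehouse/encoders.joblib
```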
Rahul Dave
02/16/2023, 6:08 PM
owen
02/16/2023, 6:10 PM
owen
02/16/2023, 6:11 PM
Rahul Dave
02/16/2023, 6:17 PM
Rahul Dave
02/16/2023, 6:21 PM
Rahul Dave
02/16/2023, 6:23 PM
Rahul Dave
02/16/2023, 6:23 PM
owen
02/16/2023, 6:31 PM
owen
02/16/2023, 6:32 PM
owen
02/16/2023, 6:33 PM
Rahul Dave
02/16/2023, 6:35 PM
Rahul Dave
02/16/2023, 6:36 PM
Rahul Dave
02/16/2023, 6:38 PM
Rahul Dave
02/16/2023, 6:41 PM
Rahul Dave
02/16/2023, 6:51 PM
… `load_input` from my IO manager
Rahul Dave
02/16/2023, 6:51 PM
```
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "target_extractor_op":
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 265, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 382, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 94, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 177, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 145, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 471, in iterate_with_context
    return
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 85, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
KeyError: 'base_path'
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 469, in iterate_with_context
    next_output = next(iterator)
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagstermill/factory.py", line 313, in _t_fn
    value = io_manager.load_input(
  File "/Users/rahul/miniforge3/envs/ml1-arm64/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 137, in load_input
    path = self._get_path(context)
  File "/Users/rahul/Websites/mlops2_with_dagster/mlops2_with_dagster/participants.py", line 130, in _get_path
    return UPath(f"{context.resource_config['base_path']}/{context.name}{PandasParquetIOManager.extension}")
```
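Both tracebacks pass through `dagstermill/factory.py` calling `io_manager.load_input`, which fits the two "dump" log lines reported earlier. One plausible reading, not confirmed against Dagster's source here, runs as follows:

1. The notebook stores its result through the IO manager (first dump).
2. The wrapping op reads the result back with `load_input`, using a context that dagstermill constructs. Its `resource_config` may lack `base_path`, hence the `KeyError`.
3. The op re-emits the value as an ordinary output, which the IO manager stores again (second dump).

A toy model of that sequence follows. `RecordingIOManager` and `run_notebook_op` are hypothetical stand-ins, not dagstermill code.

```python
from typing import Any, Dict, List


class RecordingIOManager:
    """Toy IO manager that records every call; stands in for FixedPathIOManager."""

    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}
        self.calls: List[str] = []

    def handle_output(self, key: str, obj: Any) -> None:
        self.calls.append("dump")
        self.store[key] = obj

    def load_input(self, key: str) -> Any:
        self.calls.append("load")
        return self.store[key]


def run_notebook_op(io_manager: RecordingIOManager) -> None:
    # 1. Inside the notebook process: the result is written via the IO manager.
    io_manager.handle_output("encoders", {"encoder": "fitted"})
    # 2. Back in the op's process: the value is read back from storage...
    value = io_manager.load_input("encoders")
    # 3. ...and emitted as the op's ordinary output, which is stored again.
    io_manager.handle_output("encoders", value)


mgr = RecordingIOManager()
run_notebook_op(mgr)
print(mgr.calls)  # ['dump', 'load', 'dump']
```

If this reading holds, the second write comes from the op wrapper rather than from the graph's `return`.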
Rahul Dave
02/16/2023, 6:52 PM
Rahul Dave
02/16/2023, 6:53 PM
Rahul Dave
02/16/2023, 6:54 PM
Rahul Dave
02/16/2023, 6:56 PM
… `load_input` is needed. But as I pointed out earlier, it does not seem to get called! It just prevents the `UPathIOManager`'s `load_input` from being called. Very bizarre!
Rahul Dave
02/16/2023, 7:00 PM
Rahul Dave
02/16/2023, 7:00 PM
Rahul Dave
02/16/2023, 7:00 PM
Rahul Dave
02/16/2023, 7:02 PM
… `load_input` is called
Rahul Dave
02/16/2023, 7:04 PM
```python
def load_input(self, context):
    context.log.info("=============================in load input")
    if context.upstream_output is None:  # input manager
        path = self._get_path(context)
    else:
        context.log.info("===========================upstream path")
        path = self._get_path(context.upstream_output)
    with path.open("rb") as file:
        return pd.read_parquet(file)
```
What this also tells you is that the context needs the upstream context, i.e. the notebook context, so the process boundary is perhaps not sharing the context, which comes as a surprise. And indeed, at least for dagstermill, my hypothesis of an "op" downstream of the notebook (the container op) is perhaps correct.
Rahul Dave
02/16/2023, 7:05 PM