# ask-community
r
The problem above with input managers seems to be a problem with ops sharing resources. I define an IO manager and use it in an op with an `io_manager_key`. Then I use it in another `Out` in another op, and this is where things blow up. Here is my code:
```python
target_extractor_op = define_dagstermill_op(
    name="target_extractor_op",
    notebook_path=file_relative_path(__file__, "../notebooks/target_extractor.ipynb"),
    output_notebook_name="output_target_extractor",
    outs={"target": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
    ins={"df_train": In(pd.DataFrame)}
)


local_target_extractor_job = target_extractor_graph.to_job(
    name="target_extractor_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "lake_io_manager": local_pandas_parquet_io_manager,
    }
)

transformer_op = define_dagstermill_op(
    name="transformer_op",
    notebook_path=file_relative_path(__file__, "../notebooks/transform.ipynb"),
    output_notebook_name="output_transform",
    outs={"transformed_data": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
    ins={"df": In(pd.DataFrame), "encoders": In(dict), "datatype": In(str)}
)


local_train_transformer_job = transformer_graph.to_job(
    name="train_transformer_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "data_file": current_training_data, 
        "data_type": train_type,
        "encoder_file": encoder_file,
        "lake_io_manager": local_pandas_parquet_io_manager,
    }
)
```
o
this is pretty weird -- what does your graph code look like? is it just those single ops? and this shouldn't be necessary, but what happens if you add "lake_io_manager" to the `required_resource_keys` argument of `define_dagstermill_op`?
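For illustration, a minimal sketch of that suggestion, reusing the transformer_op definition from the earlier message; the only change is the explicit required_resource_keys argument:
```python
import pandas as pd
from dagster import In, Out, file_relative_path
from dagstermill import define_dagstermill_op

# Same op as above, with the io manager key also declared as a required resource.
transformer_op = define_dagstermill_op(
    name="transformer_op",
    notebook_path=file_relative_path(__file__, "../notebooks/transform.ipynb"),
    output_notebook_name="output_transform",
    outs={"transformed_data": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
    ins={"df": In(pd.DataFrame), "encoders": In(dict), "datatype": In(str)},
    required_resource_keys={"lake_io_manager"},
)
```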
also it's not a big deal but do you mind keeping related messages together in a thread? makes it easier for us to track 🙏
r
yes sorry for that i should have threaded. My bad!!!!
note that i get no error for the target extractor (perhaps because it's defined first...)
IO manager:
```python
class PandasParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def _get_path(self, context) -> UPath:
        context.log.info(context.resource_config)
        context.log.info(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{PandasParquetIOManager.extension}")

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.to_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        with path.open("rb") as file:
            return pd.read_parquet(file)

    def load_input(self, context):
        context.log.info("=============================in load input")
        if context.upstream_output is None: # input manager
            path = self._get_path(context)
        else:
            context.log.info("===========================upstream path")
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return pd.read_parquet(file)

@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
    init_context: InitResourceContext,
) -> PandasParquetIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return PandasParquetIOManager(base_path=base_path)
```
and graph defs:
```python
@graph(out={"target": GraphOut()})
def target_extractor_graph():
    df_train = read_train_data()
    target, _ = target_extractor_op(df_train)
    return target


@graph(out={"transformed_data": GraphOut()})
def transformer_graph():
    df = read_data_file()
    datatype = read_data_type()
    edict = read_encoder_file()
    transformed_data, _ = transformer_op(datatype = datatype, df = df, encoders = edict)
    #transformed_data, _ = transformer_op(datatype = datatype, df = df)
    return transformed_data
```
As you suspected, no change on adding `required_resource_keys={"lake_io_manager"},` as an argument to the ops...
y
hey Rahul, what error are you getting? im trying to repro this on my end for troubleshooting.
ah nvm - saw it in the message below 😅
r
Yeah sorry for messing up the thread!!!
you can get the code on github at univai-community/mlops2_with_dagster
y
```diff
@@ -373,6 +373,7 @@ local_test_transformer_job = transformer_graph.to_job(
     name="test_transformer_job",
     resource_defs={
         "output_notebook_io_manager": local_output_notebook_io_manager,
+        "lake_io_manager": local_pandas_parquet_io_manager,
         "data_file": current_testing_data,
         "data_type": test_type,
         "encoder_file": encoder_file
@@ -383,6 +384,7 @@ local_dataset_transformer_job = transformer_graph.to_job(
     name="dataset_transformer_job",
     resource_defs={
         "output_notebook_io_manager": local_output_notebook_io_manager,
+        "lake_io_manager": local_pandas_parquet_io_manager,
         "data_file": current_dataset_data,
         "data_type": dataset_type,
         "encoder_file": encoder_file
@@ -481,10 +483,11 @@ local_inference_from_data_job = inference_from_data_graph.to_job(
     name="inference_from_data_job",
     resource_defs={
         "output_notebook_io_manager": local_output_notebook_io_manager,
         "data_file": current_dataset_data,
+        "lake_io_manager": local_pandas_parquet_io_manager,       
         "data_type": dataset_type,
         "encoder_file": encoder_file,
         "model_file": model_file,
         "infer_type": dataset_type
     }
 )
@@ -507,10 +510,11 @@ local_inference_from_data_job_scratch = inference_from_data_graph_scratch.to_job
     name="inference_from_data_job_scratch",
     resource_defs={
         "output_notebook_io_manager": local_output_notebook_io_manager,
         "data_file": current_dataset_data,
+        "lake_io_manager": local_pandas_parquet_io_manager,
         "data_type": dataset_type,
         "encoder_file": encoder_file,
         "model_file": model_file,
         "infer_type": dataset_type
     }
 )
```
i cloned your repo and made those changes^ and then the code location loaded successfully.
so the problem is: because you are reusing the notebook op in multiple jobs, you need to specify the resource def, i.e. `"lake_io_manager": local_pandas_parquet_io_manager`, in all the jobs.
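For reference, a minimal sketch of one such fixed job written out in full (all names here come from the repo and the diff above):
```python
# Same fix as in the diff: bind "lake_io_manager" in every job that runs transformer_op.
local_test_transformer_job = transformer_graph.to_job(
    name="test_transformer_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "lake_io_manager": local_pandas_parquet_io_manager,
        "data_file": current_testing_data,
        "data_type": test_type,
        "encoder_file": encoder_file,
    },
)
```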
we are working on improving the error message as well as making resource definition/specification more intuitive to use. one improvement you could make is to move the common resource_defs to Definitions, i.e. specify the resources at the definition level across all jobs, so you won't forget to include any for new jobs.
r
Wow! Thank you!! In retrospect this makes a lot of sense: since we declare resources at the op level, those resources need to be provided on the jobs that use those ops…
y
yeah exactly.. it's for exactly this type of inconvenience that we introduced definition-level resources: https://github.com/dagster-io/dagster/discussions/11167
i do see you have some here: https://github.com/univai-community/mlops2_with_dagster/blob/master/mlops2_with_dagster/__init__.py#L8 so moving the other common ones here should make it easier to maintain
r
My strategy is to bind things to outer layers (jobs) as much as possible. And so I love the config format for inward communication.
y
yea exactly, Definitions would be an even outer layer than jobs 🙂
and if you ever need to bind things early, you can specify them on jobs or graphs
r
Yes I tried the commented lines there but it somehow didn't work. But I should try again.. I was trying too many things to keep my head bolted on straight 🤣
so just to keep this straight in my head, i could do `lake_io_manager = participants.local_pandas_parquet_io_manager` in the resource definitions and then not include it in any job, just using the key in the ops?
I think i am misunderstanding @yuhan. I tried moving all resource definitions out of the job and into the dunder __init__.py but that gives me the same error, only now on target_extractor_op as that's the first one in the file.
like so: `lake_io_manager = participants.local_pandas_parquet_io_manager,` and so:
```python
local_target_extractor_job = target_extractor_graph.to_job(
    name="target_extractor_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        #"lake_io_manager": local_pandas_parquet_io_manager,
    }
)
```
in the participants file. The op definition remains the same...
y
oh sorry my bad - i got it wrong myself. jobs currently don't respect the `resources` arg to `Definitions` - only assets do.
so in your case, since it seems like many jobs are sharing the same set of resources based on test/dataset, my suggestion would be to maintain a set of resources per dataset or environment, similar to this example here: https://github.com/dagster-io/dagster/blob/master/examples/project_fully_featured/project_fully_featured/resources/__init__.py#L55
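For illustration, a hedged sketch of that per-environment pattern (the dict name RESOURCES_TRAIN is illustrative, not from the repo; the keys match the train job shown earlier):
```python
# Illustrative only: keep one resource dict per dataset/environment and reuse it,
# so no job can forget a key such as "lake_io_manager".
RESOURCES_TRAIN = {
    "output_notebook_io_manager": local_output_notebook_io_manager,
    "lake_io_manager": local_pandas_parquet_io_manager,
    "data_file": current_training_data,
    "data_type": train_type,
    "encoder_file": encoder_file,
}

local_train_transformer_job = transformer_graph.to_job(
    name="train_transformer_job",
    resource_defs=RESOURCES_TRAIN,
)
```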
r
Aha, so it makes sense to duplicate them into the resource definitions anyway, for asset-based jobs. For example, metrics can be calculated as a downstream job after training is completed. The env-based idea is super! I was thinking of that as a dev-to-production thing, but maintaining these dictionaries more as a "config" of sorts and passing them into jobs based on need makes sense!
Thank you!
y
anytime! speaking of dev to prod, here’s a guide for that, in case you aren’t aware : ) https://docs.dagster.io/guides/dagster/transitioning-data-pipelines-from-development-to-production