Rahul Dave
02/16/2023, 7:33 PM
…io_manager_key. Then I use it in another Out in another op, and this is where things blow up. Here is my code:
target_extractor_op = define_dagstermill_op(
    name="target_extractor_op",
    notebook_path=file_relative_path(__file__, "../notebooks/target_extractor.ipynb"),
    output_notebook_name="output_target_extractor",
    outs={"target": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
    ins={"df_train": In(pd.DataFrame)},
)

local_target_extractor_job = target_extractor_graph.to_job(
    name="target_extractor_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "lake_io_manager": local_pandas_parquet_io_manager,
    },
)

transformer_op = define_dagstermill_op(
    name="transformer_op",
    notebook_path=file_relative_path(__file__, "../notebooks/transform.ipynb"),
    output_notebook_name="output_transform",
    outs={"transformed_data": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
    ins={"df": In(pd.DataFrame), "encoders": In(dict), "datatype": In(str)},
)

local_train_transformer_job = transformer_graph.to_job(
    name="train_transformer_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "data_file": current_training_data,
        "data_type": train_type,
        "encoder_file": encoder_file,
        "lake_io_manager": local_pandas_parquet_io_manager,
    },
)
owen
02/16/2023, 7:54 PM
Have you tried the required_resource_keys argument of define_dagstermill_op?
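[Editor's note: a minimal sketch of that suggestion applied to the op above. Only the added required_resource_keys line is new; the imports are shown for completeness and assume the same module layout as the original snippet.]

from dagstermill import define_dagstermill_op
from dagster import In, Out, file_relative_path
import pandas as pd

target_extractor_op = define_dagstermill_op(
    name="target_extractor_op",
    notebook_path=file_relative_path(__file__, "../notebooks/target_extractor.ipynb"),
    output_notebook_name="output_target_extractor",
    outs={"target": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
    ins={"df_train": In(pd.DataFrame)},
    # declare the key so Dagster validates it is provided on any job running this op
    required_resource_keys={"lake_io_manager"},
)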
Rahul Dave
02/16/2023, 7:55 PM
class PandasParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def _get_path(self, context) -> UPath:
        context.log.info(context.resource_config)
        context.log.info(type(context))
        return UPath(f"{context.resource_config['base_path']}/{context.name}{PandasParquetIOManager.extension}")

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.to_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        with path.open("rb") as file:
            return pd.read_parquet(file)

    def load_input(self, context):
        context.log.info("=============================in load input")
        if context.upstream_output is None:  # being used as an input manager
            path = self._get_path(context)
        else:  # loading the output of an upstream op
            context.log.info("===========================upstream path")
            path = self._get_path(context.upstream_output)
        with path.open("rb") as file:
            return pd.read_parquet(file)
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
    init_context: InitResourceContext,
) -> PandasParquetIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return PandasParquetIOManager(base_path=base_path)
@graph(out={"target": GraphOut()})
def target_extractor_graph():
    df_train = read_train_data()
    # dagstermill ops return (result, output_notebook), so unpack both
    target, _ = target_extractor_op(df_train)
    return target

@graph(out={"transformed_data": GraphOut()})
def transformer_graph():
    df = read_data_file()
    datatype = read_data_type()
    edict = read_encoder_file()
    transformed_data, _ = transformer_op(datatype=datatype, df=df, encoders=edict)
    # transformed_data, _ = transformer_op(datatype=datatype, df=df)
    return transformed_data
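[Editor's note: for reference, a sketch of how the optional base_path above could be supplied per job through to_job's config argument. The /tmp/lake path is an invented example; if omitted, the manager falls back to the instance's storage directory as the resource function shows.]

local_target_extractor_job = target_extractor_graph.to_job(
    name="target_extractor_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "lake_io_manager": local_pandas_parquet_io_manager,
    },
    # with this config, _get_path would resolve the "target" output to
    # /tmp/lake/target.parquet
    config={"resources": {"lake_io_manager": {"config": {"base_path": "/tmp/lake"}}}},
)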
yuhan
02/16/2023, 10:04 PM
Rahul Dave
02/16/2023, 10:11 PM
yuhan
02/16/2023, 10:27 PM
@@ -373,6 +373,7 @@ local_test_transformer_job = transformer_graph.to_job(
name="test_transformer_job",
resource_defs={
"output_notebook_io_manager": local_output_notebook_io_manager,
+ "lake_io_manager": local_pandas_parquet_io_manager,
"data_file": current_testing_data,
"data_type": test_type,
"encoder_file": encoder_file
@@ -383,6 +384,7 @@ local_dataset_transformer_job = transformer_graph.to_job(
name="dataset_transformer_job",
resource_defs={
"output_notebook_io_manager": local_output_notebook_io_manager,
+ "lake_io_manager": local_pandas_parquet_io_manager,
"data_file": current_dataset_data,
"data_type": dataset_type,
"encoder_file": encoder_file
@@ -481,10 +483,11 @@ local_inference_from_data_job = inference_from_data_graph.to_job(
name="inference_from_data_job",
resource_defs={
"output_notebook_io_manager": local_output_notebook_io_manager,
"data_file": current_dataset_data,
+ "lake_io_manager": local_pandas_parquet_io_manager,
"data_type": dataset_type,
"encoder_file": encoder_file,
"model_file": model_file,
"infer_type": dataset_type
}
)
@@ -507,10 +510,11 @@ local_inference_from_data_job_scratch = inference_from_data_graph_scratch.to_job
name="inference_from_data_job_scratch",
resource_defs={
"output_notebook_io_manager": local_output_notebook_io_manager,
"data_file": current_dataset_data,
+ "lake_io_manager": local_pandas_parquet_io_manager,
"data_type": dataset_type,
"encoder_file": encoder_file,
"model_file": model_file,
"infer_type": dataset_type
}
)
"lake_io_manager": local_pandas_parquet_io_manager
in all the jobs.Rahul Dave
02/16/2023, 10:38 PM
yuhan
02/16/2023, 10:40 PM
Rahul Dave
02/16/2023, 10:41 PM
yuhan
02/16/2023, 10:42 PM
Rahul Dave
02/16/2023, 10:43 PM
Can I just put
lake_io_manager = participants.local_pandas_parquet_io_manager
in resource definitions and then not include it in any job, just using the key in ops?
lake_io_manager = participants.local_pandas_parquet_io_manager,
and so:
local_target_extractor_job = target_extractor_graph.to_job(
    name="target_extractor_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        # "lake_io_manager": local_pandas_parquet_io_manager,
    },
)
in the participants file. The op definition remains the same...
yuhan
02/16/2023, 11:12 PM
Jobs don't pick up the resources arg to Definitions - only assets do.
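[Editor's note: a sketch of the distinction yuhan describes, using the names from this thread. Resources passed to Definitions are bound to assets; a graph-backed job like the one above must still receive the resource through its own resource_defs.]

from dagster import Definitions

defs = Definitions(
    jobs=[local_target_extractor_job],
    # bound to assets only; local_target_extractor_job must still get
    # "lake_io_manager" via its own resource_defs
    resources={"lake_io_manager": local_pandas_parquet_io_manager},
)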
Rahul Dave
02/17/2023, 12:33 AM
yuhan
02/17/2023, 12:40 AM