Rahul Dave
02/18/2023, 8:34 PM
dagster._core.errors.DagsterInvalidDefinitionError: Input 'df_train' of op 'target_extractor_op' has no way of being resolved. Must provide a resolution to this input via another op/graph, or via a direct input value mapped from the top-level graph. To learn more, see the docs for unconnected inputs: <https://docs.dagster.io/concepts/io-management/unconnected-inputs#unconnected-inputs>.
# Dagstermill op backed by ../notebooks/target_extractor.ipynb.
#   input  "df_train": pd.DataFrame, declared with
#                      input_manager_key="raw_data_input_manager"
#   output "target":   pd.DataFrame, declared with
#                      io_manager_key="lake_io_manager"
target_extractor_op = define_dagstermill_op(
    name="target_extractor_op",
    notebook_path=file_relative_path(__file__, "../notebooks/target_extractor.ipynb"),
    output_notebook_name="output_target_extractor",
    ins={"df_train": In(pd.DataFrame, input_manager_key="raw_data_input_manager")},
    outs={"target": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
)
@graph(out={"target": GraphOut()})
def target_extractor_graph():
    # Single-node graph: the notebook op is invoked with no arguments, so its
    # "df_train" input is not connected to anything at the graph level.
    #
    # Earlier wiring, kept for reference:
    #     df_train = read_train_data()
    #     target, _ = target_extractor_op(df_train)
    #
    # The op call is unpacked into two values; only the first is exposed as
    # the graph's "target" output (the second is presumably the executed
    # notebook -- confirm against the dagstermill docs).
    extracted_target, _ = target_extractor_op()
    return extracted_target
# Job built from target_extractor_graph. Each resource key below is bound to
# the object of the matching name defined elsewhere in the project.
local_target_extractor_job = target_extractor_graph.to_job(
    name="target_extractor_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        # Key referenced by the "target" output of target_extractor_op.
        "lake_io_manager": local_pandas_parquet_io_manager,
        # Key referenced by the "df_train" input of target_extractor_op.
        "raw_data_input_manager": local_pandas_csv_io_manager,
    },
)
Compare with the `encoders` input in this op/graph combo:
# Dagstermill op backed by ../notebooks/transform.ipynb.
#   input  "df":               pd.DataFrame, no input manager key
#   input  "encoders":         dict, input_manager_key="model_input_manager"
#   input  "datatype":         str, no input manager key
#   output "transformed_data": pd.DataFrame, io_manager_key="lake_io_manager"
#
# Variants tried earlier, kept for reference:
#     outs={"transformed_data": Out(pd.DataFrame)},
#     ins={"df": In(pd.DataFrame), "encoders": In(dict), "datatype": In(str)}
#     ins={"df": In(pd.DataFrame, input_manager_key="lake_input_manager"),
#          "encoders": In(dict, input_manager_key="model_input_manager"),
#          "datatype": In(str)}
transformer_op = define_dagstermill_op(
    name="transformer_op",
    notebook_path=file_relative_path(__file__, "../notebooks/transform.ipynb"),
    output_notebook_name="output_transform",
    ins={
        "df": In(pd.DataFrame),
        "encoders": In(dict, input_manager_key="model_input_manager"),
        "datatype": In(str),
    },
    outs={"transformed_data": Out(pd.DataFrame, io_manager_key="lake_io_manager")},
)
@graph(out={"transformed_data": GraphOut()})
def transformer_graph():
    # "df" and "datatype" are fed from read_data_file() and read_data_type();
    # "encoders" is deliberately not passed here, leaving it unconnected at
    # the graph level.
    #
    # Earlier wiring, kept for reference:
    #     edict = read_encoder_file()
    #     transformed_data, _ = transformer_op(datatype = datatype, df = df, encoders = edict)
    frame = read_data_file()
    kind = read_data_type()
    # Only the first of the two unpacked values becomes the graph output.
    result, _ = transformer_op(df=frame, datatype=kind)
    return result
# Job built from transformer_graph. Each resource key below is bound to the
# object of the matching name defined elsewhere in the project.
local_train_transformer_job = transformer_graph.to_job(
    name="train_transformer_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "data_file": current_training_data,
        "data_type": train_type,
        "encoder_file": encoder_file,
        # Key referenced by the "encoders" input of transformer_op.
        "model_input_manager": local_model_fixedpath_io_manager,
        # Key referenced by the "transformed_data" output of transformer_op.
        "lake_io_manager": local_pandas_parquet_io_manager,
        "lake_input_manager": local_pandas_parquet_io_manager2,
        "raw_data_io_manager": local_pandas_csv_io_manager,
    },
)
`encoders` also has no problem being resolved, which makes the above error perplexing. Meanwhile, if I try to resolve `df` using an input manager rather than passing it in as an argument, I get the same error.
claire
02/21/2023, 8:37 PM