Rahul Dave
01/30/2023, 3:49 PM
@asset
def train_dataset():
    """Return the result of ``read_data`` for the training CSV file."""
    # Path is given relative to the process working directory.
    return read_data("data/train.csv")
@asset
def test_dataset():
    """Return the result of ``read_data`` for the test CSV file."""
    # Path is given relative to the process working directory.
    return read_data("data/test.csv")
# The two dataset loader definitions, collected in one list.
# NOTE(review): not referenced anywhere else in the visible snippet.
input_datasets = [train_dataset, test_dataset]
# Notebook-backed op: runs ../notebooks/encoder.ipynb (resolved relative to
# this file) with two DataFrame inputs and a single dict output, "encoders".
encoder_op = define_dagstermill_op(
    name="encoder_op",
    notebook_path=file_relative_path(__file__, "../notebooks/encoder.ipynb"),
    output_notebook_name="output_encoder",
    ins={
        "df_train": In(pd.DataFrame),
        "df_test": In(pd.DataFrame),
    },
    outs={
        "encoders": Out(dict),
    },
)
@graph(out={"encoders": GraphOut()})
def encoder_graph(df_train, df_test):
    """Wrap ``encoder_op`` in a graph with a single output named "encoders".

    Both inputs are passed straight through to ``encoder_op`` and its result
    is returned as the graph's output.
    """
    return encoder_op(df_train, df_test)
# Graph-backed asset built from ``encoder_graph``: the graph inputs are bound
# to the "train_dataset" / "test_dataset" asset keys, and the graph's
# "encoders" output is published under the "encoders_asset" key.
encoder_asset = AssetsDefinition.from_graph(
    encoder_graph,
    keys_by_input_name={
        "df_train": AssetKey("train_dataset"),
        "df_test": AssetKey("test_dataset"),
    },
    keys_by_output_name={
        "encoders": AssetKey("encoders_asset"),
    },
)
This, I think, reflects the structure better, but I still get the same error: `DagstermillError: dagstermill assets do not currently support dagstermill.yield_result`,
which means that my `dagstermill_op`
is getting converted to a `dagstermill_asset`,
which is not what I want. What am I doing wrong?