Rahul Dave
01/30/2023, 6:28 PM
Type check failed for step input "df_test" - expected type "DataFrame".
This is because I clearly have not figured out how to supply the two assets to this job. I thought of using `input_values` and assets as inputs, as I might find myself using different files later. But is this the right way? I am doing it at the job level because the same graph might be used in a different context (training vs. test vs. production set, for example), but then how do I set `df_train` and `df_test` to the appropriate assets?
jamie
01/30/2023, 7:00 PM
@asset
def train_dataset() -> pd.DataFrame:
    train_data: str = "data/train.csv"
    return read_data(train_data)

@asset
def test_dataset() -> pd.DataFrame:
    test_data: str = "data/test.csv"
    return read_data(test_data)
Rahul Dave
01/30/2023, 7:08 PM
AssetKey
dagster._core.errors.DagsterTypeCheckDidNotPass: Type check failed for step input "df_test" - expected type "DataFrame". Description: Value of type <class 'dagster._core.definitions.events.AssetKey'> failed type check for Dagster type DataFrame, expected value to be of Python type pandas.core.frame.DataFrame.
jamie
01/30/2023, 7:10 PM
Rahul Dave
01/30/2023, 7:11 PM
import pandas as pd
from dagster import GraphOut, In, Out, file_relative_path, graph, op, resource
from dagstermill import define_dagstermill_op, local_output_notebook_io_manager

@resource
def current_training_data(init_context):
    return "data/train.csv"

@resource
def current_testing_data(init_context):
    return "data/test.csv"

def read_data(data: str) -> pd.DataFrame:
    return pd.read_csv(data)

# Each op takes its CSV path from a resource, so the path can differ per job.
@op(required_resource_keys={"training_data"})
def read_train_data(context) -> pd.DataFrame:
    return read_data(context.resources.training_data)

@op(required_resource_keys={"testing_data"})
def read_test_data(context) -> pd.DataFrame:
    return read_data(context.resources.testing_data)
encoder_op = define_dagstermill_op(
    name="encoder_op",
    notebook_path=file_relative_path(__file__, "../notebooks/encoder.ipynb"),
    output_notebook_name="output_encoder",
    outs={"encoders": Out(dict)},
    ins={"df_train": In(pd.DataFrame), "df_test": In(pd.DataFrame)},
)

@graph(out={"encoders": GraphOut()})
def encoder_graph():
    df_train = read_train_data()
    df_test = read_test_data()
    # Pass inputs by keyword so each DataFrame reaches the matching notebook input.
    encoders, _ = encoder_op(df_train=df_train, df_test=df_test)
    return encoders

local_encoder_job = encoder_graph.to_job(
    name="local_encoder_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "testing_data": current_testing_data,
    },
)
jamie
01/30/2023, 8:40 PM
Rahul Dave
01/30/2023, 8:46 PM
jamie
01/30/2023, 8:47 PM
Rahul Dave
01/30/2023, 8:49 PM
jamie
01/30/2023, 9:00 PM
Rahul Dave
01/30/2023, 9:04 PM