# ask-community

Rahul Dave

01/30/2023, 6:28 PM
with the error
Type check failed for step input "df_test" - expected type "DataFrame".
because I clearly have not figured out how to supply the two assets to this job. I thought of using `input_values` and assets as inputs, as I might find myself using different files later. But is this the right way? I am doing it at the job level because the same graph might be used in a different context (training vs test vs production set, for example), but then how do I set `df_train` and `df_test` to the appropriate assets?

jamie

01/30/2023, 7:00 PM
What happens if you add type hints to your assets?
@asset
def train_dataset() -> pd.DataFrame:
    train_data : str = "data/train.csv"
    return read_data(train_data)

@asset
def test_dataset() -> pd.DataFrame:
    test_data : str = "data/test.csv"
    return read_data(test_data)

Rahul Dave

01/30/2023, 7:08 PM
Same error. I think `input_values` is not expecting an `AssetKey`:
dagster._core.errors.DagsterTypeCheckDidNotPass: Type check failed for step input "df_test" - expected type "DataFrame". Description: Value of type <class 'dagster._core.definitions.events.AssetKey'> failed type check for Dagster type DataFrame, expected value to be of Python type pandas.core.frame.DataFrame.

jamie

01/30/2023, 7:10 PM
ah yeah, silly oversight on my part. you’ll also need to convert the two assets to ops in order to use them with other ops

Rahul Dave

01/30/2023, 7:11 PM
yeah... am trying that now. I think there is no way to be "part assets" unless we become asset-backed graphs, and we can't do that for now...
I finally managed to get it to work with this:
@resource
def current_training_data(init_context):
    return "data/train.csv"

@resource
def current_testing_data(init_context):
    return "data/test.csv"

def read_data(data: str):
    return pd.read_csv(data)

@op(required_resource_keys={"training_data"})
def read_train_data(context) -> pd.DataFrame:
    return read_data(context.resources.training_data)

@op(required_resource_keys={"testing_data"})
def read_test_data(context) -> pd.DataFrame:
    test_data : str = "data/test.csv"
    return read_data(context.resources.testing_data)

encoder_op = define_dagstermill_op(
    name="encoder_op",
    notebook_path=file_relative_path(__file__, "../notebooks/encoder.ipynb"),
    output_notebook_name="output_encoder",
    outs={"encoders": Out(dict)},
    ins={"df_train": In(pd.DataFrame), "df_test": In(pd.DataFrame)}
)

@graph(out={"encoders": GraphOut()})
def encoder_graph():
    df_train = read_train_data()
    df_test = read_test_data()
    encoders, _ = encoder_op(df_train, df_test)
    return encoders

local_encoder_job = encoder_graph.to_job(
    name="local_encoder_job",
    resource_defs={
        "output_notebook_io_manager": local_output_notebook_io_manager,
        "training_data": current_training_data,
        "testing_data": current_testing_data
    }
)
Any thoughts/criticisms? I now need to hook up the output to an appropriate IO manager (it's using the default) so I can use the output elsewhere (or store it permanently)
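For context on what that hookup involves: dagster's IO manager interface boils down to two operations, one that persists an op's output and one that loads it back for a downstream input. Stripped of the dagster machinery, a minimal file-backed version of that logic might look like this (the storage path and helper names here are made up for illustration, not dagster API):

```python
import pickle
from pathlib import Path

# hypothetical storage location; dagster's built-in fs_io_manager uses a similar layout
STORAGE_DIR = Path("storage")

def handle_output(step_key: str, output_name: str, obj) -> None:
    """Persist an op's output under a key derived from the step and output name."""
    STORAGE_DIR.mkdir(parents=True, exist_ok=True)
    with open(STORAGE_DIR / f"{step_key}.{output_name}.pkl", "wb") as f:
        pickle.dump(obj, f)

def load_input(step_key: str, output_name: str):
    """Load the persisted output of an upstream op."""
    with open(STORAGE_DIR / f"{step_key}.{output_name}.pkl", "rb") as f:
        return pickle.load(f)
```

A real implementation would subclass dagster's `IOManager` and take the keys from the output/input context rather than as arguments, but the persistence logic has the same shape.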

jamie

01/30/2023, 8:40 PM
this looks mostly good! I’m a bit confused about the duplication of the testing/training resources, since the same paths appear as variables in the corresponding ops, and about the need for the resources in general. Is there more going on in those resources than makes sense for a code snippet?

Rahul Dave

01/30/2023, 8:46 PM
I want to use those resources in other pipelines. For example, a training job will take the encoders output here and the training data, first transform the data, and then train on it. This will store a model somewhere. Then, on test data or completely new data, I want to run the encoders and transform the data (which is the same part of the pipeline as before), but replace the training step with prediction using the model.

jamie

01/30/2023, 8:47 PM
cool, sounds good!

Rahul Dave

01/30/2023, 8:49 PM
The other thing I am trying to achieve (unsuccessfully with this code, since read_data is included twice) is: what if I replace the CSV by Parquet? The proper solution to this, I think, is configurable IO managers, but I have to figure out how to do that.

jamie

01/30/2023, 9:00 PM
you could have your resource read in the file and return the data as a dataframe. then you could write some custom logic for how to load if the file is csv vs parquet
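Concretely, the custom loading logic jamie describes could dispatch on the file extension. A sketch of what the resource body might call (assuming pandas, and that `read_table` is a hypothetical helper, not an existing dagster or pandas function):

```python
from pathlib import Path

import pandas as pd

def read_table(path: str) -> pd.DataFrame:
    """Load a tabular file into a DataFrame, dispatching on its extension."""
    suffix = Path(path).suffix.lower()
    if suffix == ".csv":
        return pd.read_csv(path)
    if suffix in (".parquet", ".pq"):
        return pd.read_parquet(path)
    raise ValueError(f"unsupported file format: {suffix}")
```

A resource could then return `read_table(...)` directly, and the downstream ops would never know (or care) which format backed the data.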

Rahul Dave

01/30/2023, 9:04 PM
And hook it up to a run config to dispatch based on file ending or a custom flag, say? If I understand you right, you are saying IO managers are really for data transport, and to think of the data type orthogonally…
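The custom-flag variant of that dispatch could be as small as a lookup table keyed by a format flag pulled from config. A toy version (the `"format"` key and the `loader_for` helper are assumptions for illustration, not dagster run-config API):

```python
import pandas as pd

# map a config flag to a pandas loader; new formats can be added
# here without touching any of the ops downstream
LOADERS = {
    "csv": pd.read_csv,
    "parquet": pd.read_parquet,
}

def loader_for(run_config: dict):
    """Pick a loader from a run-config-style dict, e.g. {"format": "csv"}."""
    fmt = run_config.get("format", "csv")
    try:
        return LOADERS[fmt]
    except KeyError:
        raise ValueError(f"unknown format: {fmt}") from None
```

In a real job this flag would live in the resource's config schema, so each `to_job` call (training vs test vs production) could select its own format.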