Hi Team, I am try to add data validations and crea...
# announcements
a
Hi Team, I am trying to add data validations and create a user-defined data type. I want to check that the file read by one node is a CSV and that the next node also receives a CSV. I am getting this error:
dagster.core.errors.DagsterInvalidDefinitionError: Input "pdd" in solid "Input1" is not connected to any outputs and can not be hydrated from configuration, creating an impossible to execute pipeline
Sample code:
from dagster import execute_pipeline, pipeline, solid, as_dagster_type, lambda_solid, input_hydration_config
import pandas as pd
DataFrame = as_dagster_type(
    pd.DataFrame,
    name='PandasDataFrame',
    description='''Two-dimensional size-mutable, potentially heterogeneous
tabular data structure with labeled axes (rows and columns).
See <http://pandas.pydata.org/>''',
)

@lambda_solid
def Input1(pdd: DataFrame) -> DataFrame:
    r = pdd.read_csv('file1.csv')
    return r

@lambda_solid
def Merge(r: DataFrame, r2: DataFrame, pdd: DataFrame) -> DataFrame:
    r3 = pdd.concat([r, r2], axis=1)
    return

@lambda_solid
def Input2(pdd: DataFrame) -> DataFrame:
    r2 = pdd.read_csv('file2.csv')
    return r2

@lambda_solid
def Result_output(y: DataFrame) -> DataFrame:
    y3 = y
    return

@pipeline
def actual_dag_pipeline():
    y = Merge(Input1(), Input2())
    Result_output(y)
a
The `pdd` input to `Input1`/`Input2` is the source of the problem. Since you are reading a CSV from a known path, I think you want to create a new data frame inside the solid instead of defining an input.
The error is there because the pipeline has inputs that are not wired up to anything, so the only way to satisfy them would be to provide values from config. The issue is that your custom input type does not yet define how to do that (`input_hydration_config`).
i.e. I think you want:
@lambda_solid
def Input1() -> DataFrame:
    r = pd.read_csv('file1.csv')
    return r
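A minimal sketch of the same idea in plain pandas, without the dagster decorators, in case it helps to see the shape of the fix: the loader takes no upstream data frame and calls `read_csv` on the `pd` module directly. The function name `load_frame`, the in-memory CSV text, and its column names are made up for illustration; the code in the thread reads `file1.csv` from disk.

```python
import io

import pandas as pd

# Hypothetical stand-in for the contents of file1.csv (assumption: the
# real file and its columns are not shown in the thread).
SAMPLE_CSV = "id,value\n1,10\n2,20\n"


def load_frame(source=None) -> pd.DataFrame:
    """Build the DataFrame inside the function instead of receiving one."""
    # read_csv is a function on the pandas module, not a DataFrame method,
    # so nothing has to be passed in from an upstream node.
    if source is None:
        source = io.StringIO(SAMPLE_CSV)
    return pd.read_csv(source)


frame = load_frame()
print(frame.shape)
```

Because the function has no required inputs, nothing needs to be wired to it or supplied from config.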
a
Thanks Alex for your response. Do you mean I should also add input_hydration_config?
a
You should do that if you want to be able to provide, for example, the CSV path as config and load the dataframe that way.
You could also consider using the DataFrame type from `dagster-pandas`, which already has this set up.
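To make "load the dataframe from config" a bit more concrete, here is a rough sketch in plain pandas of what such a loader does conceptually. It is not the dagster or `dagster-pandas` API: `load_from_config` and the `{"csv": {"path": ...}}` config shape are assumptions for illustration, and the CSV is a throwaway temp file.

```python
import os
import tempfile

import pandas as pd


def load_from_config(config: dict) -> pd.DataFrame:
    """Hypothetical loader: turn a config entry into a DataFrame."""
    # Assumed config shape: {"csv": {"path": "<file path>"}}
    return pd.read_csv(config["csv"]["path"])


# Write a small throwaway CSV so the sketch runs on its own.
with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "file1.csv")
    with open(csv_path, "w") as handle:
        handle.write("id,value\n1,10\n2,20\n")
    loaded = load_from_config({"csv": {"path": csv_path}})

print(loaded)
```

The point is only that the path lives in config rather than in the function body, so the same node can load different files without a code change.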
a
I am continuing with the existing code I shared and made the changes you suggested:
@lambda_solid
def Input1() -> DataFrame:
    r = pd.read_csv('file1.csv')
    return r
I can see the pipeline execution is failing with dagster.core.definitions.events.Failure: Value None should be of type DataFrame. This is for the Merge node in my code.
r and r2 are coming through as Any type, whereas I am expecting DataFrame.
And sorry, I did not understand how to use the DataFrame type from `dagster-pandas` in my current Python code file.
Hi, please ignore this issue, it is solved. I used:
def Merge(r: DataFrame, r2: DataFrame) -> DataFrame:
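For anyone hitting the same `Value None should be of type DataFrame` message: one plausible cause in the code as originally posted is that `Merge` and `Result_output` end in a bare `return`, which hands `None` to the next node. The sketch below reproduces that in plain Python. `check_output` is a made-up stand-in for a framework's output type check, not a dagster API, and lists stand in for data frames.

```python
def check_output(value, expected_type):
    """Hypothetical stand-in for an output type check (not a dagster API)."""
    if not isinstance(value, expected_type):
        raise TypeError(
            f"Value {value!r} should be of type {expected_type.__name__}"
        )
    return value


def merge_broken(left, right):
    merged = left + right
    return  # bare return: the caller receives None, not `merged`


def merge_fixed(left, right):
    merged = left + right
    return merged  # return the computed value explicitly


try:
    check_output(merge_broken([1], [2]), list)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)
print(check_output(merge_fixed([1], [2]), list))
```

Returning the computed value (`return r3` in the thread's `Merge`) is what lets the declared output type check pass.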