achintamiri
09/03/2019, 3:39 PMDataFrame = as_dagster_type(
pd.DataFrame,
name='PandasDataFrame',
description='''Two-dimensional size-mutable, potentially heterogeneous
tabular data structure with labeled axes (rows and columns).
See <http://pandas.pydata.org/>''',
)
@lambda_solid
def Input1(pdd: DataFrame) -> DataFrame:
r = pdd.read_csv('file1.csv')
return r
@lambda_solid
def Merge(r: DataFrame,r2: DataFrame,pdd: DataFrame) -> DataFrame:
r3=pdd.concat([r,r2], axis=1)
return
@lambda_solid
def Input2(pdd: DataFrame) -> DataFrame:
r2 = pdd.read_csv('file2.csv')
return r2
@lambda_solid
def Result_output(y: DataFrame) -> DataFrame:
y3=y
return
@pipeline
def actual_dag_pipeline() :
y=Merge(Input1(),Input2())
Result_output(y)
alex
09/03/2019, 3:44 PMpdd
input to Input1/2
is the source of the problem - based on the fact your are reading a csv from a known path, i think you want to make a new data frame in the solid instead of defining an inputalex
09/03/2019, 3:45 PMinput_hydration_config
)alex
09/03/2019, 3:47 PM@lambda_solid
def Input1() -> DataFrame:
r = pd.read_csv('file1.csv')
return r`
achintamiri
09/03/2019, 3:58 PMalex
09/03/2019, 4:02 PMalex
09/03/2019, 4:04 PMdagster-pandas
which already has this set upachintamiri
09/03/2019, 5:06 PM@lambda_solid
def Input1() -> DataFrame:
r = pd.read_csv('file1.csv')
return r
I can see the pipline execution is failing for agster.core.definitions.events.Failure: Value None should be of type DataFrame.
this is for merge node in my codeachintamiri
09/03/2019, 5:07 PMachintamiri
09/03/2019, 5:08 PMachintamiri
09/03/2019, 5:09 PMdagster-pandas
in my current python code fileachintamiri
09/04/2019, 12:48 PMachintamiri
09/04/2019, 12:48 PMachintamiri
09/04/2019, 12:49 PMdef Merge(r:DataFrame,r2:DataFrame) -> DataFrame: