Simon Späti
03/22/2020, 9:01 PM
dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline networkscore_incremental_pipeline received invalid type <class 'dagster.core.definitions.composition.CallableSolidNode'> for input "tag_session" (at position 0) in solid invocation "update_tags". Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition.
My pipeline:
@pipeline(...)
def networkscore_incremental_pipeline():
    df_tag_session = load_delta_table_to_df.alias("tag_session_load_delta_table_to_df")
    df = update_tags(df_tag_session, sa_simulation_of_input_nps_categories())
I found that the error comes from load_delta_table_to_df: if I use a different solid in its place, it works. I figured maybe the return value of the solid is not typed as DataFrame, so I tried output_defs and other things. What am I missing?
My very simple update_tags solid:
@solid(
    description='''Update Tag to Session mapping table with new inputs from external applications (e.g. SA)''',
)
def update_tags(context, tag_session: DataFrame, new_tags_sessions: DataFrame) -> DataFrame:
    return tag_session
And the source of my error is load_delta_table_to_df, but why?
@solid(
    required_resource_keys={'spark', 's3'},
    description='''Loads given delta coordinates into a spark data frame''',
    # output_defs=[OutputDefinition(name='df', dagster_type=DataFrame, is_optional=False),],
)
def load_delta_table_to_df(
    context, delta_coordinate: DeltaCoordinate, where_conditions: String,
) -> DataFrame:
    delta_path = get_s3a_path(
        delta_coordinate['s3_coordinate_bucket'], delta_coordinate['s3_coordinate_key']
    )
    df = (
        context.resources.spark.spark_session.read.format("delta")
        .load(delta_path)
        .where(where_conditions)
    )
    return df
Thanks a lot for any help or hints.
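
One possible reading of the error, sketched with toy classes rather than Dagster itself: the message says update_tags received a CallableSolidNode, which suggests that .alias(...) hands back a node that still has to be invoked before its output can be passed to another solid. ToySolid, CallableNode, and OutputHandle below are hypothetical stand-ins invented for this illustration; they are not Dagster's classes and only mimic the shape of the problem.

```python
# Toy stand-ins, NOT Dagster's real classes. They only mimic the pattern
# "alias() returns a callable node; calling the node returns an output".


class OutputHandle:
    """Represents the output of an invoked solid (hypothetical)."""

    def __init__(self, solid_name):
        self.solid_name = solid_name


class CallableNode:
    """A renamed solid that has not been invoked yet (hypothetical)."""

    def __init__(self, name):
        self.name = name

    def __call__(self, *inputs):
        # Only outputs of earlier invocations are accepted as inputs.
        for position, value in enumerate(inputs):
            if not isinstance(value, OutputHandle):
                raise TypeError(
                    f"received invalid type {type(value).__name__} "
                    f"at position {position} in invocation {self.name!r}"
                )
        return OutputHandle(self.name)


class ToySolid:
    """A solid definition that can be invoked directly or aliased first."""

    def __init__(self, name):
        self.name = name

    def alias(self, new_name):
        # Returns a node, not an output: it still needs to be called.
        return CallableNode(new_name)

    def __call__(self, *inputs):
        return CallableNode(self.name)(*inputs)


load = ToySolid("load_delta_table_to_df")
update_tags = ToySolid("update_tags")

# Without trailing parentheses we hold the node itself.
not_invoked = load.alias("tag_session_load_delta_table_to_df")
# With trailing parentheses we hold the node's output.
invoked = load.alias("tag_session_load_delta_table_to_df")()


def compose(first_input):
    """Wire first_input into update_tags and report what happens."""
    try:
        return update_tags(first_input).solid_name
    except TypeError as err:
        return "error: " + str(err)
```

If that reading is right, the line in the pipeline would need trailing parentheses: load_delta_table_to_df.alias("tag_session_load_delta_table_to_df")(). The solid's own inputs, delta_coordinate and where_conditions, would still have to be supplied somehow, which the original message does not show.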