Alessandro Cantarelli
09/30/2022, 2:39 PM
@asset(
    partitions_def=daily,
    ins={"load_data": AssetIn(metadata={'allow_missing_partitions': True}, partition_mapping=NDaysPartitionMapping(days=1, offset=0))},
)
def trailing_window(context, load_data: pandas.DataFrame) -> list:
    df = load_data
    tidied_raw_data = tidy_raw_data(database=df, cellsParallel=context.op_config['cellsParallel'], cellsSeries=context.op_config['cellsSeries'])
    split_time_data = split_by_time(data=tidied_raw_data, cut_off_time=context.op_config['cut_off_time'])
    return split_time_data

@graph(ins={"trailing_window": GraphIn()})
def event_split_graph(trailing_window):
    df = trailing_window
    return df

event_split = AssetsDefinition.from_graph(graph_def=event_split_graph, partitions_def=daily)
I am currently receiving this error:
@graph 'event_split_graph' returned problematic value of type <class 'dagster._core.definitions.composition.InputMappingNode'>
claire
09/30/2022, 5:05 PM
@graph functions as a wrapper over a set of ops with defined dependencies between them. Dagster will flatten each graph into a flat map of ops and then pass each input of the graph to the corresponding op inputs.
This means that a graph must contain at least one op, and each input of the graph must be mapped to at least one op within the graph.
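To make the point above concrete, here is a pure-Python toy model of why returning a graph input directly fails. It is not Dagster's code: every name in it (InputPlaceholder, OpOutputHandle, toy_op, toy_graph) is hypothetical. It only mimics the idea that the graph body runs at definition time with placeholder objects rather than real data, which is consistent with the InputMappingNode type named in the error.

```python
class InputPlaceholder:
    """Hypothetical stand-in for a graph input during composition."""

    def __init__(self, name):
        self.name = name


class OpOutputHandle:
    """Hypothetical stand-in for the future output of an invoked op."""

    def __init__(self, op_name, upstream):
        self.op_name = op_name
        self.upstream = upstream


def toy_op(fn):
    # Invoking a toy op during composition only records a dependency;
    # the wrapped function body is not executed here.
    def invoke(upstream):
        return OpOutputHandle(fn.__name__, upstream)

    return invoke


def toy_graph(fn, input_names):
    # The graph body is called once with placeholders, not with data.
    placeholders = [InputPlaceholder(name) for name in input_names]
    returned = fn(*placeholders)
    if isinstance(returned, InputPlaceholder):
        # No op consumed the input, so there is no op output to return.
        raise TypeError(
            f"graph '{fn.__name__}' returned problematic value of type "
            f"{type(returned).__name__}"
        )
    return returned


@toy_op
def transform(trailing_window):
    return trailing_window


def passthrough_graph(trailing_window):
    # Mirrors the failing shape: the input is returned untouched.
    return trailing_window


def wrapped_graph(trailing_window):
    # Mirrors the working shape: the input flows through an op.
    return transform(trailing_window)


try:
    toy_graph(passthrough_graph, ["trailing_window"])
    passthrough_error = None
except TypeError as exc:
    passthrough_error = str(exc)

handle = toy_graph(wrapped_graph, ["trailing_window"])
```

In this sketch the pass-through graph fails because it hands back the placeholder itself, while the wrapped graph returns a handle that records which op consumed the input.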
We could do a better job of clarifying this error for sure, since it's pretty unclear what the issue is from looking at the error message.
@op
def transform(trailing_window):
    return trailing_window

@graph(ins={"trailing_window": GraphIn()})
def event_split_graph(trailing_window):
    df = transform(trailing_window)
    return df
Alessandro Cantarelli
10/03/2022, 8:11 AM
Niraj Shah
12/09/2022, 5:30 PM