david rodriguez
02/14/2024, 7:49 PM
@op(out=DynamicOut(is_required=True))
def get_cameras_df(context:OpExecutionContext, upstream):
    """Fan ``upstream`` out into one dynamic output per distinct camera.

    Args:
        context: Dagster op execution context (not used by this op).
        upstream: pandas DataFrame that has a ``camera`` column.

    Yields:
        A ``DynamicOutput`` per distinct ``camera`` value. Its value is the
        tuple ``(camera, camera_df)`` where ``camera_df`` holds the rows of
        ``upstream`` for that camera, and its mapping key is ``str(camera)``.

    Note:
        Rows whose ``camera`` is missing (NaN/None) are not emitted, because
        ``groupby`` drops missing keys by default.
    """
    # A single groupby pass replaces the per-camera ``query`` call: it avoids
    # re-scanning the frame for every camera, does not build a query string
    # from data (which breaks on values containing a quote), and also works
    # when the camera values are not strings.
    # ``sort=False`` yields the groups in order of first appearance, so the
    # fan-out order is deterministic (iterating a ``set`` is not).
    # ``observed=True`` keeps unused categories of a categorical column from
    # producing empty groups.
    for camera, camera_df in upstream.groupby("camera", sort=False, observed=True):
        # Mapping keys are passed as strings; str() is a no-op for values
        # that already are.
        yield DynamicOutput(value=(camera, camera_df), mapping_key=str(camera))
@op
def process_camera_df(context:OpExecutionContext, camera_data:tuple):
    """Process the dataframe of a single camera.

    Args:
        context: Dagster op execution context (not used by this op).
        camera_data: two-item tuple ``(camera, dataframe)``.

    Returns:
        Whatever ``processed_dataframe`` returns for the tuple's dataframe.
    """
    # Unpack both items so a tuple of the wrong length still fails here;
    # the camera identifier itself is not needed for processing.
    _camera, camera_frame = camera_data
    processed = processed_dataframe(camera_frame)
    return processed
@op
def merge_camera_matched_df(context:OpExecutionContext, dataframes):
    """Concatenate the per-camera dataframes into a single dataframe.

    Args:
        context: Dagster op execution context; used to attach output
            metadata and to log a warning on empty input.
        dataframes: iterable of pandas DataFrames (``None`` entries are
            skipped).

    Returns:
        The row-wise concatenation of all dataframes, in input order. An
        empty DataFrame is returned when there is nothing to concatenate.
    """
    frames = [df for df in dataframes if df is not None]
    if frames:
        # One concat over the whole list instead of concatenating inside the
        # loop, which re-copies the accumulated rows on every iteration.
        result_dataframe = pd.concat(frames)
    else:
        # Previously this case left the result as None and crashed on
        # ``None.to_markdown()``.
        context.log.warning("merge_camera_matched_df received no dataframes")
        result_dataframe = pd.DataFrame()
    # NOTE(review): the "preview" renders the whole frame as markdown; for
    # large results consider limiting it (e.g. ``.head()``) -- left unchanged
    # here to keep the emitted metadata identical.
    metadata = {"preview": MetadataValue.md(result_dataframe.to_markdown())}
    context.add_output_metadata(metadata=metadata)
    return result_dataframe
@graph_asset(ins={"upstream": AssetIn("simulate_image_folder_sample")})
def simulate_matched_folder(upstream):
    """Graph-backed asset: split by camera, process each, merge the results.

    ``upstream`` is wired to the ``simulate_image_folder_sample`` asset.
    """
    # Fan out: one dynamic output per camera.
    per_camera = get_cameras_df(upstream)
    # Run process_camera_df on every dynamic output.
    processed = per_camera.map(process_camera_df)
    # Fan in: collect all mapped results and merge them into one dataframe.
    return merge_camera_matched_df(processed.collect())
The last op merges the resulting dataframes and returns the asset. It works, but the asset appears as materialized before the ops finish. Is there a way to make the graph_asset wait until the last op, merge_camera_matched_df, finishes before the asset is marked as materialized?