#ask-community

david rodriguez

02/14/2024, 7:49 PM
Hello guys, I have a question related to DynamicOut. I have an asset that is generated with 3 ops:
import pandas as pd
from dagster import (
    AssetIn,
    DynamicOut,
    DynamicOutput,
    MetadataValue,
    OpExecutionContext,
    graph_asset,
    op,
)


@op(out=DynamicOut(is_required=True))
def get_cameras_df(context: OpExecutionContext, upstream):
    # Fan out: emit one dynamic output per distinct camera.
    for camera in set(upstream["camera"]):
        camera_df = upstream.query(f"camera == '{camera}'")
        # Note: mapping_key may only contain letters, digits and underscores.
        yield DynamicOutput(value=(camera, camera_df), mapping_key=camera)


@op
def process_camera_df(context: OpExecutionContext, camera_data: tuple):
    camera, dataframe = camera_data
    # processed_dataframe is a helper not shown here.
    return processed_dataframe(dataframe)


@op
def merge_camera_matched_df(context: OpExecutionContext, dataframes):
    # Fan in: concatenate the per-camera results into a single dataframe.
    result_dataframe = pd.concat(dataframes)

    metadata = {"preview": MetadataValue.md(result_dataframe.to_markdown())}
    context.add_output_metadata(metadata=metadata)
    return result_dataframe


@graph_asset(ins={"upstream": AssetIn("simulate_image_folder_sample")})
def simulate_matched_folder(upstream):
    camera_results = get_cameras_df(upstream).map(process_camera_df)
    return merge_camera_matched_df(camera_results.collect())
The last op merges the resulting dataframes and returns the asset. It works, but the asset appears as materialized before the ops finish. Is there a way to make the graph_asset wait until the last op, merge_camera_matched_df, finishes before the asset is marked as materialized?
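To make the intended data flow concrete, here is a minimal sketch of what the three ops are meant to do, written in plain Python without Dagster or pandas. The sample rows and the `split_by_camera`, `process`, and `merge` helpers are made up for illustration and only stand in for the real ops; the sketch shows the split, per-camera processing, and merge order, not how Dagster decides when to record the materialization.

```python
from collections import defaultdict

# Hypothetical sample rows standing in for the upstream dataframe.
rows = [
    {"camera": "cam_a", "frame": 1},
    {"camera": "cam_b", "frame": 2},
    {"camera": "cam_a", "frame": 3},
]


def split_by_camera(rows):
    """Fan out: one (camera, rows) pair per camera, like get_cameras_df."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["camera"]].append(row)
    return list(groups.items())


def process(camera_data):
    """Placeholder for process_camera_df: tags each row as processed."""
    camera, camera_rows = camera_data
    return [{**row, "processed": True} for row in camera_rows]


def merge(chunks):
    """Fan in: concatenate per-camera results, like merge_camera_matched_df."""
    return [row for chunk in chunks for row in chunk]


# Processing every chunk and gathering the results plays the role of
# .map(process_camera_df).collect() in the graph above.
merged = merge([process(item) for item in split_by_camera(rows)])
```

The merged result only exists once every per-camera chunk has been processed, which is the point at which I would expect the asset to count as materialized.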