Jake Kagan
01/26/2023, 9:08 PM
```python
import polars as pl
from dagster import InputContext, IOManager, OutputContext, get_dagster_logger


class IOBigQToDF(IOManager):
    def handle_output(self, context: OutputContext, obj: str) -> None:
        # Run the query string returned by the op and load the result as a DataFrame.
        df: pl.DataFrame = context.resources.rsrc_bigquery_runner(obj).bigquery_to_df()
        filter_func = context.config["filter_func"]
        if filter_func is None:
            get_dagster_logger().info(
                f"DATAFRAME WAS NOT ADJUSTED, PER OP: {df.shape} columns: {df.shape[1]}, rows: {df.shape[0]}"
            )
            self.result = df  # stored on this IO manager instance only
        else:
            filtered_df = filter_func(df)
            get_dagster_logger().info(
                f"ADJUSTED DATAFRAME SHAPE: {filtered_df.shape} columns: {filtered_df.shape[1]}, rows: {filtered_df.shape[0]}"
            )
            self.result = filtered_df

    def load_input(self, context: InputContext) -> pl.DataFrame:
        return self.result
```
I thought I could keep it in the `self.result` attribute, but when I try to use the IO manager for an input I get `AttributeError: 'IOBigQToDF' object has no attribute 'result'`.
When it's used as an output handler, the logger prints the line above, which is how I know that part works. But I guess after that, the instance is no longer used?
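A hedged sketch of why `self.result` can go missing, assuming the job runs on Dagster's default executor, which launches each op in its own process. Each of those processes builds its own copy of the IO manager, so an attribute set while handling the output is not there when a different step loads the input. Plain Python stands in for Dagster here; `StashingIOManager` and `run_step` are hypothetical names, not Dagster APIs.

```python
class StashingIOManager:
    """Hypothetical stand-in for IOBigQToDF: keeps the last output on the instance."""

    def handle_output(self, obj):
        self.result = obj  # stored on *this* instance only

    def load_input(self):
        return self.result  # fails if a different instance handled the output


def run_step(step):
    # Mimics a multiprocess executor: every step builds its own IO manager.
    return step(StashingIOManager())


# Upstream step writes its output through one instance...
run_step(lambda manager: manager.handle_output("upstream value"))

# ...and the downstream step reads through a brand-new instance.
try:
    run_step(lambda manager: manager.load_input())
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)  # 'StashingIOManager' object has no attribute 'result'
```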
This doesn't work:
```python
@op  # (ins={"bq_to_df_io": In(input_manager_key="io_bigq_to_df")})
def some_op(bq_to_df_io):
    get_dagster_logger().info(bq_to_df_io)


# load_input in IOBigQToDF, changed to return the upstream output's identifier
def load_input(self, context: InputContext):
    return context.upstream_output.get_output_identifier()
```
With that, I get the following:
```
['10b2c5a6-ea86-43b6-9889-e150a161ad09', 'upstream_op', 'result']
```
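The three entries in that identifier are the run ID, the step key and the output name; `'result'` is the name Dagster gives an op's single output by default, not a reference to the `self.result` attribute. An IO manager that keeps values in memory can key them by such an identifier, which only helps if the step that writes and the step that reads share one IO manager instance (and so one process). A plain-Python sketch of that idea; `InMemoryStore` is a hypothetical name, not a Dagster class:

```python
from typing import Any, Dict, Sequence, Tuple


class InMemoryStore:
    """Hypothetical in-memory IO manager keyed by output identifier."""

    def __init__(self) -> None:
        self.values: Dict[Tuple[str, ...], Any] = {}

    def handle_output(self, identifier: Sequence[str], obj: Any) -> None:
        # Key by (run_id, step_key, output_name) so outputs don't overwrite each other.
        self.values[tuple(identifier)] = obj

    def load_input(self, upstream_identifier: Sequence[str]) -> Any:
        # Look up the upstream output's value under the same identifier.
        return self.values[tuple(upstream_identifier)]


store = InMemoryStore()  # one shared instance, i.e. both steps in one process
ident = ["10b2c5a6-ea86-43b6-9889-e150a161ad09", "upstream_op", "result"]
store.handle_output(ident, "upstream dataframe")
print(store.load_input(ident))  # upstream dataframe
```

Keying by identifier rather than a single attribute also keeps several outputs from clobbering one another, which a lone `self.result` would do.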
But how do I tap into the value of `result`?

To store intermediate values in memory, instead of on-disk, you can use `mem_io_manager`. For `mem_io_manager` to work, all the ops within an execution need to share the same process, which requires using the `in_process_executor`.
Got it here.