# ask-community
does anyone know how to cache or keep in memory an io manager handled output?
import polars as pl
from dagster import IOManager, InputContext, OutputContext, get_dagster_logger

class IOBigQToDF(IOManager):

    def handle_output(self, context: OutputContext, obj: str) -> None:
        df: pl.DataFrame = context.resources.rsrc_bigquery_runner(obj).bigquery_to_df()
        filter_func = context.config["filter_func"]
        if filter_func is None:
            get_dagster_logger().info(
                f'DATAFRAME WAS NOT ADJUSTED, PER OP: shape {df.shape}, columns: {df.shape[1]}, rows: {df.shape[0]}')
            self.result = df
        else:
            filtered_df = filter_func(df)
            get_dagster_logger().info(
                f'ADJUSTED DATAFRAME: shape {filtered_df.shape}, columns: {filtered_df.shape[1]}, rows: {filtered_df.shape[0]}')
            self.result = filtered_df

    def load_input(self, context: InputContext) -> pl.DataFrame:
        return self.result
I thought I could keep it in the self.result attribute, but I get an error saying AttributeError: 'IOBigQToDF' object has no attribute 'result' when I try to use the IO manager as an input. When it's used as an output handler, the logger prints the line above, so I know that part works. But I guess after that, the instance is no longer used? This doesn't work:
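That AttributeError is exactly what you'd expect: with Dagster's default multiprocess executor each op runs in its own process, and the IO manager is reconstructed for the consuming step, so an attribute set during handle_output is gone by the time load_input runs. A minimal stand-alone sketch of the failure mode (hypothetical class, not the Dagster API):

```python
# Sketch: instance attributes set while handling an output do not
# survive when the framework builds a fresh manager (or a fresh
# process) for the step that loads the input.

class CachingManager:
    def handle_output(self, obj):
        self.result = obj  # stored on THIS instance only

    def load_input(self):
        return self.result  # fails on a fresh instance


# Step 1: one instance handles the output.
writer = CachingManager()
writer.handle_output("dataframe")
print(writer.load_input())  # works: same instance

# Step 2: the consuming step gets a new instance, so the attribute
# is missing -- the same AttributeError as in the question.
reader = CachingManager()
try:
    reader.load_input()
except AttributeError as err:
    print(err)
```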
from dagster import In, get_dagster_logger, op

@op(ins={"bq_to_df_io": In(input_manager_key="io_bigq_to_df")})
def some_op(bq_to_df_io):
    get_dagster_logger().info(bq_to_df_io)
For example, if I use this:
def load_input(self, context: InputContext):
    return context.upstream_output.get_output_identifier()
I get the following:
['10b2c5a6-ea86-43b6-9889-e150a161ad09', 'upstream_op', 'result']
But how do I tap into the value of result?
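One way to "tap into" it is to treat the output identifier as a storage key, which is roughly how Dagster's built-in fs_io_manager works: handle_output serializes the object to a path derived from the identifier, and load_input reads it back from the same path. A hedged sketch outside of Dagster (the functions and the store location here are made up; the identifier is the one printed above):

```python
# Sketch: persist outputs under the output identifier so a later
# (possibly different) process can load them back by the same key.
import pickle
import tempfile
from pathlib import Path

# The identifier returned by get_output_identifier() above:
OUTPUT_IDENTIFIER = ["10b2c5a6-ea86-43b6-9889-e150a161ad09", "upstream_op", "result"]

base = Path(tempfile.mkdtemp())  # stand-in for the IO manager's base_dir

def handle_output(identifier, obj):
    path = base.joinpath(*identifier)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(pickle.dumps(obj))

def load_input(identifier):
    return pickle.loads(base.joinpath(*identifier).read_bytes())

handle_output(OUTPUT_IDENTIFIER, {"rows": 3})
print(load_input(OUTPUT_IDENTIFIER))  # {'rows': 3}
```

This survives process boundaries, which is why the on-disk approach is the default; a purely in-memory attribute does not.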
OK, this answers it:
To store intermediate values in memory, instead of on-disk, you can use mem_io_manager. For mem_io_manager to work, all the ops within an execution need to share the same process, which requires using the in_process_executor.
Got it here.