https://dagster.io/ logo
#ask-community
Title
# ask-community
s

Sebastien

06/20/2023, 3:38 PM
Hi experts, I need help understanding how to do this: *my_temp_asset*() is a pandas DataFrame asset. It uses a custom IOManager called "DataFrameIOManager" (registered as the "io_manager_dataframe" resource) and is stored as a .csv file that is fully rewritten at each materialization. *my_cumulated_asset*() has to store and accumulate the results of all my_temp_asset() materializations in a MySQL table. I tried to write another IOManager called "MySQLIOManager" (registered as the "io_manager_mysql" resource) and to set the @asset(<params>) for my_cumulated_asset(), but no new data is inserted into my target table :/ Maybe you can help me:
Copy code
class MysqlIOManager(ConfigurableIOManager):
    """IO manager that appends DataFrames to a SQL table and loads them via a query.

    Every setting is read from ``context.metadata``:

    * ``instance_id``, ``context``, ``credential_id`` -- forwarded to
      ``tools_sa.get_sa_engine_connection`` to obtain the connection.
    * ``table``, ``schema`` -- target of ``handle_output``.
    * ``sql`` -- query executed by ``load_input``.

    A missing key raises ``KeyError``.
    """

    def _connect(self, context):
        """Return the connection described by the metadata of *context*.

        Logs the connection parameters at debug level before connecting.
        NOTE(review): the returned object is never closed by this class --
        confirm whether ``tools_sa`` manages its lifetime.
        """
        instance_id, ctx_name, credential_id = (
            context.metadata[key]
            for key in ("instance_id", "context", "credential_id")
        )
        context.log.debug(
            f"Connection to instance_id <{instance_id}>, context <{ctx_name}> using credential_id <{credential_id}>"
        )
        return tools_sa.get_sa_engine_connection(
            instance_id=instance_id,
            context=ctx_name,
            credential_id=credential_id,
        )

    def handle_output(self, context: OutputContext, obj: pd.DataFrame):
        """Append *obj* to the table named by the ``schema``/``table`` metadata.

        Rows are added with ``if_exists="append"`` and the DataFrame index is
        not written. When *obj* is ``None`` nothing is written, although the
        connection is still opened and the target is still logged.
        """
        connection = self._connect(context)
        table = context.metadata["table"]
        db_schema = context.metadata["schema"]
        context.log.debug(f"Storing data to: <{db_schema}>.<{table}>")

        if obj is None:
            return

        # Remaining to_sql options are left at their pandas defaults.
        obj.to_sql(
            name=table,
            con=connection,
            schema=db_schema,
            if_exists="append",
            index=False,
        )

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Run the query stored under the ``sql`` metadata key and return its rows."""
        connection = self._connect(context)
        query = context.metadata["sql"]
        context.log.debug(f"Loading data from sql: <{query}>")
        # All other read_sql options are left at their pandas defaults.
        return pd.read_sql(query, con=connection)
3 Views