Sebastien
06/20/2023, 3:38 PMclass MysqlIOManager(ConfigurableIOManager):
def handle_output(self, context : OutputContext, obj: pd.DataFrame):
con_instance_id = context.metadata["instance_id"]
con_context = context.metadata["context"]
con_credential_id = context.metadata["credential_id"]
context.log.debug(f"Connection to instance_id <{con_instance_id}>, context <{con_context}> using credential_id <{con_credential_id}>")
con = tools_sa.get_sa_engine_connection(
instance_id=con_instance_id,
context=con_context,
credential_id=con_credential_id,
)
name = context.metadata["table"]
schema = context.metadata["schema"]
context.log.debug(f"Storing data to: <{schema}>.<{name}>")
if obj is not None:
obj.to_sql(
name = name,
con = con,
schema = schema,
if_exists = 'append',
index = False,
index_label = None,
chunksize = None,
dtype = None,
method = None,
)
def load_input(self, context: InputContext) -> pd.DataFrame:
con_instance_id = context.metadata["instance_id"]
con_context = context.metadata["context"]
con_credential_id = context.metadata["credential_id"]
context.log.debug(f"Connection to instance_id <{con_instance_id}>, context <{con_context}> using credential_id <{con_credential_id}>")
con = tools_sa.get_sa_engine_connection(
instance_id=con_instance_id,
context=con_context,
credential_id=con_credential_id,
)
sql = context.metadata["sql"]
context.log.debug(f"Loading data from sql: <{sql}>")
return pd.read_sql(
sql,
con = con,
index_col=None,
coerce_float=True,
params=None,
parse_dates=None,
columns=None,
chunksize=None,
)