Filipe Almeida
07/26/2023, 4:12 PM
Filipe Almeida
07/26/2023, 4:13 PM
Ismael Rodrigues
07/26/2023, 4:26 PM
Filipe Almeida
07/27/2023, 8:07 AM@op(
required_resource_keys={"mysql_resource", "snowflake_resource"},
ins={"start_after": In(Nothing)},
out=DynamicOut(),
)
def anonymizer_op(context: InputContext, config: AnonymizerConfig) -> Iterator[DynamicOutput]:
logger = get_dagster_logger()
resource_mysql: Dict[str, str] = context.resources.mysql_resource
resource_snowflake: Dict[str, str] = context.resources.snowflake_resource
logger.debug(resource_mysql)
logger.debug(resource_snowflake)
records_fn: records_fetcher = partial(get_records, logger=logger, credentials=resource_mysql)
query_runner_fn: QueryRunner = partial(run_query, logger=logger, credentials=resource_snowflake)
query_fetcher: QueryFetcher = FileQueryFetcher()
added_table_names = set()
for index, (table_name, query) in enumerate(
anonymise_data(
logger=logger,
dry_run=config.dry_run,
emails=config.emails,
table_names=config.table_names,
records_fn=records_fn,
query_fetcher=query_fetcher,
query_runner_fn=query_runner_fn,
)
):
if table_name not in added_table_names:
key = f"{table_name.replace('.', '_')}_{index}"
yield DynamicOutput(value=query, mapping_key=key)
added_table_names.add(table_name)
Ismael Rodrigues
07/27/2023, 2:58 PM
Filipe Almeida
07/28/2023, 10:48 AM