Guillaume Onfroy
07/06/2023, 10:47 AM
[…] `job_config` of the underlying BQ job. Subjectively, I think it's quite critical to at least be able to configure time partitioning and clustering for the destination table if the table doesn't exist.
Guillaume Onfroy
# 07/06/2023, 3:40 PM
class CustomBigQueryPandasTypeHandler(BigQueryPandasTypeHandler):
    """BigQuery pandas type handler that configures partitioning/clustering on load.

    Compared with the handler it extends, ``handle_output`` can pass a
    ``bigquery.LoadJobConfig`` to the load job, so the destination table can be
    time-partitioned (for partitioned assets) and optionally clustered.

    Asset metadata read by ``handle_output``:

    * ``partition_field`` -- column used for BigQuery time partitioning;
      mandatory when the asset is partitioned.
    * ``clustering_fields`` -- optional column name, or list of column names,
      to cluster the destination table on.
    """

    def _get_bq_partition_type(self, schedule_type: ScheduleType):
        """Map a Dagster ``ScheduleType`` to a BigQuery ``TimePartitioningType``.

        HOURLY -> HOUR, DAILY -> DAY, WEEKLY -> DAY (BigQuery offers no weekly
        granularity), MONTHLY -> MONTH. Any other value, including ``None``,
        falls back to DAY. That matches what ``bigquery.TimePartitioning``
        does when it is given ``type_=None``, which is what the previous
        implicit fall-through produced.
        """
        partition_types = {
            ScheduleType.HOURLY: bigquery.TimePartitioningType.HOUR,
            ScheduleType.DAILY: bigquery.TimePartitioningType.DAY,
            ScheduleType.WEEKLY: bigquery.TimePartitioningType.DAY,
            ScheduleType.MONTHLY: bigquery.TimePartitioningType.MONTH,
        }
        return partition_types.get(schedule_type, bigquery.TimePartitioningType.DAY)

    def handle_output(
        self,
        context: OutputContext,
        table_slice: TableSlice,
        obj: pd.DataFrame,
        connection: bigquery.Client,
    ):
        """Load ``obj`` into the BigQuery table described by ``table_slice``.

        The DataFrame is written to ``<table_slice.schema>.<table_slice.table>``
        in project ``table_slice.database``. ``location`` and ``timeout`` are
        taken from the resource config when set. The method blocks on
        ``job.result()`` and then records ``row_count`` and
        ``dataframe_columns`` output metadata.

        Partitioning and clustering are passed through the load job's config.
        NOTE(review): presumably they only shape the table when the load job
        creates it. Confirm the behaviour for pre-existing tables.

        Raises:
            ValueError: if the asset is partitioned and ``partition_field`` is
                missing from its metadata, or if its partitions definition has
                no ``schedule_type`` attribute.
        """
        metadata = context.metadata or {}
        resource_config = context.resource_config or {}

        job_config_kwargs = {}
        if context.has_asset_partitions:
            partition_field = metadata.get("partition_field")
            if partition_field is None:
                raise ValueError(
                    "`partition_field` metadata is missing. Value is mandatory if asset is partitioned."
                )
            partitions_def = context.asset_partitions_def
            # Without a schedule_type there is no granularity to map to BigQuery.
            # Fail with a clear message instead of an AttributeError.
            if not hasattr(partitions_def, "schedule_type"):
                raise ValueError(
                    "BigQuery time partitioning requires a partitions definition with a "
                    f"`schedule_type`, got {type(partitions_def).__name__}."
                )
            job_config_kwargs["time_partitioning"] = bigquery.TimePartitioning(
                type_=self._get_bq_partition_type(partitions_def.schedule_type),
                field=partition_field,
            )

        clustering_fields = metadata.get("clustering_fields")
        if clustering_fields:
            # Accept a single column name as well as a list of names.
            # list("Date") would otherwise split the name into characters.
            if isinstance(clustering_fields, str):
                clustering_fields = [clustering_fields]
            job_config_kwargs["clustering_fields"] = list(clustering_fields)

        # With nothing to configure, the load call keeps receiving job_config=None.
        job_config = bigquery.LoadJobConfig(**job_config_kwargs) if job_config_kwargs else None

        job = connection.load_table_from_dataframe(
            dataframe=obj,
            destination=f"{table_slice.schema}.{table_slice.table}",
            project=table_slice.database,
            location=resource_config.get("location"),
            timeout=resource_config.get("timeout"),
            job_config=job_config,
        )
        job.result()

        # Series.iteritems() was removed in pandas 2.0; Series.items() works on old and new versions.
        columns = [TableColumn(name=name, type=str(dtype)) for name, dtype in obj.dtypes.items()]
        context.add_output_metadata(
            {
                "row_count": obj.shape[0],
                "dataframe_columns": MetadataValue.table_schema(TableSchema(columns=columns)),
            }
        )
class CustomBigQueryPandasIOManager(BigQueryPandasIOManager):
    """BigQuery pandas I/O manager wired to ``CustomBigQueryPandasTypeHandler``.

    Overrides ``type_handlers`` so that DataFrames are stored and loaded by
    ``CustomBigQueryPandasTypeHandler``.
    """

    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        """Return the type handlers used by this I/O manager (a fresh list per call)."""
        pandas_handler = CustomBigQueryPandasTypeHandler()
        return [pandas_handler]
Guillaume Onfroy
07/06/2023, 3:42 PMmetadata={
"partition_field": "Date",
"partition_expr": "TIMESTAMP(Date)",
},
jamie
07/06/2023, 4:43 PM
jamie
07/06/2023, 4:47 PM
Guillaume Onfroy
07/06/2023, 4:55 PM
jamie
07/06/2023, 4:56 PM
Guillaume Onfroy
07/06/2023, 4:56 PM
jamie
07/06/2023, 4:57 PM
Guillaume Onfroy
07/06/2023, 4:58 PM