# integration-bigquery
g
Hi there. Quick feedback regarding the `BigQueryPandasIOManager`: correct me if I'm wrong, but as far as I can tell, there's no way to configure the `job_config` of the underlying BQ job. Subjectively, I think it's quite critical to at least be able to configure time partitioning and clustering for the destination table if the table doesn't exist.
Custom IO manager which does support time partitioning:
```python
from typing import Sequence

import pandas as pd
from dagster import MetadataValue, OutputContext, TableColumn, TableSchema
# Private import paths; they may differ between Dagster versions.
from dagster._core.definitions.partition import ScheduleType
from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice
from dagster_gcp_pandas import BigQueryPandasIOManager, BigQueryPandasTypeHandler
from google.cloud import bigquery


class CustomBigQueryPandasTypeHandler(BigQueryPandasTypeHandler):
    def _get_bq_partition_type(self, schedule_type: ScheduleType):
        if schedule_type == ScheduleType.HOURLY:
            raise NotImplementedError()
        if schedule_type == ScheduleType.DAILY:
            return bigquery.TimePartitioningType.DAY
        if schedule_type == ScheduleType.WEEKLY:
            # BigQuery has no weekly partitioning, so fall back to daily partitions.
            return bigquery.TimePartitioningType.DAY
        if schedule_type == ScheduleType.MONTHLY:
            return bigquery.TimePartitioningType.MONTH
        # Fail loudly instead of silently returning None for an unknown schedule type.
        raise ValueError(f"Unsupported schedule type: {schedule_type}")

    def handle_output(
        self,
        context: OutputContext,
        table_slice: TableSlice,
        obj: pd.DataFrame,
        connection: bigquery.Client,
    ):
        context_metadata = context.metadata or {}
        job_config = None

        # Only partitioned assets get a time-partitioned destination table.
        if context.has_asset_partitions:
            partition_field = context_metadata.get("partition_field")
            if partition_field is None:
                raise ValueError("`partition_field` metadata is missing. Value is mandatory if asset is partitioned.")

            partition_type = self._get_bq_partition_type(context.asset_partitions_def.schedule_type)

            job_config = bigquery.LoadJobConfig(
                time_partitioning=bigquery.TimePartitioning(
                    type_=partition_type,
                    field=partition_field,
                ),
            )

        job = connection.load_table_from_dataframe(
            dataframe=obj,
            destination=f"{table_slice.schema}.{table_slice.table}",
            project=table_slice.database,
            location=context.resource_config.get("location") if context.resource_config else None,
            timeout=context.resource_config.get("timeout") if context.resource_config else None,
            job_config=job_config,
        )
        # Block until the load job finishes so that failures surface here.
        job.result()

        context.add_output_metadata(
            {
                "row_count": obj.shape[0],
                "dataframe_columns": MetadataValue.table_schema(
                    TableSchema(
                        # `Series.iteritems()` was removed in pandas 2.0; `items()` is the replacement.
                        columns=[TableColumn(name=name, type=str(dtype)) for name, dtype in obj.dtypes.items()]
                    )
                ),
            }
        )


class CustomBigQueryPandasIOManager(BigQueryPandasIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [CustomBigQueryPandasTypeHandler()]
```
This requires both metadata entries to be defined on the asset:
```python
metadata={
    "partition_field": "Date",
    "partition_expr": "TIMESTAMP(Date)",
},
```
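The handler above only covers time partitioning. For clustering, a rough sketch of how the same metadata-driven approach might be extended is shown below. It is not part of the handler or of Dagster: `load_job_options` and the `clustering_fields` metadata key are hypothetical names chosen for illustration. The helper only validates the asset metadata and collects the options in a plain dict, so it runs without a BigQuery connection.

```python
from typing import Any, Dict, Mapping, Optional

# BigQuery allows at most four clustering columns per table.
MAX_CLUSTERING_FIELDS = 4


def load_job_options(metadata: Optional[Mapping[str, Any]], is_partitioned: bool) -> Dict[str, Any]:
    """Validate asset metadata and collect table options for a load job.

    `clustering_fields` is a hypothetical metadata key used for illustration;
    `partition_field` is the key the custom handler already reads.
    """
    metadata = metadata or {}
    options: Dict[str, Any] = {}

    if is_partitioned:
        partition_field = metadata.get("partition_field")
        if partition_field is None:
            raise ValueError("`partition_field` metadata is missing. Value is mandatory if asset is partitioned.")
        options["partition_field"] = partition_field

    clustering_fields = metadata.get("clustering_fields")
    if clustering_fields:
        if isinstance(clustering_fields, str):
            # A bare string would be iterated character by character below.
            raise TypeError("`clustering_fields` must be a list of column names, not a string.")
        fields = list(clustering_fields)
        if len(fields) > MAX_CLUSTERING_FIELDS:
            raise ValueError(f"At most {MAX_CLUSTERING_FIELDS} clustering fields are allowed.")
        options["clustering_fields"] = fields

    return options
```

Inside `handle_output`, the resulting `options.get("clustering_fields")` could then be set on the `clustering_fields` property of `bigquery.LoadJobConfig`, next to the `time_partitioning` argument. Like time partitioning, this is only expected to matter when the load job creates the destination table.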
j
hey @Guillaume Onfroy - we’re aware of this. i thought there was an open issue for it but i can’t track it down. I can open a new one
g
@jamie Thank you. It's a completely different issue, but I would also quickly add that the enforced uppercasing of column names is a blocker for our team. If that could be made configurable, that would be fantastic.
j
we’ve got an issue for that too https://github.com/dagster-io/dagster/issues/14129
g
Ah!
j
i will note though that work on the BQ IO manager is pretty low priority right now in comparison to all the other stuff we’ve got going on
g
I understand. Thanks for the feedback.