# integration-bigquery


02/24/2023, 4:47 PM
Welcome! 👋 This channel is a place where you can get the latest updates on using Dagster with BigQuery, ask questions, provide feedback, and discuss your use cases with other BigQuery users! To start, the BigQuery I/O manager will be marked experimental, so do note that there may be some changes in the next few weeks. All changes and new features will be announced here! Right now the BigQuery I/O manager only supports Pandas DataFrames, but PySpark support is in the works. Additionally, comprehensive getting started and advanced guides will be released soon! In the meantime, here’s a code snippet to help you get started:
```python
import pandas as pd

from dagster import asset, Definitions
from dagster_gcp_pandas import bigquery_pandas_io_manager


@asset
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "path/to/iris.csv",  # CSV source elided in the original message
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
        ],
    )


defs = Definitions(
    assets=[iris_data],
    resources={
        "io_manager": bigquery_pandas_io_manager.configured(
            {
                "project": "my-gcp-project",  # required
                "location": "us-east5",  # optional, defaults to the default location for the project
                "dataset": "IRIS",  # optional, defaults to PUBLIC
            }
        )
    },
)
```
If you have any questions or feedback, please let us know!

Chris Histe

02/24/2023, 8:38 PM
That’s awesome news. I looked very briefly at the code. Here are a couple of things I would love to see:
• An option to configure the timeout. By default there is none on the client methods.
• An option to pass a custom client object, even if the default is usually good enough; I’m sure some people will want that.
• I would love to configure how to get the results. For example, the QueryJob object handles both DataFrame and Arrow.
• I think it would be good to use `load_table_from_dataframe`; that way we could use the `job_config` parameter to set the LoadJobConfig. Useful, for example, if we don’t want to truncate all the records in the table.
• Using transactions would help ensure we are not messing up the data when errors happen. We can roll back when there is a Dagster error.
• A way to set a `LIMIT` clause would be amazing for reducing debugging/pre-prod cost on very large tables.
Those are all things we are doing today in our Dagster pipelines and would need before I can migrate to the IO Manager. Happy to chat more.


02/24/2023, 8:59 PM
thanks for the feedback! this all seems super reasonable to me - i’ll open some issues and add them to my backlog. funnily enough, when i was writing the proof of concept for this io manager a couple months ago, all of the documentation i found pointed to using `pandas_gbq`, and the bigquery client methods never showed up (even in google’s own examples!) so thanks for pointing those out!

Benedikt Buchert

02/25/2023, 1:20 PM
Hi @jamie, thanks for building this! I just ran your code locally, tried materializing, and got the following error with the example above:
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "iris_data":

  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/", line 269, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/", line 386, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/", line 439, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/", line 640, in _store_output
    for elt in iterate_with_context(
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_utils/", line 469, in iterate_with_context
    with context_fn():
  File "/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/", line 85, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
AttributeError: 'NoneType' object has no attribute 'write'

  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/", line 55, in op_execution_error_boundary
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_utils/", line 471, in iterate_with_context
    next_output = next(iterator)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/", line 630, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/storage/", line 139, in handle_output
    self._handlers_by_type[obj_type].handle_output(context, table_slice, obj, conn)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster_gcp_pandas/bigquery/", line 41, in handle_output
  File "/opt/homebrew/lib/python3.10/site-packages/pandas_gbq/", line 1220, in to_gbq
  File "/opt/homebrew/lib/python3.10/site-packages/pandas_gbq/", line 614, in load_data
    chunks = tqdm.tqdm(chunks)
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/", line 1109, in __init__
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/", line 1361, in refresh
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/", line 1509, in display
    self.sp(self.__str__() if msg is None else msg)
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/", line 350, in print_status
    fp_write('\r' + s + (' ' * max(last_len[0] - len_s, 0)))
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/", line 343, in fp_write
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/", line 89, in __getattr__
    return getattr(self._wrapped, name)
Does the io manager create the dataset and table automatically? Also, I adjusted the column names below so I can use these as the column headers in BigQuery:

Chris Histe

02/27/2023, 2:26 PM
@jamie the docs for the BigQuery client are notoriously hard to find and google examples are way too simplistic 😉 looking forward to any improvements you include


02/27/2023, 4:29 PM
hey @Benedikt Buchert i’m going to split your issue out into a new thread so we can debug!

Chris Histe

03/01/2023, 10:26 PM
it is unclear if `load_table_from_dataframe` will append or overwrite an existing table
FYI this is configurable with the `write_disposition` config:
```
WRITE_TRUNCATE: If the table already exists, BigQuery overwrites the table data and uses the schema from the load.
WRITE_APPEND: If the table already exists, BigQuery appends the data to the table.
WRITE_EMPTY: If the table already exists and contains data, a 'duplicate' error is returned in the job result.
```


03/01/2023, 10:28 PM
yeah - in testing, the default behavior is to append, so it should be a drop-in replacement for the current system (yay!). In terms of specifying a JobConfig - i’ll need to think about how we can support this. the main issue is passing the config from the asset to the IO manager. I’m not totally sure we’ll be able to handle some of the bigquery data types. I’ll need to do more testing on that, so i’m not going to include it as part of converting to using the built-in methods

Chris Histe

03/01/2023, 10:35 PM
I wrote the IO Manager for W&B and used the metadata object to pass this kind of user-defined configuration. You could use a similar pattern, e.g.
```python
from google.cloud import bigquery

from dagster import asset

load_job_config = bigquery.LoadJobConfig()  # configure as needed

@asset(
    metadata={
        "bigquery_configuration": {
            "load_job_config": load_job_config,
        },
    },
)
def create_table():
    return [1, 2, 3]
```
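On the receiving side, the IO manager’s `handle_output` could then dig the config back out of the output metadata. A pure-Python sketch, where the key names mirror the pattern in the snippet above and are otherwise arbitrary, not an official dagster-gcp API:

```python
def extract_load_job_config(metadata, default=None):
    """Pull a user-supplied load job config out of asset/output metadata.

    Hypothetical helper: "bigquery_configuration" / "load_job_config" are the
    made-up metadata keys from the snippet above.
    """
    return ((metadata or {}).get("bigquery_configuration") or {}).get(
        "load_job_config", default
    )

# Inside a Dagster IO manager this would be called roughly as
# extract_load_job_config(context.metadata) within handle_output.
```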


03/01/2023, 10:38 PM
oh great! if that works then it shouldn’t be an issue! i just wasn’t sure off the top of my head if passing arbitrary classes as metadata would break in some way. regardless, adding support for it will take a little bit longer since i’ll need to add more testing. but i’m trying to tackle the list of issues i linked this week, so i should get to it soon!

Chris Histe

03/01/2023, 10:40 PM
If I remember correctly, I was able to pass entire modules. I didn't like that design and didn't keep it, but it was working.
No rush, it's better to have a robust io manager for everyone 😁

Alexander Butler

03/02/2023, 11:32 PM
No one would believe me if I told them how hard it was a while back to find out the storage write API ships with an async client. Lol.

Jimmy Rivera

05/29/2023, 11:27 AM
QQ: was this implemented already? I'm trying to do WRITE_APPEND, but I'm struggling to keep it from deleting old data.