# integration-bigquery
j
Welcome :blob-wave: This channel is a place where you can get the latest updates on using Dagster with BigQuery, ask questions, provide feedback, and discuss your use cases with other BigQuery users! To start, the BigQuery I/O manager will be marked experimental, so do note that there may be some changes in the next few weeks. All changes and new features will be announced here! Right now the BigQuery I/O manager only supports Pandas DataFrames, but PySpark support is in the works. Additionally, comprehensive getting started and advanced guides will be released soon! In the meantime, here’s a code snippet to help you get started:
Copy code
import pandas as pd

from dagster import asset, Definitions
from dagster_gcp_pandas import bigquery_pandas_io_manager

@asset
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "<https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data>",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )

defs = Definitions(
    assets=[iris_data],
    resources={
        "io_manager": bigquery_pandas_io_manager.configured(
            {
                "project": "my-gcp-project",  # required
                "location": "us-east5",  # optional, defaults to the default location for the project - see <https://cloud.google.com/bigquery/docs/locations> for a list of locations
                "dataset": "IRIS",  # optional, defaults to PUBLIC
            }
        )
    },
)
If you have any questions or feedback please let us know!
c
That’s awesome news. I looked very briefly at the code. Here are a couple of things I would love to see:
• An option to configure the timeout. By default there is none on `Client` methods.
• An option to pass a custom `Retry` object. Even if the default is usually good enough, I’m sure some people will want that.
• I would love to configure how to get the results. For example, the QueryJob object handles both DataFrame and Arrow.
• I think it would be good to use `load_table_from_dataframe`; that way we could use the `job_config` parameter to set the LoadJobConfig. Useful, for example, if we don’t want to truncate all the records in the table.
• Using a transaction would help ensure we are not messing up the data when errors happen. We can roll back when there is a Dagster error.
• A way to set a `LIMIT` or `TABLESAMPLE` clause would be amazing for reducing debugging/pre-prod cost on very large tables.
Those are all things we are doing today in our Dagster pipelines and would need before I can migrate to the IO manager. Happy to chat more.
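To make that concrete, here’s a rough sketch of the kind of calls I mean, using the plain google-cloud-bigquery client directly (the project and table names are made up):
Copy code
from google.api_core.retry import Retry
from google.cloud import bigquery

client = bigquery.Client(project="my-gcp-project")

# Explicit timeout and a custom Retry instead of the client defaults,
# plus TABLESAMPLE to keep debugging cheap on large tables.
job = client.query(
    "SELECT * FROM `my-gcp-project.IRIS.IRIS_DATA` TABLESAMPLE SYSTEM (1 PERCENT)",
    retry=Retry(deadline=120.0),
    timeout=60,
)

# The query result can come back as pandas or Arrow.
df = job.result().to_dataframe()  # or job.result().to_arrow()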
j
thanks for the feedback! this all seems super reasonable to me - i’ll open some issues and add them to my backlog. funnily enough, when i was writing the proof of concept for this io manager a couple months ago, all of the documentation i found pointed to using `pandas_gbq`, and the bigquery client methods never showed up (even in google’s own examples!), so thanks for pointing those out!
b
Hi @jamie thanks for building this! I just ran your code locally and tried materializing and got the following error with the example above:
Copy code
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "iris_data":

  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 269, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 386, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 439, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 640, in _store_output
    for elt in iterate_with_context(
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 469, in iterate_with_context
    with context_fn():
  File "/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 85, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
AttributeError: 'NoneType' object has no attribute 'write'

  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 471, in iterate_with_context
    next_output = next(iterator)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 630, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/storage/db_io_manager.py", line 139, in handle_output
    self._handlers_by_type[obj_type].handle_output(context, table_slice, obj, conn)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster_gcp_pandas/bigquery/bigquery_pandas_type_handler.py", line 41, in handle_output
    pandas_gbq.to_gbq(
  File "/opt/homebrew/lib/python3.10/site-packages/pandas_gbq/gbq.py", line 1220, in to_gbq
    connector.load_data(
  File "/opt/homebrew/lib/python3.10/site-packages/pandas_gbq/gbq.py", line 614, in load_data
    chunks = tqdm.tqdm(chunks)
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 1109, in __init__
    self.refresh(lock_args=self.lock_args)
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 1361, in refresh
    self.display()
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 1509, in display
    self.sp(self.__str__() if msg is None else msg)
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 350, in print_status
    fp_write('\r' + s + (' ' * max(last_len[0] - len_s, 0)))
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 343, in fp_write
    fp.write(_unicode(s))
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/utils.py", line 89, in __getattr__
    return getattr(self._wrapped, name)
Does the I/O manager create the dataset and table automatically? Also, I adjusted the names below so I can use them as column headers in BigQuery:
Copy code
names=[
    "sepal_length_cm",
    "sepal_width_cm",
    "petal_length_cm",
    "petal_width_cm",
    "species",
],
c
@jamie the docs for the BigQuery client are notoriously hard to find and google’s examples are way too simplistic 😉 looking forward to any improvements you include
🎉 1
j
hey @Benedikt Buchert i’m going to split your issue out into a new thread so we can debug!
c
Thanks.
it is unclear if `load_table_from_dataframe` will append to or overwrite an existing table
FYI this is configurable with the config: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.write_disposition
Copy code
WRITE_TRUNCATE: If the table already exists, BigQuery overwrites the table data and uses the schema from the load.
WRITE_APPEND: If the table already exists, BigQuery appends the data to the table.
WRITE_EMPTY: If the table already exists and contains data, a 'duplicate' error is returned in the job result.
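For example, overriding it on a load job looks roughly like this (sketch only; the project, table, and DataFrame are made up):
Copy code
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project="my-gcp-project")  # made-up project
iris_df = pd.DataFrame({"species": ["setosa", "versicolor"]})  # stand-in data

# The default for load jobs is WRITE_APPEND; set WRITE_TRUNCATE to overwrite instead.
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
client.load_table_from_dataframe(
    iris_df, "my-gcp-project.IRIS.IRIS_DATA", job_config=job_config
).result()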
j
yeah - in testing the default behavior is to append, so it should be a drop-in replacement for the current system (yay!). In terms of specifying JobConfig - i’ll need to think about how we can support this. the main issue is passing the config from the asset to the IO manager. I’m not totally sure we’ll be able to handle some of the bigquery data types. I’ll need to do more testing on that, so i’m not going to include it as part of converting to using the built-in methods
c
I wrote the IO Manager for W&B and used the metadata object to pass this kind of user-defined configuration. You could use a similar pattern, e.g.:
Copy code
from dagster import Out, op
from google.cloud import bigquery

# Configure the load job to overwrite the existing table data.
load_job_config = bigquery.LoadJobConfig(
    write_disposition="WRITE_TRUNCATE"
)

@op(
    out=Out(
        metadata={
            "bigquery_configuration": {
                "load_job_config": load_job_config,
            }
        }
    )
)
def create_table():
    return [1, 2, 3]
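On the I/O manager side, that metadata then shows up on the output context - roughly like this (a hypothetical handler just to show where it lands, not the actual dagster_gcp_pandas one):
Copy code
from dagster import IOManager, OutputContext


class HypotheticalBigQueryIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj):
        # Pull the user-supplied config off the output's metadata.
        bq_config = (context.metadata or {}).get("bigquery_configuration", {})
        load_job_config = bq_config.get("load_job_config")
        # ...pass load_job_config to load_table_from_dataframe here...

    def load_input(self, context):
        ...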
j
oh great! if that works then it shouldn’t be an issue! i just wasn’t sure off the top of my head if passing arbitrary classes as metadata would break in some way. regardless, adding support for it will take a little bit longer since i’ll need to add more testing. but i’m trying to tackle the list of issues i linked this week, so i should get to it soon!
c
If I remember correctly, I was able to pass entire modules. I didn't like that design and didn't keep it, but it was working.
No rush, it's better to have a robust io manager for everyone 😁
a
No one would believe me if I told them how hard it was a while back to find out that the Storage Write API ships with an async client. Lol.
j
QQ: was this implemented already? I'm trying to do WRITE_APPEND but I'm struggling to keep it from deleting old data.