jamie
02/24/2023, 4:47 PM
import pandas as pd
from dagster import asset, Definitions
from dagster_gcp_pandas import bigquery_pandas_io_manager


@asset
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )


defs = Definitions(
    assets=[iris_data],
    resources={
        "io_manager": bigquery_pandas_io_manager.configured(
            {
                "project": "my-gcp-project",  # required
                # optional, defaults to the default location for the project
                # see https://cloud.google.com/bigquery/docs/locations for a list of locations
                "location": "us-east5",
                "dataset": "IRIS",  # optional, defaults to PUBLIC
            }
        )
    },
)
If you have any questions or feedback, please let us know!
Chris Histe
02/24/2023, 8:38 PM
Client methods.
• An option to pass a custom Retry object. Even if the default is usually good enough, I’m sure some people will want that.
• I would love to be able to configure how the results are fetched. For example, the QueryJob object can return them as a DataFrame or as Arrow.
• I think it would be good to use load_table_from_dataframe; that way we could use the job_config parameter to set the LoadJobConfig. This is useful, for example, if we don’t want to truncate all the records in the table.
• Using transactions would help ensure we are not messing up the data when errors happen. We can roll back when there is a Dagster error.
• A way to set a LIMIT or TABLESAMPLE clause would be amazing for reducing debugging/pre-prod costs on very large tables.
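A minimal sketch of the last two suggestions (transactions and LIMIT/TABLESAMPLE), assuming the IO manager issues plain SQL to BigQuery. The helper names build_sample_query and wrap_in_transaction are hypothetical, not part of dagster-gcp-pandas or google-cloud-bigquery; the script follows the BEGIN TRANSACTION ... EXCEPTION WHEN ERROR THEN ROLLBACK TRANSACTION pattern from BigQuery's multi-statement transaction docs.

```python
# Hypothetical helpers, not part of dagster-gcp-pandas or google-cloud-bigquery.
# Table IDs and statements are assumed to come from trusted code (no escaping).


def build_sample_query(table_id, columns=None, limit=None, sample_percent=None):
    """Build a SELECT that can sample storage blocks and/or cap the row count."""
    select_list = ", ".join(columns) if columns else "*"
    sql = f"SELECT {select_list} FROM `{table_id}`"
    if sample_percent is not None:
        if not 0 < sample_percent <= 100:
            raise ValueError("sample_percent must be in (0, 100]")
        # TABLESAMPLE SYSTEM picks whole storage blocks, so the row fraction is approximate.
        sql += f" TABLESAMPLE SYSTEM ({sample_percent} PERCENT)"
    if limit is not None:
        sql += f" LIMIT {int(limit)}"
    return sql


def wrap_in_transaction(statements):
    """Wrap statements in a multi-statement transaction that rolls back on any error."""
    body = "\n".join("  " + s.strip().rstrip(";") + ";" for s in statements)
    return (
        "BEGIN\n"
        "  BEGIN TRANSACTION;\n"
        f"{body}\n"
        "  COMMIT TRANSACTION;\n"
        "EXCEPTION WHEN ERROR THEN\n"
        "  ROLLBACK TRANSACTION;\n"
        # Re-raise so the caller (for example a Dagster step) still sees the failure.
        "  RAISE USING MESSAGE = @@error.message;\n"
        "END;"
    )


print(build_sample_query("my-gcp-project.IRIS.my_table", limit=100, sample_percent=10))
# SELECT * FROM `my-gcp-project.IRIS.my_table` TABLESAMPLE SYSTEM (10 PERCENT) LIMIT 100
```

One caveat on cost: BigQuery bills by bytes read, and in general a LIMIT on its own does not reduce the bytes scanned, while TABLESAMPLE SYSTEM does.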
Those are all things we are doing today in our Dagster pipelines and would need before I can migrate to the IO Manager. Happy to chat more.
jamie
02/24/2023, 8:59 PM
pandas_gbq and the bigquery client methods never showed up (even in Google’s own examples!), so thanks for pointing those out!
Benedikt Buchert
02/25/2023, 1:20 PM
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "iris_data":
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 269, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 386, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 439, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output):
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 640, in _store_output
    for elt in iterate_with_context(
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 469, in iterate_with_context
    with context_fn():
  File "/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 85, in op_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
AttributeError: 'NoneType' object has no attribute 'write'
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 471, in iterate_with_context
    next_output = next(iterator)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 630, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster/_core/storage/db_io_manager.py", line 139, in handle_output
    self._handlers_by_type[obj_type].handle_output(context, table_slice, obj, conn)
  File "/opt/homebrew/lib/python3.10/site-packages/dagster_gcp_pandas/bigquery/bigquery_pandas_type_handler.py", line 41, in handle_output
    pandas_gbq.to_gbq(
  File "/opt/homebrew/lib/python3.10/site-packages/pandas_gbq/gbq.py", line 1220, in to_gbq
    connector.load_data(
  File "/opt/homebrew/lib/python3.10/site-packages/pandas_gbq/gbq.py", line 614, in load_data
    chunks = tqdm.tqdm(chunks)
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 1109, in __init__
    self.refresh(lock_args=self.lock_args)
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 1361, in refresh
    self.display()
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 1509, in display
    self.sp(self.__str__() if msg is None else msg)
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 350, in print_status
    fp_write('\r' + s + (' ' * max(last_len[0] - len_s, 0)))
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/std.py", line 343, in fp_write
    fp.write(_unicode(s))
  File "/opt/homebrew/lib/python3.10/site-packages/tqdm/utils.py", line 89, in __getattr__
    return getattr(self._wrapped, name)
Does the IO manager create the dataset and table automatically?
Also, I adjusted the column names below so I can use them as column headers in BigQuery:
names=[
    "sepal_length_cm",
    "sepal_width_cm",
    "petal_length_cm",
    "petal_width_cm",
    "species",
],
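Instead of retyping the list, the snake_case names can be derived from the original labels. A minimal sketch, assuming BigQuery's classic column-name rules (letters, digits and underscores, not starting with a digit); to_bigquery_column_name is a hypothetical helper.

```python
import re


def to_bigquery_column_name(label):
    """Convert a label like "Sepal length (cm)" to a snake_case BigQuery column name."""
    # Collapse every run of characters outside [0-9a-zA-Z] into one underscore.
    name = re.sub(r"[^0-9a-zA-Z]+", "_", label).strip("_").lower()
    # Classic naming rules: the name must not start with a digit (or be empty).
    if not name or name[0].isdigit():
        name = "_" + name
    return name


labels = ["Sepal length (cm)", "Sepal width (cm)", "Petal length (cm)", "Petal width (cm)", "Species"]
print([to_bigquery_column_name(label) for label in labels])
# ['sepal_length_cm', 'sepal_width_cm', 'petal_length_cm', 'petal_width_cm', 'species']
```

With pandas, df.rename(columns=to_bigquery_column_name) applies the helper to every column of the DataFrame returned by iris_data.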
Chris Histe
02/27/2023, 2:26 PM
jamie
02/27/2023, 4:29 PM
Chris Histe
03/01/2023, 10:26 PM
"it is unclear if load_table_from_dataframe will append or overwrite an existing table"
FYI, this is configurable with the write_disposition field of the load job configuration: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.write_disposition
WRITE_TRUNCATE: If the table already exists, BigQuery overwrites the table data and uses the schema from the load.
WRITE_APPEND: If the table already exists, BigQuery appends the data to the table.
WRITE_EMPTY: If the table already exists and contains data, a 'duplicate' error is returned in the job result.
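To make the three options concrete, here is a toy in-memory model of how each disposition treats an existing table. It only illustrates the semantics listed above, not how BigQuery implements them, and apply_write_disposition is a hypothetical name. According to the same reference, WRITE_APPEND is the default for load jobs; in the Python client the value is set through bigquery.LoadJobConfig(write_disposition=...).

```python
def apply_write_disposition(existing_rows, new_rows, disposition):
    """Toy model of a load job: return the table's rows after loading new_rows.

    existing_rows is None when the destination table does not exist yet.
    """
    if existing_rows is None:
        # Missing tables are created by the load (CREATE_IF_NEEDED, the default).
        return list(new_rows)
    if disposition == "WRITE_TRUNCATE":
        # Old rows are replaced (the real job also takes the schema from the load).
        return list(new_rows)
    if disposition == "WRITE_APPEND":
        return list(existing_rows) + list(new_rows)
    if disposition == "WRITE_EMPTY":
        if existing_rows:
            raise RuntimeError("duplicate: the table already contains data")
        return list(new_rows)
    raise ValueError(f"unknown write disposition: {disposition!r}")


print(apply_write_disposition([1, 2], [3], "WRITE_APPEND"))  # [1, 2, 3]
```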
jamie
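03/01/2023, 10:28 PM
Chris Histe
03/01/2023, 10:35 PM
from dagster import Out, op
from google.cloud import bigquery

# Overwrite the table data on each load instead of appending
load_job_config = bigquery.LoadJobConfig(
    write_disposition="WRITE_TRUNCATE"
)


@op(
    out=Out(
        # Attach the load job config to the output so an IO manager could read it
        metadata={
            "bigquery_configuration": {
                "load_job_config": load_job_config,
            }
        }
    )
)
def create_table():
    return [1, 2, 3]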
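One way an IO manager could consume this metadata is to look up the nested key and fall back to its own default. A minimal, framework-free sketch: the key names come from the snippet above, while resolve_load_job_config and its default_factory argument are hypothetical.

```python
from typing import Any, Callable, Mapping


def resolve_load_job_config(metadata: Mapping[str, Any], default_factory: Callable[[], Any]) -> Any:
    """Return metadata["bigquery_configuration"]["load_job_config"] if set, else a default."""
    bq_config = metadata.get("bigquery_configuration") or {}
    job_config = bq_config.get("load_job_config")
    # Fall back to the IO manager's own default when the output did not provide one.
    return job_config if job_config is not None else default_factory()
```

Inside a custom IO manager's handle_output, assuming the output definition's metadata is available as context.metadata, this could be called as resolve_load_job_config(context.metadata, bigquery.LoadJobConfig), and the result passed to client.load_table_from_dataframe(df, table_id, job_config=job_config).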
jamie
03/01/2023, 10:38 PM
Chris Histe
03/01/2023, 10:40 PM
Alexander Butler
03/02/2023, 11:32 PM
Jimmy Rivera
05/29/2023, 11:27 AM