won
02/25/2023, 3:26 PM
pd.DataFrame(['510365552', '564032331', '505153022', '547927024', '556154751', '569685159'])
TypeError: descriptor 'upper' for 'str' objects doesn't apply to a 'int' object
...
File "....virtualenvs\dagster\lib\site-packages\dagster_gcp_pandas\bigquery\bigquery_pandas_type_handler.py", line 39, in handle_output
with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns")
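A likely explanation and a minimal workaround sketch: a DataFrame built from a plain list gets the integer column label 0, and str.upper cannot be applied to an int inside the type handler's rename, so giving every column a string name before returning the DataFrame avoids the error (the name user_id below is only illustrative):

import pandas as pd

# A DataFrame built from a plain list gets the integer column label 0,
# which is what str.upper chokes on inside the type handler.
df = pd.DataFrame(
    ["510365552", "564032331", "505153022", "547927024", "556154751", "569685159"]
)
print(df.columns.tolist())  # [0]

# Workaround sketch: make sure every column label is a string before returning
# the DataFrame from the asset.
df.columns = df.columns.astype(str)      # 0 -> "0"
# or give the column a meaningful (illustrative) name instead:
df = df.rename(columns={"0": "user_id"})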
won
02/25/2023, 5:15 PM
BigQueryPandasTypeHandler
In my case, obj.dtypes returns ('items', dtype('O')):
context.add_output_metadata(
    {
        "row_count": obj.shape[0],
        "dataframe_columns": MetadataValue.table_schema(
            TableSchema(
                columns=[
                    TableColumn(name=name, type=str(dtype))
                    for name, dtype in obj.dtypes.iteritems()
                ]
            )
        ),
    }
)
Can I use sqlalchemy.types?
from sqlalchemy import types

json_columns = ['foo', 'baz']
data_type = dict.fromkeys(json_columns, types.JSON) if json_columns else {}
df.to_sql(
    table_name,
    connection,
    dtype=data_type,
    if_exists="append",
    index=False,
    chunksize=1000,
)
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#array-type
jamie
02/27/2023, 4:30 PM
Benedikt Buchert
02/27/2023, 9:53 PM
It is looking in the dataset marts, but the correct dataset is prefixed by dbt: abc_marts. It is also looking in the US location, which is not where the dataset is located. I tried setting the default location to "location": "europe-west3" in my Definitions, which seems to have no effect. Is it possible to specify the dataset location and dataset name per asset or asset group as well?
from dagster import asset, AssetIn
import pandas as pd


@asset(
    ins={
        "dim_ga4__users": AssetIn(key_prefix=["dbt_models", "marts"])
    },
    group_name="marts",
)
def show_users_head(dim_ga4__users) -> pd.DataFrame:
    print(dim_ga4__users.head())
    return dim_ga4__users
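For comparison, a minimal sketch of pinning the dataset per asset via metadata, assuming the BigQuery I/O manager honors the same "schema" metadata key as Dagster's other DB I/O managers; the dataset name abc_marts is taken from the question, and location, as far as I can tell, is configured on the resource rather than per asset:

import pandas as pd
from dagster import asset


# Sketch: pin the BigQuery dataset for this asset via metadata instead of the
# asset key prefix (assumes the I/O manager reads the "schema" metadata key).
@asset(
    metadata={"schema": "abc_marts"},  # dataset name taken from the question
    group_name="marts",
)
def users_snapshot() -> pd.DataFrame:
    return pd.DataFrame({"user_id": ["510365552"], "sessions": [3]})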
Juan
02/28/2023, 9:00 PM
jamie
03/03/2023, 4:18 PM
The location configuration was not being used when storing Pandas DataFrames; this has now been fixed.
Thanks again to everyone who has tried out the I/O manager and given feedback so far!
jamie
03/10/2023, 3:25 PM
The I/O manager now uses the google.bigquery.Client methods load_table_from_dataframe and query instead of the pandas_gbq library. This shouldn't cause any changes in functionality on your end, but do open a GitHub issue if you run into any problems.
jamie
03/21/2023, 1:51 PM
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    ins={
        "self_dependent_asset": AssetIn(
            key=AssetKey(["self_dependent_asset"]),
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1),
        ),
    },
    metadata={
        "partition_expr": "date",
    },
)
def self_dependent_asset(context, self_dependent_asset: pd.DataFrame) -> pd.DataFrame:
    date = pd.Timestamp(context.asset_partition_key_for_output())
    if self_dependent_asset.empty:
        return pd.DataFrame({"date": [date], "value": [1]})
    return pd.DataFrame({"date": [date], "value": [self_dependent_asset["value"][0] + 1]})
The BigQuery I/O manager now accepts timeout configuration. Currently, this configuration will only be used when working with Pandas DataFrames. It sets the number of seconds to wait for a request before retrying.
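A minimal sketch of what that timeout configuration could look like, assuming the Pythonic BigQueryPandasIOManager that appears later in this thread; the project and dataset names are placeholders:

from dagster_gcp_pandas import BigQueryPandasIOManager

# Sketch: pass timeout (in seconds) alongside the usual connection settings.
bq_io_manager = BigQueryPandasIOManager(
    project="my-gcp-project",  # placeholder
    dataset="my_dataset",      # placeholder
    timeout=15.0,              # seconds to wait for a request before retrying
)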
Dagster Jarred
04/05/2023, 9:50 PM
jamie
04/14/2023, 7:49 PM
We have released BigQueryPandasIOManager (api docs) and BigQueryPySparkIOManager (api docs), which follow the new Pythonic Config and Resources system. The existing bigquery_pandas_io_manager and bigquery_pyspark_io_manager are still supported and will continue to be. However, we think that the new Pythonic system has huge ergonomic benefits, so we recommend you check it out! You can read more about the Pythonic Config and Resources system here, and expect to see even more resources (including updated guides) in the coming weeks!
Additionally, the BigQuery resource now accepts authentication credentials via configuration (like the BigQuery I/O manager).
Akira Chang
04/18/2023, 8:49 AM
How do I use the bigquery_resource API from dagster_gcp?
jamie
04/20/2023, 8:23 PM
Rob Martorano
04/24/2023, 11:45 PM
load_table_from_dataframe by default doesn't allow column addition. The two config options that BigQuery seems to expose are:
• setting the job WriteDisposition to WRITE_TRUNCATE (this replaces the whole table, and is nice because it also allows incompatible changes like changing a column type, but it doesn't fit in as well with the DBIOManager abstraction and would only work for non-partitioned tables)
• passing ALLOW_FIELD_ADDITION as a schemaUpdateOption (a sketch of both options is below)
I'm not sure of the best way to expose these options, but one or both would be very helpful. (We currently have a handwritten BQ I/O manager that uses WRITE_TRUNCATE; ours doesn't support partitions though, so excited to migrate!)
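For illustration, a minimal sketch of the two options above using the google.cloud.bigquery client directly, outside of any I/O manager; the project and table names are placeholders:

import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project="my-gcp-project")  # placeholder project
df = pd.DataFrame({"id": [1, 2], "new_col": ["a", "b"]})

# Option 1: replace the whole table (also permits incompatible schema changes).
truncate_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Option 2: append while allowing new columns to be added to the schema.
append_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

client.load_table_from_dataframe(
    df, "my_dataset.my_table", job_config=append_config  # placeholder table
).result()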
Riley Runnoe
04/25/2023, 2:33 PM
I'm using runtime_metadata_fn to gather this metadata about partitioned builds. The custom function I am using and the load_assets_from_dbt_project call read as follows:
def custom_metadata_fn(context, node_info):
    table_name = node_info["name"]
    table_schema = node_info["schema"]
    n_rows = f"SELECT 1 FROM {table_schema}.{table_name} WHERE date = '{context.partition_key}'"
    return {"n_rows": n_rows}


dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    use_build_command=True,
    partitions_def=DailyPartitionsDefinition(start_date="2023-04-14"),
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
    runtime_metadata_fn=custom_metadata_fn,
)
The question I have is: how do I pass that n_rows query to either an io_manager or some other mechanism to actually query the {table_schema}.{table_name} table? Do I just have to access the BigQuery SDK, or is there a Dagster API I can interface with? (A sketch of one option follows below.)
using:
dbt-bigquery==1.1.0
dagster==1.3.0
dagit==1.3.0
dagster-dbt==0.19.0
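Purely as a sketch of the "access the BigQuery SDK" route, one way the metadata function could run the count itself, assuming a direct google.cloud.bigquery client is acceptable there; the project name is a placeholder and a COUNT(*) query replaces the SELECT 1 above:

from google.cloud import bigquery


def custom_metadata_fn(context, node_info):
    table_name = node_info["name"]
    table_schema = node_info["schema"]
    client = bigquery.Client(project="my-gcp-project")  # placeholder project
    query = (
        f"SELECT COUNT(*) AS n_rows "
        f"FROM {table_schema}.{table_name} "
        f"WHERE date = '{context.partition_key}'"
    )
    # Run the query and pull the single count value out of the result set.
    n_rows = next(iter(client.query(query).result())).n_rows
    return {"n_rows": n_rows}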
Akira Chang
04/25/2023, 3:16 PM
I'd like to react when my table in BigQuery is updated (a row is added). I'm wondering if I could use a sensor to monitor new changes to my table, and perhaps use bigquery_resource in an asset to retrieve data from BigQuery?
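A rough sketch of the sensor idea, assuming a plain google.cloud.bigquery client inside the sensor and a row count as the change signal; the job, project, and table names are placeholders:

from dagster import RunRequest, SkipReason, job, op, sensor
from google.cloud import bigquery


@op
def process_new_rows():
    ...


@job
def refresh_job():
    process_new_rows()


@sensor(job=refresh_job)
def bigquery_new_rows_sensor(context):
    client = bigquery.Client(project="my-gcp-project")  # placeholder project
    query = "SELECT COUNT(*) AS n FROM `my-gcp-project.my_dataset.my_table`"  # placeholder table
    n_rows = next(iter(client.query(query).result())).n

    # Compare against the last row count stored in the sensor cursor.
    last_seen = int(context.cursor) if context.cursor else 0
    if n_rows > last_seen:
        context.update_cursor(str(n_rows))
        yield RunRequest(run_key=str(n_rows))
    else:
        yield SkipReason("No new rows since the last check")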
Luis Pinto
05/03/2023, 3:00 PM
Luis Pinto
05/03/2023, 3:00 PM
Luis Pinto
05/03/2023, 3:01 PM
Pablo Beltran
05/03/2023, 11:24 PM
Qwame
05/04/2023, 10:33 PM
Can I use the BigQueryResource to load data incrementally into a table in BQ? I am thinking static partitions will do, but I'm not sure if that'll give me what I want. Essentially, I have data that comes at a regular frequency. Each time the data comes, the output of the asset is essentially the name of the data for that delivery period. I would want it to be appended to the table in BQ. Is this possible currently with the BigQueryResource?
jamie
05/05/2023, 4:37 PM
The new BigQueryResource follows the new Pythonic Config and Resources system. The existing bigquery_resource will continue to be supported. This BigQueryResource works slightly differently than the bigquery_resource: using the new BigQueryResource, you create a bigquery.Client (bigquery docs) and use that to run queries.
from dagster import asset
from dagster_gcp import BigQueryResource


@asset
def small_petals(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        return client.query(
            (
                'SELECT * FROM IRIS.IRIS_DATA WHERE "petal_length_cm" < 1 AND'
                ' "petal_width_cm" < 1'
            ),
        ).result()
Qwame
05/09/2023, 7:28 PM
Is there anything in the handle_output section of the pandas type handler file that skips the creation of tables if the data frame has 0 rows, or some sort of check to ensure that 0-row data frames are not passed as output to the I/O manager?
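One possible workaround sketch, assuming Dagster's conditional materialization (output_required=False) fits here: only yield an Output when the DataFrame has rows, so the I/O manager never receives an empty frame.

import pandas as pd
from dagster import Output, asset


@asset(output_required=False)
def maybe_table():
    df = pd.DataFrame({"id": [], "value": []})  # stand-in for the real query result
    if not df.empty:
        # Only materialize (and hence only hit the I/O manager) when there are rows.
        yield Output(df)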
Justin Albinet
05/12/2023, 4:53 PM
I'm using the BigQueryResource (and it works like a charm :partydagster:) but if I then try to add an io_manager linked to BigQuery on the same asset, I get the following error: Resource config error: gcp_credentials config for BigQuery resource cannot be used if GOOGLE_APPLICATION_CREDENTIALS environment variable is set.
My asset is declared this way:
@asset(io_manager_key="bigquery_io_manager")
def apiBatch(bigquery: BigQueryResource):
    ...
And my definition is:
defs = Definitions(
    assets=all_assets,
    resources={
        "bigquery": BigQueryResource(
            project="PROJECT",
            location="EU",
            gcp_credentials=EnvVar("GCP_CREDS"),
        ),
        "bigquery_io_manager": BigQueryPandasIOManager(
            project="PROJECT",
            location="EU",
            dataset="mydataset",
            gcp_credentials=EnvVar("GCP_CREDS"),
            timeout=15.0,
        ),
    },
)
I don't have this issue if I declare the io_manager_key on another asset; is that normal?
Justin Albinet
05/16/2023, 2:37 PM
Is there a way to choose between WRITE_TRUNCATE and WRITE_APPEND with BigQueryPandasIOManager? It seems like it's always an overwrite (I haven't tried partitioned assets; would it overwrite only the partition?).
Akira Chang
05/22/2023, 2:29 PM
Can I use yield instead of return to push data into BigQuery with BigQueryPandasIOManager? I have a loop that pushes data into BigQuery every iteration, so theoretically I would need to use yield so that the asset won't terminate. However, I get an error when using yield. Any help would be very helpful!!
Dennis Hendricks
05/22/2023, 2:33 PM
I'm writing a partitioned asset with BigQueryPandasIOManager. The partition in Dagster itself works (using DailyPartitionsDefinition), but the result in BigQuery is a regular table instead of one with the table type Partitioned. Is there a way to specify the table type directly via Dagster, or is my only option to create the table manually in BigQuery and populate it via Dagster?
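A sketch of the "create the table manually" option using the google.cloud.bigquery client, assuming a DAY partition on a date column; the project, dataset, table, and schema are placeholders. Once the native partitioned table exists, Dagster can keep populating it:

from google.cloud import bigquery

client = bigquery.Client(project="my-gcp-project")  # placeholder project

table = bigquery.Table(
    "my-gcp-project.my_dataset.my_partitioned_table",  # placeholder table
    schema=[
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("value", "INTEGER"),
    ],
)
# Make it a natively partitioned table, partitioned by day on the "date" column.
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="date",
)
client.create_table(table, exists_ok=True)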
Miguel Caballero Pinto
05/23/2023, 12:08 AM
I'm writing a DataFrame with BigQueryPandasIOManager. When the job is going to write to the table, it fails because one of the columns in the dataframe is being translated to integer instead of string. This seems to happen because the column in the dataframe is None and Dagster by default may infer that the type is an integer. This is the error I get:
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table innate-empire-283902:enrichment.person_batch_enrichment_output. Field error_code has changed type from STRING to INTEGER
Is there a way to work around this? How can I tell Dagster that the output type of the column is a string?
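A small sketch of one possible workaround, assuming pandas' nullable string dtype is preserved as STRING when the frame is loaded; the column name error_code comes from the error above:

import pandas as pd

df = pd.DataFrame({"error_code": [None, None]})  # an all-None column gets inferred poorly

# Cast to pandas' nullable string dtype so the column keeps a string type
# even when every value is missing.
df["error_code"] = df["error_code"].astype("string")
print(df.dtypes["error_code"])  # a nullable "string" dtype rather than object/int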
Dennis Hendricks
05/23/2023, 9:00 AM
We save raw data to BigQuery with the BigQueryPandasIOManager, and then model the data via dbt. Is there a way to skip the saving step and pass the data directly to dbt? This would allow us to omit saving the raw data and just store the final modeled data.
Josh Kutsko
05/30/2023, 9:36 PM
Dennis Hendricks
06/01/2023, 7:29 AM