Noah Ford
01/16/2023, 6:16 PM
[…] `bigquery_resource` to my repository, but I'd like to know why it works, mostly because I have two different BQ warehouses and I want to be able to create resources for both!
I'm quite confused about where `bigquery_resource` is able to authenticate, given that I haven't passed anything to it.
---
For those confused as well: I was able to write a more generalizable resource with the code below, and I simply add it to my resources with `"bigquery": bq_connection.configured({"credentials": [enter bq json-like string]})`
from dagster import StringSource, resource
from google.cloud import bigquery # type: ignore
from google.oauth2 import service_account
import json
@resource(config_schema={"credentials": StringSource})
def bq_connection(init_context):
    """Dagster resource that returns a BigQuery client for one service account.

    The ``credentials`` config entry is a string containing the service-account
    key as JSON. The returned client uses those credentials and is bound to the
    project id they carry.
    """
    key_info = json.loads(init_context.resource_config['credentials'])
    sa_credentials = service_account.Credentials.from_service_account_info(key_info)
    return bigquery.Client(
        credentials=sa_credentials,
        project=sa_credentials.project_id,
    )
Jake Kagan
01/16/2023, 7:07 PM
GOOGLE_APPLICATION_CREDENTIALS='C:\Users\my_user_eredentials
Noah Ford
01/16/2023, 7:13 PM
Jake Kagan
01/16/2023, 7:16 PM
`dotenv` ??
Ivan Tsarev
01/16/2023, 8:40 PM
@resource(
config_schema={
"gcs_credential": StringSource,
}
)
def big_query_adapter(init_context):
    """Build a BigQueryAdapter from this resource's ``gcs_credential`` config value."""
    credential_json = init_context.resource_config["gcs_credential"]
    return BigQueryAdapter(credential_json)
class BigQueryAdapter:
    """Wraps a BigQuery client authenticated with an explicit service-account key."""

    def __init__(self, gcs_credential: str):
        """Create the client from a service-account key passed as a JSON string."""
        self._client = bigquery.Client(
            credentials=self._create_gcp_credentials(gcs_credential)
        )

    @staticmethod
    def _create_gcp_credentials(gcs_credential: str) -> service_account.Credentials:
        """Convert a JSON key string into service-account credentials.

        Raises whatever ``json.loads`` raises on malformed JSON, and ``KeyError``
        when the parsed key has no ``private_key`` entry.
        """
        # strict=False lets the JSON parser accept raw control characters
        # (such as real newlines) inside string values.
        key_info = json.loads(gcs_credential, strict=False)
        # Turn two-character backslash-n sequences into real newlines.
        # NOTE(review): presumably needed because the key arrives with escaped
        # newlines when stored in an environment variable; confirm.
        escaped_key = key_info["private_key"]
        key_info["private_key"] = escaped_key.replace("\\n", "\n")
        return service_account.Credentials.from_service_account_info(key_info)
# Any number of BigQueryAdapter-based resources can be registered side by side,
# each with its own credentials kept in a separate environment variable
# (e.g. GOOGLE_APPLICATION_CREDENTIALS_1, GOOGLE_APPLICATION_CREDENTIALS_2).
# Giving the config value as {"env": "name_of_variable"} makes Dagster load it
# from that environment variable for us.
first_adapter = big_query_adapter.configured(
    {"gcs_credential": {"env": "GOOGLE_APPLICATION_CREDENTIALS_1"}}
)
second_adapter = big_query_adapter.configured(
    {"gcs_credential": {"env": "GOOGLE_APPLICATION_CREDENTIALS_2"}}
)

our_graph_name.to_job(
    resource_defs={
        "big_query_adapter_1": first_adapter,
        "big_query_adapter_2": second_adapter,
    },
)