Resolved Feel like I'm going a bit crazy since I ...
# ask-community
n
Resolved Feel like I'm going a bit crazy since I must be completely missing something! I'm trying to define a Bigquery resource but cannot see where I'm supposed to pass info to authenticate such as the JSON credentials and project id! My goal is to be able to query my BQ tables in an asset. It works by simply adding
bigquery_resource
to my repository but I'd like to know why it works, mostly because I have two different BQ warehouses and I want to be able to create resources for both! I'm quite confused where
bigquery_resource
is a able to authenticate given that I haven't passed anything to it. --- For those confused as well, I was able to write a more generalizable resource with the below, and I simply add it to my resources with
"bigquery": bq_connection.configured({"credentials": [enter bq json-like string]})
Copy code
from dagster import StringSource, resource 
from google.cloud import bigquery  # type: ignore
from google.oauth2 import service_account
import json

@resource(
    config_schema={
        "credentials": StringSource,
    }, 
) 

def bq_connection(init_context):

    credentials = (
        service_account.
        Credentials.
        from_service_account_info(json.loads(init_context.resource_config['credentials']))
    )
    return bigquery.Client(credentials=credentials, project=credentials.project_id)
j
hey man from what i've done, bigquery actually requires you to have an environment variable
GOOGLE_APPLICATION_CREDENTIALS='C:\Users\my_user_eredentials
🙏 1
n
What about if I have multiple BQ warehouses?
j
that i have no idea about...you might use something like
dotenv
??
i
If I understand correctly what you need, you can parametrize resource with credentials the following way:
Copy code
@resource(
    config_schema={
        "gcs_credential": StringSource,
    }
)
def big_query_adapter(init_context):
    return BigQueryAdapter(
        init_context.resource_config["gcs_credential"]
    )

class BigQueryAdapter:
    def __init__(self, gcs_credential: str):
        creds = self._create_gcp_credentials(gcs_credential)
        self._client = bigquery.Client(credentials=creds)

    @staticmethod
    def _create_gcp_credentials(gcs_credential: str) -> service_account.Credentials:
            parsed_creds = json.loads(gcs_credential, strict=False)
            parsed_creds["private_key"] = parsed_creds["private_key"].replace("\\n", "\n")
            creds = service_account.Credentials.from_service_account_info(parsed_creds)
            return creds


# here we can create as many different BigQueryAdapter-based resources as needed 
# with different credentials stored in variables like GOOGLE_APPLICATION_CREDENTIALS_1
# Dagster will load envar values for us since we set it as {"env": "name_of_variable"}
our_graph_name.to_job(
    resource_defs={
        "big_query_adapter_1": big_query_adapter.configured(
            {"gcs_credential": {"env": "GOOGLE_APPLICATION_CREDENTIALS_1"}}
        ),
        "big_query_adapter_2": big_query_adapter.configured(
            {"gcs_credential": {"env": "GOOGLE_APPLICATION_CREDENTIALS_2"}}
        )
    }
)