#ask-community

BC A

03/22/2023, 4:59 PM
Hi everyone! I'd like to materialize my dbt assets (I use BigQuery as a backend) and output the final result to a GCS bucket. I see that the new BigQuery integration lets me write custom SQL queries and import from GCS into BigQuery, but not the other way around, so I was wondering if there's a way to do it and keep my data lineage. Thanks in advance!

jamie

03/22/2023, 8:33 PM
the bigquery integration has a resource that just wraps the BigQuery client. you could potentially use that in conjunction with the GCS resource to move data from bigquery to GCS within an asset

BC A

03/22/2023, 8:35 PM
Thanks! That implies I should be using GCS as the IO manager, right? And not BigQuery? (Or should I use both?)

jamie

03/22/2023, 8:41 PM
i think you would use both. Kinda hard to say for certain without knowing more about your use case, but you could do something like this
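import pandas as pd
from dagster import Definitions, asset
from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource
from dagster_gcp_pandas import bigquery_pandas_io_manager

@asset(
    io_manager_key="bigquery"
)
def my_bigquery_asset() -> pd.DataFrame:
    # this return is stored using the bigquery io manager
    return pd.DataFrame({"foo": [1, 2, 3]})

@asset(
    io_manager_key="gcs_io_manager"
)
def my_gcs_asset(my_bigquery_asset: pd.DataFrame) -> pd.DataFrame:
    # my_bigquery_asset is loaded using the bigquery io manager

    # this return is stored using the gcs io manager
    return my_bigquery_asset + 1


# note: both io managers still need configuration (project, bucket, etc.) before this will run
defs = Definitions(
    assets=[my_bigquery_asset, my_gcs_asset],
    resources={
        "bigquery": bigquery_pandas_io_manager,
        "gcs_io_manager": gcs_pickle_io_manager,
        # gcs_pickle_io_manager itself requires a resource under the key "gcs"
        "gcs": gcs_resource
    }
)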

BC A

03/22/2023, 8:52 PM
Got it, here's my use case: I have tableA materialized in BigQuery using dbt, and I want to have it materialized in GCS. The thing is, I don't want it loaded into memory as a pandas DataFrame, as it can be really big.

jamie

03/23/2023, 1:52 PM
ok in that case you could do this
from dagster import Definitions, asset
from dagster_gcp import bigquery_resource, gcs_resource

dbt_assets = ...  # placeholder: code to load the dbt assets

@asset(
    required_resource_keys={"bigquery", "gcs"},
    non_argument_deps={"name_of_the_dbt_asset"}
)
def my_gcs_asset(context) -> None:
    bigquery = context.resources.bigquery # this will be a google.cloud.bigquery.Client()
    gcs = context.resources.gcs # this will be a google.cloud.storage.Client()

    # code using bigquery and gcs clients to move the dbt table from bigquery to gcs

defs = Definitions(
    assets=[my_gcs_asset, *dbt_assets],
    resources={
        "bigquery": bigquery_resource,
        "gcs": gcs_resource
    }
)
The `non_argument_deps` will make `my_gcs_asset` depend on `name_of_the_dbt_asset`, but it will not load the table in memory. Then you can use the two resources to move the table.
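For the "code using bigquery and gcs clients" placeholder in the asset body, here is a minimal sketch of the move itself. It assumes the `google-cloud-bigquery` client's `extract_table` method, which starts an export job that runs inside BigQuery. The helper name `export_table_to_gcs` and the table ID and bucket URI in the usage comment are hypothetical.

```python
def export_table_to_gcs(bigquery_client, table_id, destination_uri, location=None):
    """Start a BigQuery extract job and block until it finishes.

    bigquery_client: a google.cloud.bigquery.Client (e.g. context.resources.bigquery)
    table_id: a "project.dataset.table" string
    destination_uri: a gs:// URI; a "*" wildcard lets BigQuery shard large exports
    """
    extract_job = bigquery_client.extract_table(
        table_id,
        destination_uri,
        location=location,  # should match the dataset's location when given
    )
    extract_job.result()  # waits for the job and raises if the export failed
    return destination_uri


# Hypothetical usage inside my_gcs_asset:
# export_table_to_gcs(
#     context.resources.bigquery,
#     "my-project.my_dataset.table_a",
#     "gs://my-bucket/table_a/part-*.csv",
# )
```

Because the export job runs on the BigQuery side, no rows pass through the Dagster process, which fits the constraint that the table may be too big for pandas. With this approach the GCS client may not be needed at all.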

BC A

03/23/2023, 6:39 PM
Thanks, I think the BigQuery client already has a method to move tables from BigQuery to GCS. I still have one more question though: how can I get metadata such as the project name/collection name/table name from my upstream dbt asset using this method? A hacky way would be to declare everything statically, but if there's another way I'll take it.

jamie

03/23/2023, 6:41 PM
you might be able to attach the dbt resource to the asset and inspect the configuration of the resource or get some of the info that way. i’m not super familiar with what you can get from the resource, but that’s one place to start
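A different route from inspecting the resource, offered only as a sketch: dbt writes a `target/manifest.json` artifact in which each model node has `database`, `schema`, and `name`/`alias` fields. With the BigQuery adapter, `database` is the project and `schema` is the dataset. The helper name `bigquery_table_id` and the sample manifest below are hypothetical, and this is not necessarily how the dbt resource exposes the same information.

```python
import json


def bigquery_table_id(manifest, model_name):
    """Return "project.dataset.table" for a dbt model found in a parsed manifest."""
    for node in manifest["nodes"].values():
        if node.get("resource_type") == "model" and node.get("name") == model_name:
            # alias overrides the model name as the table name when it is set
            table = node.get("alias") or node["name"]
            return f'{node["database"]}.{node["schema"]}.{table}'
    raise KeyError(f"model {model_name!r} not found in manifest")


# Hypothetical, heavily trimmed manifest; a real one would be loaded with
# json.load(open("target/manifest.json")) from the dbt project directory.
sample_manifest = json.loads("""
{
  "nodes": {
    "model.my_project.table_a": {
      "resource_type": "model",
      "name": "table_a",
      "alias": "table_a",
      "database": "my-project",
      "schema": "analytics"
    }
  }
}
""")

print(bigquery_table_id(sample_manifest, "table_a"))  # my-project.analytics.table_a
```

The returned string is in the `project.dataset.table` form that the BigQuery client accepts as a table ID, so it avoids hard-coding those names in the downstream asset.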

BC A

03/24/2023, 1:55 PM
I actually found a way without using `non_argument_deps`, by simply implementing a custom IO manager using the BigQuery client. Thanks for your help!