BC A
03/22/2023, 4:59 PM
jamie
03/22/2023, 8:33 PM
BC A
03/22/2023, 8:35 PM
jamie
03/22/2023, 8:41 PM
03/22/2023, 8:41 PM@asest(
io_manager_key="bigquery"
)
def my_bigquery_asset() -> pd.DataFrame:
# this return is stored using the bigquery io manager
return pd.DataFrame({"foo": [1, 2, 3]})
@asset(
io_manager_key="gcs"
)
def my_gcs_asset(my_bigquery_asset)-> pd.DataFrame:
# my_bigquery_asset is loaded using the bigquery io manager
# this return is stored using the gcs io manager
return my_bigquery_asset + 1
defs = Definitions(
assets=[my_bigquery_asset, my_gcs_asset],
resources={
"bigquery": bigquery_pandas_io_manager,
"gcs": gcs_pickle_io_manager
}
)
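To make the routing idea above concrete without a real Dagster install, here is a toy sketch of how an `io_manager_key` picks which IO manager stores each asset's output and loads each input. `DictIOManager` and the `materialize` helper are illustrative stand-ins, not Dagster APIs.

```python
class DictIOManager:
    """Toy IO manager: stores asset outputs in an in-memory dict."""
    def __init__(self):
        self.storage = {}

    def handle_output(self, asset_name, value):
        self.storage[asset_name] = value

    def load_input(self, asset_name):
        return self.storage[asset_name]


def materialize(assets, resources):
    """Toy materializer: runs (name, io_manager_key, fn, deps) tuples in
    order, storing each output with the IO manager named by its key."""
    for name, key, fn, deps in assets:
        # inputs are loaded by the IO manager of the *upstream* asset
        inputs = [resources[dep_key].load_input(dep) for dep, dep_key in deps]
        resources[key].handle_output(name, fn(*inputs))


resources = {"bigquery": DictIOManager(), "gcs": DictIOManager()}
assets = [
    ("my_bigquery_asset", "bigquery", lambda: [1, 2, 3], []),
    ("my_gcs_asset", "gcs", lambda xs: [x + 1 for x in xs],
     [("my_bigquery_asset", "bigquery")]),
]
materialize(assets, resources)
```

The point of the sketch is that each asset only names a key; the `Definitions`-level mapping decides which storage backend that key resolves to.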
BC A
03/22/2023, 8:52 PM
jamie
03/23/2023, 1:52 PM
dbt_assets = ...  # code to load the dbt assets
@asset(
    required_resource_keys={"bigquery", "gcs"},
    non_argument_deps={"name_of_the_dbt_asset"},
)
def my_gcs_asset(context) -> None:
    bigquery = context.resources.bigquery  # this will be a google.cloud.bigquery.Client()
    gcs = context.resources.gcs  # this will be a google.cloud.storage.Client()
    # code using the bigquery and gcs clients to move the dbt table from bigquery to gcs

defs = Definitions(
    assets=[my_gcs_asset, *dbt_assets],
    resources={
        "bigquery": bigquery_resource,
        "gcs": gcs_resource,
    },
)
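The snippet above leaves the actual copy step as a comment. One way to fill it in, sketched with the google-cloud-bigquery client's `extract_table` job (the project, dataset, table, and bucket names here are hypothetical placeholders):

```python
def gcs_destination_uri(bucket: str, table: str) -> str:
    """Build the gs:// URI that the extract job will write to."""
    return f"gs://{bucket}/{table}.csv"


def export_table_to_gcs(bigquery_client, project, dataset, table, bucket):
    """Run a BigQuery extract job that writes the table to GCS as CSV.

    bigquery_client is expected to be a google.cloud.bigquery.Client.
    The extract job copies server-side, so the table is never loaded
    into the asset's memory.
    """
    table_ref = f"{project}.{dataset}.{table}"
    extract_job = bigquery_client.extract_table(
        table_ref,
        gcs_destination_uri(bucket, table),
    )
    extract_job.result()  # block until the export finishes
```

Doing the copy as an extract job (rather than reading the table into a DataFrame and re-uploading it) is what makes the `non_argument_deps` pattern attractive here.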
The non_argument_deps will make my_gcs_asset depend on name_of_the_dbt_asset, but it will not load the table into memory. You can then use the two resources to move the table.
BC A
03/23/2023, 6:39 PM
jamie
03/23/2023, 6:41 PM
BC A
03/24/2023, 1:55 PM
non_argument_deps, by simply implementing a custom IO manager using the BigQuery client.
Thanks for your help!
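The custom IO manager the last message describes might look something like the sketch below, written against a duck-typed client so the shape is visible without Dagster or GCP installed. The class name and the `dataset.asset_name` table-naming scheme are assumptions, not what BC A actually wrote.

```python
class BigQueryIOManager:
    """Stores each asset's DataFrame as a BigQuery table and loads it back."""

    def __init__(self, client, dataset: str):
        self.client = client  # e.g. a google.cloud.bigquery.Client
        self.dataset = dataset

    def _table_id(self, asset_name: str) -> str:
        return f"{self.dataset}.{asset_name}"

    def handle_output(self, asset_name, dataframe):
        # real client call: load_table_from_dataframe returns a load job
        job = self.client.load_table_from_dataframe(
            dataframe, self._table_id(asset_name)
        )
        job.result()  # wait for the load job to finish

    def load_input(self, asset_name):
        # real client call: query(...) returns a job with .to_dataframe()
        sql = f"SELECT * FROM `{self._table_id(asset_name)}`"
        return self.client.query(sql).to_dataframe()
```

With this in place, downstream assets that declare the upstream asset as an argument get the table loaded for them, which is exactly the behavior `non_argument_deps` was being used to avoid; the two approaches solve different halves of the same problem.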