MikeVL
02/06/2023, 9:32 PM@io_manager(
config_schema={
"credentials": StringSource,
"project_id": StringSource,
"dataset_id": Field(
str, default_value="my_dataset", description="Dataset ID. Defaults to 'my_dataset'"
),
}
)
def bigquery_pandas_io_manager(init_context: InitResourceContext) -> BigQueryDataframeIOManager:
return BigQueryDataframeIOManager(
credentials=json.loads(init_context.resource_config["credentials"]),
project_id=init_context.resource_config["project_id"],
dataset_id=init_context.resource_config["dataset_id"],
)
From:
https://github.com/dagster-io/dagster/blob/78df951f1f0d3934a5ca3ea34a84036f31d3525c/examples/quickstart_gcp/quickstart_gcp/io_managers.py@asset(
io_manager_key="bq_io_manager",
required_resource_keys={"bq_auth"},
ins = {key: AssetIn(key) for key in asset_keys},
partitions_def=DailyPartitionsDefinition(start_date="2022-10-05"),
key_prefix="snocountry"
)
def resort_raw(context, **resort_assets) -> pd.DataFrame:
"""Insert clean resort records to BQ"""
Sean Lopp
02/07/2023, 4:07 PMdataset_table = f"{context.asset_key.path[-2]"}.{context.asset_key.path[-1]}"
MikeVL
02/07/2023, 4:16 PM