Kazushi Nagayama
08/09/2023, 6:00 AM
Vinnie
08/09/2023, 6:08 AM
Kazushi Nagayama
08/09/2023, 6:10 AM
Vinnie
08/09/2023, 6:11 AM
Kazushi Nagayama
08/09/2023, 6:14 AM
Vinnie
08/09/2023, 6:16 AM
Kazushi Nagayama
08/09/2023, 6:17 AM
Vinnie
08/09/2023, 6:18 AM
Kazushi Nagayama
08/09/2023, 6:20 AM
jamie
08/09/2023, 3:14 PM
BigQueryResource (just a configurable wrapper around a BigQuery client) and set up asset dependencies using deps:
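```python
from dagster import asset
from dagster_gcp import BigQueryResource


@asset
def orders(bigquery: BigQueryResource) -> None:
    sql = "SOME SQL THAT CREATES A TABLE NAMED ORDERS"
    with bigquery.get_client() as client:
        # .result() blocks until the BigQuery job finishes, so failures raise here
        # and the downstream asset doesn't start before ORDERS exists
        client.query(sql).result()


# Renamed from `orders`: two assets can't share the same name.
# deps= only orders execution; no data is passed from `orders`.
@asset(deps=[orders])
def returned_orders(bigquery: BigQueryResource) -> None:
    with bigquery.get_client() as client:
        client.query("SELECT * FROM ORDERS WHERE returned='True'").result()
```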
jamie
08/09/2023, 3:15 PM
quickstart-gcp example you see a custom BQ IO manager? I thought we changed that a while ago to use the built-in Dagster BQ IO manager
Kazushi Nagayama
08/09/2023, 3:29 PM
jamie
08/09/2023, 3:36 PM
quickstart-gcp
so I was looking at the wrong files
Kazushi Nagayama
08/09/2023, 4:12 PM
Kazushi Nagayama
08/09/2023, 4:13 PM
```python
from dagster import job, op
from dagster_gcp import BigQueryResource


def execute_query_from_file(bigquery: BigQueryResource, file: str) -> bool:
    # Read the SQL from disk and run it on BigQuery
    with bigquery.get_client() as client, open(file, "r") as f:
        query = f.read()
        # .result() waits for the job to finish (and raises on failure)
        # before the next op starts
        client.query(query).result()
    return True


@op
def generate_processed_logs(bigquery: BigQueryResource) -> bool:
    return execute_query_from_file(bigquery=bigquery, file="queries/processed_logs.sql")


@op
def generate_annotated_logs(bigquery: BigQueryResource, previous_step: bool) -> bool:
    # previous_step is unused; it only makes this op wait for generate_processed_logs
    return execute_query_from_file(bigquery=bigquery, file="queries/annotated_logs.sql")


@job
def process_and_annotate_logs() -> None:
    generate_annotated_logs(previous_step=generate_processed_logs())
```
Kazushi Nagayama
08/09/2023, 4:15 PM
Kazushi Nagayama
08/09/2023, 4:15 PM
jamie
08/09/2023, 4:17 PM
Kazushi Nagayama
08/09/2023, 4:32 PM