Chris Hansen
08/04/2022, 7:12 PM
`bq_op_for_queries` is what i want to use, but the docs lost me at "Expects a BQ client to be provisioned in resources as `context.resources.bigquery`."
owen
08/04/2022, 8:11 PM
You'd add `dagster_gcp.bigquery_resource` to your job when setting your `resource_defs`, e.g.
```python
from dagster import job
from dagster_gcp import bigquery_resource, bq_op_for_queries

my_op = bq_op_for_queries(["select 1", "select * from foo"])

# The op finds the client at context.resources.bigquery
@job(resource_defs={"bigquery": bigquery_resource})
def my_job():
    my_op()
```
Chris Hansen
08/04/2022, 8:14 PM
owen
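08/04/2022, 8:18 PM
```python
from dagster import job
from dagster_gcp import bigquery_resource, bq_op_for_queries

my_op = bq_op_for_queries(["select 1", "select * from foo"])

# Same job, but the BigQuery client is created with an explicit project/location
@job(
    resource_defs={
        "bigquery": bigquery_resource.configured({"project": "xyz", "location": "abc"})
    }
)
def my_job():
    my_op()
```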
not super familiar with gcp, but it looks like it's a thin wrapper over the `bigquery.Client` API (https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/libraries/dagster-gcp/dagster_gcp/bigquery/resources.py), so if you want to set settings other than `project` or `location`, I think you could just make your own custom resource with a more permissive config schema.
Chris Hansen
08/04/2022, 8:20 PM
does `bq_op_for_queries` use that client? `_bq_core_command`. i think that’s where i’d want to specify the write behavior of the query:
```python
query_job_config = _preprocess_config(context.op_config.get("query_job_config", {}))
```
`context.op_config.get("query_job_config", {})`?
owen
08/04/2022, 8:24 PM
```python
my_op = bq_op_for_queries(["select 1", "select * from foo"]).configured({"dictionary": "of config"}, name="name_for_op")
```
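For the write-behavior question, here is a minimal sketch of a config dictionary that could take the place of the `{"dictionary": "of config"}` placeholder. It assumes, based on the `context.op_config.get("query_job_config", {})` line quoted above, that the op reads a top-level `query_job_config` key, and that the inner keys mirror BigQuery `QueryJobConfig` attribute names (`destination`, `write_disposition`, `create_disposition`). Check the dagster-gcp config schema for your version before relying on it. The helper `make_query_job_config` is hypothetical.

```python
from typing import Any, Dict

# BigQuery's documented disposition values.
WRITE_DISPOSITIONS = {"WRITE_APPEND", "WRITE_TRUNCATE", "WRITE_EMPTY"}
CREATE_DISPOSITIONS = {"CREATE_IF_NEEDED", "CREATE_NEVER"}


def make_query_job_config(
    destination: str,
    write_disposition: str = "WRITE_APPEND",
    create_disposition: str = "CREATE_IF_NEEDED",
) -> Dict[str, Any]:
    """Build op config for a (hypothetical) bq_op_for_queries(...).configured(...) call.

    Assumption: the op reads a top-level "query_job_config" key whose inner
    keys mirror google.cloud.bigquery.QueryJobConfig attribute names.
    """
    if write_disposition not in WRITE_DISPOSITIONS:
        raise ValueError(f"unknown write_disposition: {write_disposition!r}")
    if create_disposition not in CREATE_DISPOSITIONS:
        raise ValueError(f"unknown create_disposition: {create_disposition!r}")
    return {
        "query_job_config": {
            "destination": destination,  # fully qualified "project.dataset.table"
            "write_disposition": write_disposition,
            "create_disposition": create_disposition,
        }
    }


config = make_query_job_config("my_project.my_dataset.my_table", "WRITE_TRUNCATE")
print(config)
```

With dagster-gcp installed, `config` would then be passed where the placeholder is, e.g. `bq_op_for_queries([...]).configured(config, name="name_for_op")`. Validating the disposition strings up front catches typos before a run is launched.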
Chris Hansen
08/04/2022, 8:29 PM
owen
08/04/2022, 8:29 PM
Chris Hansen
08/04/2022, 10:12 PM
owen
08/04/2022, 10:17 PM
```python
# ...
from dagster import AssetKey, AssetsDefinition
from dagster_gcp import bq_op_for_queries

my_op = bq_op_for_queries(["select 1"]).configured({"dictionary": "of config"}, name="name_for_op")
# Map the op's "result" output to an asset key
my_bq_asset = AssetsDefinition.from_op(
    my_op,
    keys_by_output_name={"result": AssetKey(["my_schema", "my_table"])},
)
```
and instead of passing `resource_defs` to the `@job` decorator, you'll call `with_resources`: https://docs.dagster.io/concepts/resources#providing-resources-to-software-defined-assets
Chris Hansen
08/04/2022, 10:20 PM
owen
08/04/2022, 10:22 PM
`{"kind": "sql"}`, so there will be a little pill on the bottom of the asset that says "sql" on it. If you click on the asset, I believe it'll reference the fact that it uses a resource named "bigquery" somewhere in the sidebar.
Chris Hansen
08/04/2022, 10:22 PM
owen
08/04/2022, 10:23 PM
`.from_op()`, e.g. `AssetsDefinition.from_op(..., metadata_by_output_name={"result": {"database": "bigquery", "schema": "cool schema"}})`
Chris Hansen
08/04/2022, 11:34 PM
owen
08/04/2022, 11:37 PM
Chris Hansen
08/04/2022, 11:47 PM
owen
08/05/2022, 12:49 AM
DAGSTER_CLOUD_DEPLOYMENT_NAME
DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT
On branch deployments, the following env vars are set (and I'm told all deployments will get these vars set soon):
DAGSTER_CLOUD_GIT_SHA
DAGSTER_CLOUD_GIT_TIMESTAMP
DAGSTER_CLOUD_GIT_AUTHOR_EMAIL
DAGSTER_CLOUD_GIT_AUTHOR_NAME
DAGSTER_CLOUD_GIT_MESSAGE
DAGSTER_CLOUD_GIT_BRANCH
DAGSTER_CLOUD_GIT_REPO
DAGSTER_CLOUD_PULL_REQUEST_ID
DAGSTER_CLOUD_PULL_REQUEST_STATUS
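A minimal sketch of how code might branch on these variables, for example to send BigQuery writes from branch deployments to a scratch dataset. The helpers, the dataset names, and the assumption that `DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT` holds a truthy string such as `"1"` when set are all hypothetical; check the Dagster Cloud docs for the exact values.

```python
import os
from typing import Mapping, Optional


def is_branch_deployment(env: Optional[Mapping[str, str]] = None) -> bool:
    """Assumption: the variable is set to a truthy string like "1" on branch deployments."""
    env = os.environ if env is None else env
    return env.get("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "").strip().lower() in {"1", "true"}


def pick_dataset(env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical policy: branch deployments write to a per-PR scratch dataset."""
    env = os.environ if env is None else env
    if is_branch_deployment(env):
        pr_id = env.get("DAGSTER_CLOUD_PULL_REQUEST_ID", "unknown")
        return f"scratch_pr_{pr_id}"
    return "prod_dataset"


print(pick_dataset())
```

Taking the environment as an optional mapping keeps the helpers easy to exercise locally without setting real environment variables.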
Chris Hansen
08/05/2022, 1:32 AM
owen
08/05/2022, 4:02 PM
Chris Hansen
08/05/2022, 4:30 PM
owen
08/05/2022, 4:30 PM
`context.partition_key`
Chris Hansen
08/05/2022, 4:31 PM
owen
08/05/2022, 4:32 PM
Chris Hansen
08/05/2022, 4:32 PM
owen
08/05/2022, 4:34 PM
Chris Hansen
08/05/2022, 8:20 PM
`bq_op_for_queries`?
owen
08/05/2022, 8:23 PM
Chris Hansen
08/05/2022, 8:27 PM
owen
08/05/2022, 8:31 PM
Chris Hansen
08/05/2022, 8:32 PM
`<project>.<dataset>.<table>`, so i was thinking `AssetKey([<project>, <dataset>, <table>])` would make sense. but the `@asset` decorator doesn’t have an asset key parameter
owen
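08/05/2022, 8:33 PM
`namespace` is deprecated, but you'd basically just do either:
```python
from dagster import asset

# my_project / my_dataset / my_table stand in for <project> / <dataset> / <table>
@asset(key_prefix=["my_project", "my_dataset"])
def my_table():  # the function name becomes the last part of the key
    ...  # do stuff
```
or
```python
@asset(name="my_table", key_prefix=["my_project", "my_dataset"])
def some_fn_name():
    ...  # do stuff
```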
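Going from a BigQuery table ID to those `name`/`key_prefix` arguments is just a split on `.`. A minimal sketch, assuming the plain three-part `project.dataset.table` form (no legacy `domain.com:project` IDs); the helper names are hypothetical. Joining the full key path with `/` gives the `"project/dataset/table"` string form that comes up for `selection` later in this thread.

```python
from typing import List, Tuple


def split_table_id(table_id: str) -> Tuple[List[str], str]:
    """Split 'project.dataset.table' into (key_prefix, name) for @asset(...)."""
    parts = table_id.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected 'project.dataset.table', got {table_id!r}")
    project, dataset, table = parts
    return [project, dataset], table


def selection_string(table_id: str) -> str:
    """Join the full key path with '/' (the terser selection format)."""
    key_prefix, name = split_table_id(table_id)
    return "/".join(key_prefix + [name])


key_prefix, name = split_table_id("my_project.my_dataset.my_table")
print(key_prefix, name)
print(selection_string("my_project.my_dataset.my_table"))
```

With Dagster installed, the pair would feed `@asset(name=name, key_prefix=key_prefix)`, so the asset key lines up with the table it writes.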
Chris Hansen
08/05/2022, 8:35 PM
`define_asset_job`, which requires a `selection` parameter. how do i specify an AssetKey there? or is there a better way to go from asset to job?
owen
08/05/2022, 8:46 PM
`selection=AssetSelection.keys(AssetKey([...]))`, or you can do `selection="project/dataset/table"` for a terser format.
Chris Hansen
08/05/2022, 8:47 PM
`/` is the delimiter?
owen
08/05/2022, 8:47 PM
Chris Hansen
08/05/2022, 8:47 PM
owen
08/05/2022, 9:25 PM
`fs_io_manager`, which adds a bit of metadata to each output it processes, indicating the local filepath that the asset is stored in.
Chris Hansen
08/05/2022, 9:55 PM
owen
08/05/2022, 9:57 PM
are you using `bq_op_for_queries`? if so, what query(ies) are you running? are they essentially insert statements?
Chris Hansen
08/05/2022, 9:59 PM
`context.partition_key`, i’m using `bigquery.Client` directly in an `@asset` method
owen
08/05/2022, 10:04 PM
```python
from dagster import asset

@asset
def my_asset(context) -> None:
    ...  # run the BigQuery job here
```
and just not returning anything.
Chris Hansen
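08/05/2022, 10:07 PM
```python
from dagster import DailyPartitionsDefinition, OpExecutionContext, asset
from google.cloud import bigquery
from google.cloud.bigquery import QueryJobConfig


@asset(
    name="simple_bq_output_3",
    key_prefix=["project", "dataset"],
    partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"),
    required_resource_keys={"bigquery"},
    group_name="project",
)
def simple_bq_output(context: OpExecutionContext):
    bq: bigquery.Client = context.resources.bigquery
    query = """
    ...
    """.format(partition_key=context.partition_key)
    context.log.info("Running query:\n" + query)
    # Append this partition's results to the destination table, creating it if needed
    query_job = bq.query(
        query,
        job_config=QueryJobConfig(
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_APPEND",
            destination="project.dataset.simple_bq_output_3",
        ),
    )
    # total_rows is the size of the query's result set; num_results only counts
    # rows fetched so far by iterating, which is still 0 at this point
    context.log.info(query_job.result().total_rows)
```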
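Because the query text above is built with `str.format`, one cheap safeguard is to confirm the partition key really is a date before interpolating it. A minimal sketch, assuming daily partition keys are `YYYY-MM-DD` strings (the same format as `start_date` above); `render_daily_query`, `source_table`, and `event_ts` are hypothetical. BigQuery's parameterized queries would be the more robust alternative to string formatting.

```python
from datetime import date


def render_daily_query(template: str, partition_key: str) -> str:
    """Fill {partition_key} in a SQL template after validating the key as a date.

    Assumes daily partition keys look like 'YYYY-MM-DD'. Any literal braces in
    the SQL must be doubled ('{{' / '}}') because str.format is used.
    """
    day = date.fromisoformat(partition_key)  # raises ValueError for non-dates
    return template.format(partition_key=day.isoformat())


template = """
SELECT *
FROM `project.dataset.source_table`
WHERE DATE(event_ts) = '{partition_key}'
"""
print(render_daily_query(template, "2022-01-01"))
```

Inside the asset, the call would be `render_daily_query(template, context.partition_key)`, so a malformed key fails the run before any query is sent.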
owen
08/05/2022, 10:13 PM
`None` (pickled) in that filepath. You can stop it from doing this by giving the output the `None` type (so Dagster knows you never want to return anything):
```python
def simple_bq_output(context: OpExecutionContext) -> None:
    ...
```
or by setting the output type to `Nothing`, e.g.
```python
from dagster import Nothing, asset

@asset(dagster_type=Nothing)  # ...along with your other @asset arguments
def simple_bq_output(context):
    ...
```
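To see why a file shows up even though the asset returns nothing, here is a toy model of a filesystem IO manager: it pickles whatever the function returns, `None` included, to a path built from the asset key, and stores nothing when the output is declared as producing nothing. This is a simplified stand-in written for illustration (the class `ToyFsIOManager` and its `skip_storage` flag are hypothetical), not Dagster's `fs_io_manager` code.

```python
import os
import pickle
import tempfile
from typing import Any, List


class ToyFsIOManager:
    """Toy model (not Dagster's implementation): store each output as a pickle file."""

    def __init__(self, base_dir: str) -> None:
        self.base_dir = base_dir

    def path_for(self, asset_key: List[str]) -> str:
        # One path component per asset-key part, e.g. base/project/dataset/table
        return os.path.join(self.base_dir, *asset_key)

    def handle_output(self, asset_key: List[str], value: Any, skip_storage: bool = False) -> None:
        # skip_storage stands in for "the output is typed as None/Nothing"
        if skip_storage:
            return
        path = self.path_for(asset_key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(value, f)  # None is pickled like any other value


with tempfile.TemporaryDirectory() as tmp:
    manager = ToyFsIOManager(tmp)
    key = ["project", "dataset", "simple_bq_output_3"]
    manager.handle_output(key, None)  # the asset "returned nothing"
    print(os.path.exists(manager.path_for(key)))  # prints True: a file was still written
```

The point of the model is that storage is driven by the declared output type, not by whether the function body happens to return a value.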
Chris Hansen
08/05/2022, 10:14 PM