# ask-community

Chris Hansen

08/04/2022, 7:12 PM
does anyone have a simple example of a job that runs a bigquery query? i imagine `bq_op_for_queries` is what i want to use, but the docs lost me at "Expects a BQ client to be provisioned in resources as `context.resources.bigquery`."

owen

08/04/2022, 8:11 PM
hi @Chris Hansen! That's not a great docstring 😬. It basically just means that, when you're setting up your job, you'll need to supply a `dagster_gcp.bigquery_resource` to your job when setting your resource_defs, i.e.
```python
from dagster import job
from dagster_gcp import bigquery_resource, bq_op_for_queries

my_op = bq_op_for_queries(["select 1", "select * from foo"])

@job(resource_defs={"bigquery": bigquery_resource})
def my_job():
    my_op()
```

Chris Hansen

08/04/2022, 8:14 PM
ah got it! and do i need to configure that bigquery resource somewhere?
and set options like destination table, write disposition, etc

owen

08/04/2022, 8:18 PM
yeah -- with that code example, you'd need to configure the bq resource when kicking off a run of the job (i.e. in the Launchpad UI), but you can also configure it right there:
```python
@job(
    resource_defs={
        "bigquery": bigquery_resource.configured(
            {
                "project": "xyz",
                "location": "abc",
            }
        )
    }
)
def my_job():
    my_op()
```
not super familiar with gcp, but it looks like it's a thin wrapper over the bigquery.Client API (https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/libraries/dagster-gcp/dagster_gcp/bigquery/resources.py), so if you want to set settings other than project or location, I think you could just make your own custom resource with a more permissive config schema
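For reference, a minimal sketch of such a custom resource (the name is hypothetical, and it assumes config keys map one-to-one onto `bigquery.Client`'s constructor arguments):

```python
from dagster import Permissive, resource
from google.cloud import bigquery

# Hypothetical custom resource: Permissive() accepts arbitrary config keys,
# which are forwarded directly to bigquery.Client, so each key must match a
# Client constructor argument (project, location, credentials, ...).
@resource(config_schema=Permissive())
def my_bigquery_resource(init_context):
    return bigquery.Client(**init_context.resource_config)
```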

Chris Hansen

08/04/2022, 8:20 PM
where does `bq_op_for_queries` use that client?
nvm i see it in `_bq_core_command`. i think that’s where i’d want to specify the write behavior of the query
it comes from:
`query_job_config = _preprocess_config(context.op_config.get("query_job_config", {}))`
how do i define what’s in `context.op_config.get("query_job_config", {})`?

owen

08/04/2022, 8:24 PM
you can do that in a few different ways, but if you don't think you'll ever want to change that config, you can do
```python
my_op = bq_op_for_queries(["select 1", "select * from foo"]).configured(
    {"dictionary": "of config"}, name="name_for_op"
)
```

Chris Hansen

08/04/2022, 8:29 PM
amazing. thanks!

owen

08/04/2022, 8:29 PM
no problem 🙂

Chris Hansen

08/04/2022, 10:12 PM
so my job writes to a bigquery table. i imagine that should be included as an asset. how does that change the code above?

owen

08/04/2022, 10:17 PM
by asset, do you mean software-defined asset? If so, you can do something like:
```python
...
from dagster import AssetKey, AssetsDefinition

my_op = bq_op_for_queries(["select 1"]).configured(
    {"dictionary": "of config"}, name="name_for_op"
)

my_bq_asset = AssetsDefinition.from_op(
    my_op,
    keys_by_output_name={"result": AssetKey(["my_schema", "my_table"])},
)
```
oh and then instead of providing the resource to the `@job` decorator, you'll call with_resources: https://docs.dagster.io/concepts/resources#providing-resources-to-software-defined-assets
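A minimal sketch of that call (the config values are placeholders):

```python
from dagster import with_resources
from dagster_gcp import bigquery_resource

# Bind the bigquery resource to the asset instead of to a @job.
my_bq_asset_with_resources = with_resources(
    [my_bq_asset],
    {"bigquery": bigquery_resource.configured({"project": "xyz"})},
)
```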

Chris Hansen

08/04/2022, 10:20 PM
oo nice, thank you! does it being a BQ table affect how it's reflected in the UI?
i guess the op returns a dataframe

owen

08/04/2022, 10:22 PM
looks like the op that's generated has a tag of `{"kind": "sql"}`, so there will be a little pill on the bottom of the asset that says "sql" on it. If you click on the asset, I believe it'll reference the fact that it uses a resource named "bigquery" somewhere in the sidebar

Chris Hansen

08/04/2022, 10:22 PM
awesome, i'll give it a shot. thanks!

owen

08/04/2022, 10:23 PM
oh and you can also add extra custom metadata when calling `.from_op()`, i.e. `AssetsDefinition.from_op(..., metadata_by_output_name={"result": {"database": "bigquery", "schema": "cool schema"}})` (that will also show up in the sidebar)

Chris Hansen

08/04/2022, 11:34 PM
is there a way to change the asset definition depending on the deployment? for example, for a branch deployment, i want to write to a staging table and have the asset point to that.

owen

08/04/2022, 11:37 PM
I think the most straightforward way of doing this would be to change the config you pass to the op based on an environment variable (I believe branch deployments set an env var somewhere? not 100% sure)

Chris Hansen

08/04/2022, 11:47 PM
if not, could just create one in the agent
unless the agent isn't what executes that part of the code

owen

08/05/2022, 12:49 AM
On all deployments, the following env vars are set:
```
DAGSTER_CLOUD_DEPLOYMENT_NAME
DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT
```
On branch deployments, the following env vars are set (and I'm told all deployments will get these vars set soon):
```
DAGSTER_CLOUD_GIT_SHA
DAGSTER_CLOUD_GIT_TIMESTAMP
DAGSTER_CLOUD_GIT_AUTHOR_EMAIL
DAGSTER_CLOUD_GIT_AUTHOR_NAME
DAGSTER_CLOUD_GIT_MESSAGE
DAGSTER_CLOUD_GIT_BRANCH
DAGSTER_CLOUD_GIT_REPO
DAGSTER_CLOUD_PULL_REQUEST_ID
DAGSTER_CLOUD_PULL_REQUEST_STATUS
```
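A quick sketch of branching config on that flag (the exact truthy value of `DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT` is an assumption here, and the table names are placeholders):

```python
import os

# Assumption: the env var is set to "1" on branch deployments.
is_branch = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1"

# Write to a staging table on branch deployments, prod otherwise.
destination = (
    "my_project.my_dataset.staging_table"
    if is_branch
    else "my_project.my_dataset.prod_table"
)
```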

Chris Hansen

08/05/2022, 1:32 AM
ah great to know!
one more question. if i want to write to this BQ table daily and populate the date into a column, where can i get that date from?

owen

08/05/2022, 4:02 PM
Would you be appending to the end of the table (rather than rewriting it from scratch)? If so, I'd check out partitioned assets

Chris Hansen

08/05/2022, 4:30 PM
yes, exactly. is there a way to get the partition from the op context? for example if i want to populate a column with the date?

owen

08/05/2022, 4:30 PM
yep! `context.partition_key`

Chris Hansen

08/05/2022, 4:31 PM
does that exist for regular ops too?
non-assets

owen

08/05/2022, 4:32 PM
it does

Chris Hansen

08/05/2022, 4:32 PM
does it derive from the schedule?

owen

08/05/2022, 4:34 PM
yep, the key itself is automatically applied by the schedule 🙂. On the other hand, if you have a partitioned job (asset or otherwise), often the easiest thing to do is to derive the schedule from those partitions: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#schedules-from-partitioned-assets-and-jobs (i.e. if you have an asset that's partitioned by day, build a schedule to run it once a day)
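A minimal sketch of that pattern (the job name and asset selection are placeholders):

```python
from dagster import (
    DailyPartitionsDefinition,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

daily_partitions = DailyPartitionsDefinition(start_date="2022-01-01")

# Hypothetical job that materializes the daily-partitioned asset.
my_daily_job = define_asset_job(
    "my_daily_job",
    selection="project/dataset/table",
    partitions_def=daily_partitions,
)

# One run per day, with the matching partition key applied automatically.
my_daily_schedule = build_schedule_from_partitioned_job(my_daily_job)
```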

Chris Hansen

08/05/2022, 8:20 PM
how do i get at the context if i’m using a convenience op like `bq_op_for_queries`?

owen

08/05/2022, 8:23 PM
there's not really a great way to change what the op is doing once it's been created -- for situations like this where you have more customized requirements, I'd recommend just creating your own op based off of that one.
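A rough sketch of such a custom op (the query and names are hypothetical):

```python
from dagster import op

# Hypothetical replacement for bq_op_for_queries that can see the partition key.
@op(required_resource_keys={"bigquery"})
def my_partitioned_bq_op(context):
    query = f"select * from my_table where ds = '{context.partition_key}'"
    context.log.info("Running query:\n" + query)
    return context.resources.bigquery.query(query).result().to_dataframe()
```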

Chris Hansen

08/05/2022, 8:27 PM
all righty, thanks!
if i’m using the @asset, how would i define what would otherwise go in `AssetKey(["a", "b", "c"])`?

owen

08/05/2022, 8:31 PM
as in, how do you construct the asset key itself?

Chris Hansen

08/05/2022, 8:32 PM
in bigquery, fully qualified names have the format `<project>.<dataset>.<table>`, so i was thinking `AssetKey([<project>, <dataset>, <table>])` would make sense. but the @asset decorator doesn’t have an asset key parameter
just name, namespace, key_prefix

owen

08/05/2022, 8:33 PM
ah I see yeah -- `namespace` is deprecated, but you'd basically just do either:
```python
@asset(key_prefix=[<project>, <dataset>])
def <table>():
    # do stuff
```
or
```python
@asset(name=<table>, key_prefix=[<project>, <dataset>])
def some_fn_name():
    # do stuff
```

Chris Hansen

08/05/2022, 8:35 PM
ah, got it, thanks!
so now i’m going from asset to job, and using `define_asset_job`, which requires a `selection` parameter. how do i specify an AssetKey there? or is there a better way to go from asset to job?

owen

08/05/2022, 8:46 PM
You have a few different options, which may be more or less ergonomic depending on what set of assets you want to run. If you just want a job that refreshes a single asset, you can either do `selection=AssetSelection.keys(AssetKey([...]))` or, for a terser format, `selection="project/dataset/table"`.
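Both options as a sketch (the job names are placeholders):

```python
from dagster import AssetKey, AssetSelection, define_asset_job

# Option 1: an explicit AssetSelection built from the full key.
refresh_table_job = define_asset_job(
    "refresh_table_job",
    selection=AssetSelection.keys(AssetKey(["project", "dataset", "table"])),
)

# Option 2: the terser string form ("/" is the key delimiter).
refresh_table_job_terse = define_asset_job(
    "refresh_table_job_terse",
    selection="project/dataset/table",
)
```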

Chris Hansen

08/05/2022, 8:47 PM
ah, so `/` is the delimiter?

owen

08/05/2022, 8:47 PM
yep!

Chris Hansen

08/05/2022, 8:47 PM
good to know! thanks!
what is the path that shows in the UI when i materialize a partition? what does it refer to?

owen

08/05/2022, 9:25 PM
do you have a screenshot handy? my guess is that this is coming from the default `fs_io_manager`, which adds a bit of metadata to each output it processes, indicating the local filepath that the asset is stored in

Chris Hansen

08/05/2022, 9:55 PM
it’s a bigquery asset, so it lives in bigquery
o

owen

08/05/2022, 9:57 PM
is this still using `bq_op_for_queries`? if so, what query(ies) are you running? are they essentially insert statements?

Chris Hansen

08/05/2022, 9:59 PM
it is not. because i wanted access to `context.partition_key`, i’m using `bigquery.Client` directly in an @asset method

owen

08/05/2022, 10:04 PM
got it -- do you mind sharing your code? The basic answer, though, is that if you return a python object from your asset, dagster will assume that is the contents of your asset, and will need to store it somewhere. The IOManager is the thing that figures out how to convert that in-memory object into a persisted thing.

By default, the IOManager will pickle the object and store it on the local filesystem, but you can also write your own IOManager to do the storing/loading (i.e. maybe the output python object is an "insert ..." sql query, the IOManager's handle_output function runs that sql query, and the IOManager's load_input function runs a "SELECT *" against the table and returns a dataframe w/ the contents).

You can opt out of this IOManager system (and just handle the IO within the asset itself) by setting None as the return type of the function, i.e.
```python
@asset
def my_asset(...) -> None:
    # ...
```
and just not returning anything.
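For illustration, a rough sketch of that custom IOManager idea (all names here are hypothetical, and mapping the asset key path to a table name is just one possible convention):

```python
from dagster import IOManager, io_manager

class BigQuerySqlIOManager(IOManager):
    # Hypothetical: assets return an "insert ..." query string as their value.
    def __init__(self, client):
        self.client = client

    def handle_output(self, context, obj):
        # obj is expected to be an "insert ..." sql string; just run it.
        self.client.query(obj).result()

    def load_input(self, context):
        # Assumes the upstream asset key path maps to project.dataset.table.
        table = ".".join(context.upstream_output.asset_key.path)
        return self.client.query(f"select * from `{table}`").result().to_dataframe()

@io_manager(required_resource_keys={"bigquery"})
def bq_sql_io_manager(init_context):
    return BigQuerySqlIOManager(init_context.resources.bigquery)
```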

Chris Hansen

08/05/2022, 10:07 PM
```python
from dagster import DailyPartitionsDefinition, OpExecutionContext, asset
from google.cloud import bigquery
from google.cloud.bigquery import QueryJobConfig

@asset(
    name="simple_bq_output_3",
    key_prefix=["project", "dataset"],
    partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"),
    required_resource_keys={"bigquery"},
    group_name="project",
)
def simple_bq_output(context: OpExecutionContext):
    bq: bigquery.Client = context.resources.bigquery
    query = """
    ...
    """.format(partition_key=context.partition_key)
    context.log.info("Running query:\n" + query)
    query_job = bq.query(
        query,
        QueryJobConfig(
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_APPEND",
            destination="project.dataset.simple_bq_output_3",
        ),
    )
    context.log.info(query_job.result().num_results)
```
i don’t return anything but i log something at the end

owen

08/05/2022, 10:13 PM
got it -- I think it's literally storing the value `None` (pickled) in that filepath. You can stop it from doing this by giving the output the `None` type (so Dagster knows you never want to return anything):
```python
def simple_bq_output(context: OpExecutionContext) -> None:
```
or by setting the output type to `Nothing`, i.e.
```python
from dagster import Nothing, asset

@asset(..., dagster_type=Nothing)
```

Chris Hansen

08/05/2022, 10:14 PM
ah nice, i’ll try that, thanks!