# ask-community

Alec Ryan

04/21/2022, 4:22 PM
How can I pass a job run partition_key to another job?

johann

04/21/2022, 5:17 PM
One way to link jobs together like this is to use Asset sensors https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors

Alec Ryan

04/21/2022, 5:20 PM
Using this method, can I push the partition key to the next job?
How can I do this?

johann

04/21/2022, 5:49 PM
Hmm, asset sensors would let you trigger the next job, but I’m not sure about passing the key. cc @owen

owen

04/21/2022, 5:59 PM
I believe you can get that off of the upstream job's tags. Inside the asset sensor, this should be something like
asset_event.dagster_event.logging_tags.get("dagster/partition")
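For illustration, a minimal sketch of how that lookup might sit inside an asset sensor; the asset key, op, and job names below are placeholders rather than anything from this thread:
```python
from dagster import AssetKey, RunRequest, asset_sensor, job, op

# placeholder downstream op/job; only the tag lookup in the sensor is the part
# described above
@op(config_schema={"run_date": str})
def some_downstream_op(context):
    context.log.info(context.op_config["run_date"])

@job
def downstream_job():
    some_downstream_op()

@asset_sensor(asset_key=AssetKey("upstream_asset"), job=downstream_job)
def forward_partition_sensor(context, asset_event):
    # the upstream run's partition is recorded in the event's logging tags
    partition_key = asset_event.dagster_event.logging_tags.get("dagster/partition")
    yield RunRequest(
        run_key=context.cursor,
        run_config={"ops": {"some_downstream_op": {"config": {"run_date": partition_key}}}},
    )
```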

Alec Ryan

04/21/2022, 6:00 PM
thanks @owen, any documentation on this?

owen

04/21/2022, 6:05 PM
not at the moment (it seems), but there should be
@Dagster Bot docs document how to grab an upstream partition (and other useful tags) inside sensors

Dagster Bot

04/21/2022, 6:06 PM

Alec Ryan

04/21/2022, 6:06 PM
Something like this might work?
from dagster import AssetKey, RunRequest, asset_sensor

def create_dbt_sensor(key, job):
    @asset_sensor(
        asset_key=AssetKey(key),
        job=job
    )
    def dbt_sensor(context, asset_event):
        yield RunRequest(
            run_key=context.cursor,
            run_config={
                "ops": {
                    "read_materialization": {
                        "config": {
                            "asset_key": asset_event.dagster_event.asset_key.path,
                            "run_date" : asset_event.dagster_event.logging_tags.get("dagster/partition")
                        }
                    }
                }
            },
        )

    return dbt_sensor
I think maybe I need to change the "ops" section

owen

04/21/2022, 11:31 PM
hi @Alec Ryan , sorry for the slow response. I think the run config that you want to pass downstream depends on what behavior you want to vary with this partition information. for example, if your downstream job was another dbt job, you might set the resource config for the dbt_cli_resource to set a key in the vars argument to the upstream partition key. if the downstream job is just another partitioned job, you'd probably end up creating that run config via a partitioned config function: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#date-partitioned-job
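A rough sketch of that second approach (a date-partitioned downstream job built with a partitioned config function, per the linked docs); the op name and the `run_date` config field are made up for illustration:
```python
from dagster import daily_partitioned_config, job, op

@op(config_schema={"run_date": str})
def process_date(context):
    context.log.info(f"processing {context.op_config['run_date']}")

# builds run config for each daily partition, as in the partitioned-config docs
@daily_partitioned_config(start_date="2022-01-01")
def my_partitioned_config(start, _end):
    return {"ops": {"process_date": {"config": {"run_date": start.strftime("%Y-%m-%d")}}}}

@job(config=my_partitioned_config)
def downstream_partitioned_job():
    process_date()
```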

Alec Ryan

04/22/2022, 1:49 AM
Hey @owen, no worries. The run config that I want to pass downstream is from a partitioned asset to my dbt assets job. Ideally, it would be the run date that is used in my partition_job
I'm struggling to figure out how to pass that to a sensor

owen

04/22/2022, 4:43 PM
I guess I would frame it more as "grabbing the partition key from the upstream job" than "passing the partition from the upstream job to the sensor", but I might be misunderstanding. Is the basic flow "some partitioned asset job runs" -> have a sensor that detects the updated asset, and based on the partition of the asset that was updated, launch a dbt job with config that will do something for that partition? With that in mind, I now see that it's not really necessary to grab the partition key off the job's tags, as the partition key should also be present on the asset materialization event itself, but wherever you get the partition key from on the event shouldn't make a big difference. Seems to me like there are two parts of the sensor:
@asset_sensor(...)
def my_sensor(context, asset_event):
    asset_partition = ... # either logging tags thing or asset_event.dagster_event.partition, should be identical
    run_config = ... # some way of varying the behavior of your dbt job based on this value
    yield RunRequest(...)
One way of creating some run config that varies the behavior of the dbt job based on the partition is to just set a value in the resource config for the dbt_cli_resource, and another would be to do something like what you show above, where you set some op config values and read them inside the op. Was there something about that solution that didn't work? (I'm assuming read_materialization is a custom op you wrote)
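The resource-config variant could look roughly like this helper for building the RunRequest's run config; the `dbt` resource key and the `run_date` var name are assumptions, and the exact shape accepted by `vars` can differ across dagster-dbt versions:
```python
def build_dbt_run_config(asset_partition: str) -> dict:
    # hypothetical helper: vary the dbt job's behavior by setting a dbt var
    # through the dbt_cli_resource's config (assumes the resource key is "dbt")
    return {
        "resources": {
            "dbt": {
                "config": {
                    "vars": {"run_date": asset_partition},
                }
            }
        }
    }
```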

Alec Ryan

04/22/2022, 5:34 PM
Tbh, I haven't had a chance to test that method I shared yet. Read materialization is stolen from the hacker news asset example I think (left as a placeholder) lol
You're spot on with the flow
I think I understand now. Thanks for the info
I think I'm a bit confused about resource and run config. Can a run config parameterize a resource config?
Would I want to include the dbt_cli_resource in the actual sensor definition?

owen

04/22/2022, 10:15 PM
you can configure resources from the run config (example: https://docs.dagster.io/concepts/configuration/config-schema#passing-configuration-to-multiple-ops-in-a-job). you wouldn't need to include the actual dbt_cli_resource -- you could just keep the dbt_cli_resource unconfigured on the job you create, and supply the necessary configuration here.
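For example, a sketch of a job that attaches dbt_cli_resource without configuring it, assuming dagster-dbt's dbt_run_op and a resource key of "dbt"; the sensor's RunRequest would then supply resources.dbt.config at launch time:
```python
from dagster import job
from dagster_dbt import dbt_cli_resource, dbt_run_op

# the resource is attached but left unconfigured here; the run config supplied
# by the sensor's RunRequest fills in project_dir, profiles_dir, vars, etc.
@job(resource_defs={"dbt": dbt_cli_resource})
def dbt_downstream_job():
    dbt_run_op()
```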

Alec Ryan

04/23/2022, 12:59 PM
@owen this is what I'm attempting to run
which gets imported here in my repo.py

owen

04/25/2022, 4:06 PM
gotcha -- at a quick glance, this seems like it would work. any issues?

Alec Ryan

04/25/2022, 5:29 PM
This is the issue I'm running into now @owen https://dagster.slack.com/archives/C01U954MEER/p1650841077638899

owen

04/25/2022, 11:02 PM
oh I see -- because the second job doesn't produce a materialized asset (because the dbt SDA integration doesn't support partitioning at the moment), the materialization from dbt will not have that info available. one way to get this would be something like:
# query the Dagster instance for information about the upstream run
upstream_run = context.instance.get_run_by_id(asset_event.run_id)

# get the run config from that upstream run
upstream_run_config = upstream_run.run_config

# ... parse the run config to figure out the date
I'll admit, it's a bit ugly, but it should do the job
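Putting that together, a sketch of the whole sensor might look like the following; the asset key, op names, and the path into the upstream run config are placeholders and depend entirely on how the jobs are actually configured:
```python
from dagster import AssetKey, RunRequest, asset_sensor, job, op

@op(config_schema={"run_date": str})
def load_op(context):
    context.log.info(context.op_config["run_date"])

@job
def downstream_job():
    load_op()

@asset_sensor(asset_key=AssetKey("my_dbt_model"), job=downstream_job)
def dbt_model_sensor(context, asset_event):
    # query the Dagster instance for the run that emitted this materialization
    upstream_run = context.instance.get_run_by_id(asset_event.run_id)

    # dig the date back out of that run's config; this path is a placeholder
    run_date = (
        upstream_run.run_config.get("ops", {})
        .get("read_materialization", {})
        .get("config", {})
        .get("run_date")
    )

    yield RunRequest(
        run_key=context.cursor,
        run_config={"ops": {"load_op": {"config": {"run_date": run_date}}}},
    )
```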

Alec Ryan

04/26/2022, 12:14 AM
Any thoughts on something like a pipeline config? A variable that can be configured for all downstream assets.
I'll give the above a shot. I believe dbt partitioning is on the horizon, right?
Also, the second job in this case is an S3 > Snowflake COPY INTO. That would produce an asset, right?
@owen the suggested approach above works as expected. Thanks!

owen

04/26/2022, 4:17 PM
dbt partitioning is definitely something that we want to make possible, but I'd expect it to land more like "a couple months from now" than "a couple weeks from now". and yeah it makes sense to model that process as a software-defined asset (the snowflake table being the produced asset). personally, I'd try to combine the first two jobs into one (which produces two assets), but this pattern seems totally fine as well.
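A rough sketch of modeling both steps as software-defined assets in one place (names are illustrative, bodies omitted):
```python
from dagster import asset

@asset
def s3_export():
    # write the extract to S3 and return a reference to the object
    ...

@asset
def snowflake_table(s3_export):
    # COPY INTO the Snowflake table from the S3 object produced upstream
    ...
```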

Alec Ryan

04/26/2022, 4:37 PM
Awesome, good to know. I was considering doing that to be able to visualize the lineage