
Arun Kumar

09/26/2021, 9:42 AM
Hi team, I'm working on the migration to the new core API (graphs, jobs, ...). I could not figure out how to emit a RunRequest from a sensor to trigger a particular partition of a job. Could someone shed some light or point me to a doc? Here is the code I currently use to do this with partition sets:
partition = partition_set.get_partition(name=partition_date)
yield RunRequest(
    run_key=f"{source.name}",
    run_config=partition_set.run_config_for_partition(partition),
    tags={
        **partition_set.tags_for_partition(partition),
        "event_source": source.name,
    },
)
I have created a partitioned job instance like below
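from datetime import datetime
from dagster import daily_partitioned_config

@daily_partitioned_config(start_date="2020-01-01")
def daily_partitioned_run_config(start: datetime, _end: datetime):
    # Pass the partition's start date to the partition_config resource.
    return {
        "resources": {
            "partition_config": {
                "config": {"date": start.strftime(constants.PARTITION_DATE_FORMAT)}
            }
        }
    }

graph.to_job(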
    name=source.name,
    config=daily_partitioned_run_config,
    resource_defs={
        "partition_config": partition_config,
    },
)
@sandy do you know what the equivalent function is for
partition_set.tags_for_partition(partition)
with the new APIs?

prha

09/27/2021, 7:53 PM
@Arun Kumar thanks for highlighting some rough spots in our API. There’s not an elegant way to do this right now. We should make it easier to issue run requests for partitioned jobs.
@Dagster Bot issue enable easy way to build run requests from partitioned jobs

Dagster Bot

09/27/2021, 7:53 PM

prha

09/27/2021, 7:54 PM
cc @chris

Arun Kumar

09/27/2021, 7:58 PM
Thanks @prha for the response. Do we have any less elegant way to do this right now? 🙂

sandy

09/27/2021, 8:00 PM
@Arun Kumar in the meantime, you should be able to do something like:
from dagster import Partition, RunRequest
from dagster.core.definitions.time_window_partitions import TimeWindow

partition = Partition(value=TimeWindow(start=..., end=None), name="something")
run_config = daily_partitioned_run_config.run_config_for_partition_fn(partition)
yield RunRequest(run_config=run_config, ...)
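A rough sketch of how the pieces of the run request could be assembled for one partition date. It is stdlib-only so it runs without Dagster installed. `partition_run_config` mirrors the body of `daily_partitioned_run_config` from earlier in the thread, `PARTITION_DATE_FORMAT` is an assumed value standing in for `constants.PARTITION_DATE_FORMAT`, and `build_run_request_kwargs` is a hypothetical helper, not a Dagster API.

```python
from datetime import datetime
from typing import Any, Dict

# Assumed format; the thread only references constants.PARTITION_DATE_FORMAT.
PARTITION_DATE_FORMAT = "%Y-%m-%d"


def partition_run_config(start: datetime) -> Dict[str, Any]:
    """Mirror of the daily_partitioned_run_config body for one partition start."""
    return {
        "resources": {
            "partition_config": {
                "config": {"date": start.strftime(PARTITION_DATE_FORMAT)}
            }
        }
    }


def build_run_request_kwargs(source_name: str, partition_date: str) -> Dict[str, Any]:
    """Hypothetical helper: keyword arguments intended for RunRequest(**kwargs)."""
    start = datetime.strptime(partition_date, PARTITION_DATE_FORMAT)
    return {
        "run_key": source_name,  # same run key as the original sensor code
        "run_config": partition_run_config(start),
        "tags": {"event_source": source_name},  # partition tags get merged in separately
    }
```

Inside the sensor, the result would be used as `yield RunRequest(**build_run_request_kwargs(source.name, partition_date))`, with the partition tags merged into `tags` once they are available.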

Arun Kumar

09/27/2021, 8:02 PM
Thanks 🙏 How about the tags? What are the exact tags that I need to set to replace
partition_set.tags_for_partition(partition)
?

prha

09/27/2021, 8:02 PM
for tags you could do something like this:
my_job = graph.to_job(...)

tags = my_job.get_partition_set_def().tags_for_partition(partition)
Eventually, we’d like to deprecate partition sets altogether and put some of these methods directly on the job definition itself.
Note that this partition set exists only for jobs with partitioned config.
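A small sketch of the merge step from the original sensor code, where the partition tags are combined with the custom `event_source` tag. `sensor_tags` is a hypothetical helper, and the tag keys in the usage note are assumptions about what `tags_for_partition` returns; check them against the actual return value before relying on them.

```python
from typing import Dict


def sensor_tags(partition_tags: Dict[str, str], source_name: str) -> Dict[str, str]:
    """Hypothetical helper: merge partition tags with the sensor's own tag.

    partition_tags is expected to be the dict returned by
    my_job.get_partition_set_def().tags_for_partition(partition).
    The input dict is not mutated; event_source wins on a key clash.
    """
    return {**partition_tags, "event_source": source_name}
```

For example, `sensor_tags({"dagster/partition": "2021-09-26"}, "orders")` returns a dict with both the partition tag and `"event_source": "orders"`; the `dagster/partition` key here is only an assumed example of a partition tag.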

Arun Kumar

09/27/2021, 8:54 PM
You guys are great :clapping-all: Thanks for all the help, I will give it a try.