nickvazz
12/06/2022, 9:00 PM
assets?
import glob
import os

from dagster import asset, DynamicPartitionsDefinition


def get_partitions(_):
    return map(os.path.basename, glob.glob("/some/path/*"))


@asset(
    group_name='test_group',
    # partitions_def=DynamicPartitionsDefinition(get_partitions),  # this line makes it fail
)
def partitioned_asset(context):
    context.log.info()
owen
12/06/2022, 9:16 PM
nickvazz
12/06/2022, 9:21 PM
owen
12/06/2022, 9:24 PM
Reload Definitions button in the UI).
nickvazz
12/06/2022, 9:36 PM
owen
12/06/2022, 9:45 PM
every time your repository code is loaded, the newest version of that StaticPartitionsDefinition will be used. So basically every single dagster subprocess will have the most recent set of partitions at the time it executes.
the only case where you need to worry about the partitions being out of date with reality is in Dagit, which does not constantly reload the code (it loads the code once, then caches that representation of the objects in the repository). So it's only the UI that might be out of date. There is a programmatic way to reload the UI though (it essentially does the same thing as hitting "reload definitions"), via the dagster graphql client: https://docs.dagster.io/concepts/dagit/graphql-client#overview (reload_repository_location)
nickvazz
12/06/2022, 9:53 PM
"every time your repository code is loaded, the newest version of that StaticPartitionsDefinition will be used. So basically every single dagster subprocess will have the most recent set of partitions at the time it executes."
interesting! The way I was thinking of working was having a job that creates a directory (and a bunch of tasks that will get tossed to dask as futures within a single `op`/`asset`) and submits a run_request_for_partition afterwards that would kick off the job that runs for a partitioned asset using a run_status_sensor
Would this update the static partitions quickly enough in the following flow:
• create_directory_which_is_actually_the_partition job
• on success, a sensor ticks and submits a run
• the run is kicked off in a new process that is forced to reload the static partitions
• the asset run sees the new partition and does not fail?
owen
12/06/2022, 10:06 PM
some/path/ (which presumably all workers can access)
2. a run_status_sensor that waits for that job to complete, then submits a run request of a partitioned asset job for that new partition
I think the one issue here is that the run_status_sensor does not continuously reload the code location (it's a daemon process that only reloads the code once in a while, not after every tick). So calling run_request_for_partition within that process would sometimes work, sometimes fail, depending on whether a reload had happened between the job succeeding and the sensor ticking. Under the hood, if you look at what run_request_for_partition is doing, it's just returning a RunRequest with some specific properties set (important ones are tags and run config), but if you wanted you could write your own version of that function that didn't call partition_set.get_partition(...), as that's the part which will fail if an old partitions definition is still loaded and you try to get a new partition key
nickvazz
12/06/2022, 10:46 PM
RunRequest, looks like, as you said, getting the tags right will be the tricky part if the partition hasn't been updated within the sensor. Do you know how often it gets updated?
owen
12/06/2022, 10:48 PM
dagster/partition tag, which will have a value of the string representation of the partition that's being executed
nickvazz
12/06/2022, 10:49 PM
dagster/partition and dagster/partition_set
owen
12/06/2022, 10:49 PM
nickvazz
12/06/2022, 10:49 PM
owen
12/06/2022, 10:50 PM
nickvazz
12/06/2022, 10:52 PM
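For reference, a minimal sketch of the hand-built request owen describes in the thread: assembling the RunRequest arguments yourself instead of calling run_request_for_partition, so that partition_set.get_partition(...) is never consulted in a sensor process that may still hold an old partitions definition. The tag names dagster/partition and dagster/partition_set come from the thread; the helper name, the run_key scheme, and the example values are hypothetical, and whether these two tags alone are enough for your dagster version is an assumption worth checking.

```python
from typing import Any, Dict, Optional

# Tag names as mentioned in the thread.
PARTITION_TAG = "dagster/partition"
PARTITION_SET_TAG = "dagster/partition_set"


def build_partition_run_request_kwargs(
    partition_key: str,
    partition_set_name: str,
    run_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build keyword arguments for dagster.RunRequest by hand.

    Nothing here looks the key up in a partitions definition, so a stale
    definition in the sensor process cannot reject a brand-new key.
    """
    return {
        # Hypothetical run_key scheme: at most one run per partition key.
        "run_key": f"{partition_set_name}:{partition_key}",
        "run_config": run_config or {},
        "tags": {
            PARTITION_TAG: partition_key,
            PARTITION_SET_TAG: partition_set_name,
        },
    }


# Example values are placeholders, not names from the thread.
kwargs = build_partition_run_request_kwargs("new_directory", "my_job_partition_set")
print(kwargs["tags"])
```

Inside the sensor, the result would be passed along as RunRequest(**kwargs), since RunRequest accepts run_key, run_config, and tags. The partition set name is left as a parameter because the thread does not say what value it should take.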