Can I not use dynamic partitions with `assets` ? ...
# ask-community
n
Can I not use dynamic partitions with `assets`?
import glob
import os
from dagster import asset, DynamicPartitionsDefinition

def get_partitions(_):
    # Return a concrete list of keys rather than a lazy map iterator.
    return [os.path.basename(p) for p in glob.glob("/some/path/*")]
    

@asset(
    group_name='test_group',
    # partitions_def=DynamicPartitionsDefinition(get_partitions), # this line makes it fail
)
def partitioned_asset(context):
    context.log.info("materializing partitioned_asset")
o
hi @nickvazz! that's correct that dynamic partitions are not currently supported w/ assets (although these are on our roadmap and we're actively looking into supporting them -- it's a highly requested feature!)
n
Hi @owen is there a suggested direction to go in instead? Could I reload static partitions easily?
o
ah yeah sorry I should have mentioned -- creating a `StaticPartitionsDefinition` using a function will generally work. To reload Dagit's understanding of what partitions exist, you'd just need to reload the code location (that would be the "Reload Definitions" button in the UI).
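A minimal sketch of the workaround described above, assuming the partition keys are the entry names under a directory. `list_partition_keys` and `make_partitioned_asset` are hypothetical helper names, sorting the keys is an added choice to keep the order stable, and Dagster is imported inside the factory only so that the listing helper can be used on its own.

```python
import glob
import os


def list_partition_keys(root):
    """Return the names of the entries directly under `root`, sorted."""
    return sorted(os.path.basename(p) for p in glob.glob(os.path.join(root, "*")))


def make_partitioned_asset(root):
    """Build an asset whose partitions mirror the entries under `root`.

    The keys are computed when this function runs, i.e. each time the
    code location is loaded, not each time the asset executes.
    """
    # Imported here (an assumption of this sketch) so the helper above
    # does not require Dagster.
    from dagster import StaticPartitionsDefinition, asset

    partitions_def = StaticPartitionsDefinition(list_partition_keys(root))

    @asset(group_name="test_group", partitions_def=partitions_def)
    def partitioned_asset(context):
        context.log.info(f"processing partition {context.partition_key}")

    return partitioned_asset
```

Calling `make_partitioned_asset("/some/path")` at module level would give an asset to include in the repository. New directories only show up as partitions after the code is loaded again.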
n
Is there a programmatic way to reload definitions? Maybe using the `context.instance`?
o
just to be clear about the lifecycle of these things (because it's pretty tricky), every time your repository code is loaded, the newest version of that `StaticPartitionsDefinition` will be used. So basically every single dagster subprocess will have the most recent set of partitions at the time it executes. The only case where you need to worry about the partitions being out of date with reality is in Dagit, which does not constantly reload the code (basically just loads it once then caches that representation of the objects in the repository). So it's only the UI that might be out of date. There is a programmatic way to reload the UI though (essentially does the same thing as hitting "Reload Definitions"), via the dagster graphql client: https://docs.dagster.io/concepts/dagit/graphql-client#overview (`reload_repository_location`)
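A hedged sketch of the programmatic reload mentioned above, using the Python GraphQL client from the `dagster-graphql` package. The wrapper name `reload_code_location`, the default host and port, and the choice to raise on failure are assumptions of this example, not part of the thread.

```python
def reload_code_location(location_name, host="localhost", port=3000):
    """Ask a running Dagit instance to reload one code location.

    Raises RuntimeError if Dagit reports that the reload failed.
    """
    # Imported lazily so this module can be loaded without dagster-graphql;
    # that is a choice of this sketch, not a requirement.
    from dagster_graphql import (
        DagsterGraphQLClient,
        ReloadRepositoryLocationStatus,
    )

    client = DagsterGraphQLClient(host, port_number=port)
    info = client.reload_repository_location(location_name)
    if info.status != ReloadRepositoryLocationStatus.SUCCESS:
        raise RuntimeError(
            f"reload of {location_name!r} failed "
            f"({info.failure_type}): {info.message}"
        )
    return info
```

The location name and the Dagit address depend on the deployment, so both are left as parameters here.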
n
"every time your repository code is loaded, the newest version of that `StaticPartitionsDefinition` will be used. So basically every single dagster subprocess will have the most recent set of partitions at the time it executes."
interesting! The way I was thinking of working was having a job that creates a directory (and a bunch of tasks that will get tossed to dask as futures within a single `op`/`asset`) and submits a `run_request_for_partition` afterwards, using a `run_status_sensor`, to kick off the job that runs for a partitioned asset. Would this update the static partitions quickly enough?
• the `create_directory_which_is_actually_the_partition` job runs
• on success, a sensor ticks and submits a run
• the run is kicked off in a new process that is forced to reload the static partitions
• the asset run sees the new partition and does not fail?
o
Hm interesting -- so just to be clear the setup would be:
1. a regular job which will create a directory underneath `some/path/` (which presumably all workers can access)
2. a `run_status_sensor` that waits for that job to complete, then submits a run request of a partitioned asset job for that new partition
I think the one issue here is that the `run_status_sensor` does not continuously reload the code location (it's a daemon process that only reloads the code once in a while, not after every tick). So calling `run_request_for_partition` within that process would sometimes work, sometimes fail, depending on whether a reload had happened between the job succeeding and the sensor ticking. Under the hood, if you look at what `run_request_for_partition` is doing, it's just returning a `RunRequest` with some specific properties set (important ones are tags and run config), but if you wanted you could write your own version of that function that didn't call `partition_set.get_partition(...)`, as that's the part which will fail if an old partitions definition is still loaded and you try to get a new partition key
n
Oh interesting! Didn't realize it was just filling in a `RunRequest`. Looks like, as you said, getting the tags right will be the tricky part if the partition hasn't been updated within the sensor. Do you know how often it gets updated?
o
the tags actually should be fairly straightforward (although the code path looks really nasty!), basically everything in the tags will be the same from partition to partition except for a `dagster/partition` tag, which will have a value of the string representation of the partition that's being executed
n
looks like `dagster/partition` and `dagster/partition_set`
o
one implementation path would be to just generate a run_request for a partition you know already exists, then replace the `dagster/partition` tag
yep!
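The tag swap suggested above might look roughly like the following. This is a sketch under assumptions: `tags_for_new_partition` and `run_request_for_new_partition` are hypothetical helper names, `existing_request` is taken to be a `RunRequest` already generated for a partition that the loaded definition knows about, and using the partition key as the `run_key` is an illustrative choice.

```python
PARTITION_TAG = "dagster/partition"


def tags_for_new_partition(existing_tags, partition_key):
    """Copy the tags of a known partition's run request, swapping the key.

    Other tags (for example dagster/partition_set) are carried over
    unchanged, and the input mapping is not modified.
    """
    tags = dict(existing_tags)
    tags[PARTITION_TAG] = partition_key
    return tags


def run_request_for_new_partition(existing_request, partition_key):
    """Build a RunRequest for a key the loaded definition may not know yet."""
    # Imported lazily so the tag helper above works without Dagster;
    # that is a choice of this sketch.
    from dagster import RunRequest

    return RunRequest(
        run_key=partition_key,  # assumption: one run per partition key
        run_config=existing_request.run_config,
        tags=tags_for_new_partition(existing_request.tags, partition_key),
    )
```

If the run config itself varies by partition, it would need the same kind of adjustment as the tags; the sketch simply reuses it as-is.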
n
awesome, that's not so bad!
thanks for the help
o
no problem -- admittedly a bit jank but hopefully we'll have native support for dynamic partitions in the near-medium future
lmk if you run into any issues!
n
will do!