# ask-community
Alvin:
Hi team, I have a pipeline that goes something like this:
1. A sensor reads from a Google Cloud Storage bucket for new files. For each new file (e.g. `<some_id>.txt`), a new Job run_request is created.
2. The Job performs a series of transforms and writes to various database tables with `<some_id>` as the key.
If I understand correctly:
• I cannot write my operations as software-defined assets (`@asset`) because `<some_id>` is only known at Job run time and not at Definition/Deployment time (so I cannot use `<some_id>` as a partition key).
• I cannot write a custom `io_manager` to handle the writing and reading of these database rows because there's no way to pass the `io_manager` the value of `<some_id>`.
So, what I've done instead is:
• Have a generic Op `write_to_db` that is then configured for each database table. This also allows me to log an AssetMaterialization event within `write_to_db`.
Can you please let me know if my understanding is correct? Or is there a better/canonical way to achieve what I need? Ideally I would love runtime asset partitions to get all the benefits of software-defined assets for my pipeline.
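(For reference, a minimal sketch of the workaround described above. The `upsert` helper, the in-memory `DB` dict, and the config shape are illustrative stand-ins, not Dagster APIs; `<some_id>` is assumed to arrive via run config from the sensor's RunRequest.)
```python
from dagster import AssetMaterialization, op

DB = {}  # stand-in for the real database


def upsert(table, key, rows):
    """Hypothetical database helper: write `rows` under `key` in `table`."""
    DB.setdefault(table, {})[key] = rows


@op(config_schema={"table": str, "some_id": str})
def write_to_db(context, rows):
    table = context.op_config["table"]
    some_id = context.op_config["some_id"]
    upsert(table, key=some_id, rows=rows)
    # Log a materialization so the write still shows up in the asset catalog,
    # even though this is an op rather than an @asset
    context.log_event(AssetMaterialization(asset_key=table, partition=some_id))
```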
Sandy:
> Or, is there a better/canonical way to achieve what I need? Ideally I would love runtime asset partitions to get all the benefits of software-defined assets for my pipeline.
I think you've hit the nail on the head here. This is on our current roadmap: we hope to have it done within the next six weeks.
If you'd be up for writing up your use case as a comment on that issue (basically what you wrote here), it would help us make sure we get the feature right.
Alvin:
Thanks for getting back to me, Sandy. And great to hear that it's on the roadmap! I have added my use case to the GH issue.
Hi @sandy, just wondering if there's been progress on this? Are you still planning to release this in the next few weeks?
Sandy:
Hey Alvin - @claire is actively working on this, and she's got a couple of PRs in review:
• https://github.com/dagster-io/dagster/pull/11994
• https://github.com/dagster-io/dagster/pull/12000
We're still aiming to release it in the next few weeks.
Alvin:
Great! Thank you for the update. You all are fantastic.
Louis:
Hello! I have a use case very similar to what Alvin described. I saw the PRs were merged, and there is a `dynamic_partitioned_config` decorator documented. Was this released, and is the decorator related? I was also wondering whether the `partition_fn` function could use the project's resources to fetch the list of partitions?
Claire:
Hi Louis, the `dynamic_partitioned_config` decorator existed prior to the new functionality introduced on `DynamicPartitionsDefinition`. We don't currently have built-in support for `partition_fn` to fetch the list of partitions, though you could create your own `PartitionedConfig` object that accepts a `DynamicPartitionsDefinition(name=...)` instance. Something to be aware of: this functionality is experimental, so it's still subject to change.
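(A minimal sketch of that suggestion, assuming a recent `PartitionedConfig` API with `run_config_for_partition_key_fn`; the "files" name, the op, and the config shape are illustrative.)
```python
from dagster import DynamicPartitionsDefinition, PartitionedConfig, job, op

files_partitions = DynamicPartitionsDefinition(name="files")


@op(config_schema={"filename": str})
def process_file(context):
    context.log.info(f"processing {context.op_config['filename']}")


@job(
    config=PartitionedConfig(
        partitions_def=files_partitions,
        # Map each dynamic partition key to the run config for that run
        run_config_for_partition_key_fn=lambda key: {
            "ops": {"process_file": {"config": {"filename": key}}}
        },
    )
)
def process_file_job():
    process_file()
```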
Louis:
To be sure I understood: my use case is that we have files arriving in an S3 folder, and I would like to have one partition per file. I understood that I should use `DynamicPartitionsDefinition` and call `DynamicPartitionsDefinition.add_partitions` each time a sensor detects a new file? Is this function idempotent?
Claire:
Yep, that's right! There's an example in the unreleased documentation here that implements the sensor pattern you mentioned: https://docs.dagster.io/master/concepts/partitions-schedules-sensors/partitions#dynamically-partitioned-assets
The function is idempotent, so you can call it multiple times with the same partition key.
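(Roughly, the sensor pattern looks like the sketch below, assuming the `instance.add_dynamic_partitions` API; `list_new_files` is a hypothetical stand-in for polling the S3 folder, and the asset/job names are illustrative.)
```python
from dagster import (
    AssetSelection,
    DynamicPartitionsDefinition,
    RunRequest,
    asset,
    define_asset_job,
    sensor,
)

files_partitions = DynamicPartitionsDefinition(name="files")


@asset(partitions_def=files_partitions)
def file_contents(context):
    # context.partition_key is the key the sensor registered for this file
    context.log.info(f"processing partition {context.partition_key}")


files_job = define_asset_job(
    "files_job",
    selection=AssetSelection.keys("file_contents"),
    partitions_def=files_partitions,
)


def list_new_files():
    """Hypothetical: return keys for files not yet registered as partitions."""
    return []


@sensor(job=files_job)
def new_file_sensor(context):
    new_files = list_new_files()
    # Idempotent per Claire's note: re-adding an existing key is safe
    context.instance.add_dynamic_partitions("files", new_files)
    return [RunRequest(partition_key=f) for f in new_files]
```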
Louis:
Oh thanks, that's exactly what I was looking for!
Sandy:
Hey @Alvin Yeap and @Louis Auneau - when you have the chance to try this out, I'm curious to hear how it goes for you
Alvin:
Thank you @sandy and @claire for working on this. I have been eagerly awaiting this feature. I also found this repo as a template (https://github.com/slopp/dagster-dynamic-partitions). I’m planning to give it a try this weekend and let you know how I go 🤘
Louis:
Hello @sandy! I played with the feature recently (not yet in production). It works fine, and the master documentation helped a lot to get it working 🙂.
Sandy:
awesome to hear
Alvin:
Hi @sandy and @claire I gave the feature a go as well and it works a treat. Thank you to you both for making it happen.
I did get an error, though, when I tried to materialize more than one dynamic partition at a time:
```
dagster._check.CheckError: Failure condition: Tried to access partition key for output 'result' of step 'my_test_asset', but the step output has a partition range: '3ed0c08f-83e8-4cad-8e32-7023bcb412aa' to '87f3d90d-f5c9-445b-909b-1c6b521b0c03'.
```
Claire:
I think you're probably running into this issue: https://github.com/dagster-io/dagster/issues/10490, which occurs because the IO manager you're using doesn't support executing on ranges of partitions.
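(For anyone hitting the same error: a minimal sketch of an IO manager that tolerates ranges by reading `asset_partition_key_range` instead of `asset_partition_key`, which is the call that raises the CheckError above. The in-memory `DB` and the table naming are illustrative.)
```python
from dagster import IOManager, io_manager

DB = {}  # stand-in for the real database


class DBRowIOManager(IOManager):
    def handle_output(self, context, obj):
        table = context.asset_key.path[-1]
        # asset_partition_key raises when the step spans multiple partitions,
        # so inspect the range instead
        key_range = context.asset_partition_key_range
        if key_range.start == key_range.end:
            DB.setdefault(table, {})[key_range.start] = obj
        else:
            # For a multi-partition step, assume obj maps partition key -> value
            DB.setdefault(table, {}).update(obj)

    def load_input(self, context):
        table = context.asset_key.path[-1]
        return {key: DB[table][key] for key in context.asset_partition_keys}


@io_manager
def db_row_io_manager(_):
    return DBRowIOManager()
```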