Hi team, I have a pipeline that goes something like this:
1. A sensor polls a Google Cloud Storage bucket for new files. For each new file (e.g. <some_id>.txt), a new Job run_request is created.
2. The Job performs a series of transforms and writes to various database tables with <some_id> as the key.
If I understand correctly:
• I cannot write my operations as software-defined assets (
) because <some_id> is only known at Job run time and not at Definition/Deployment time (so I cannot use <some_id> as a partition key). • I cannot write a custom
to handle the writing and reading of these database rows because there’s no way to pass the
the value of <some_id>. So, what I’ve done instead is: • Have a generic Op
that is then configured for each database table. This allows me to also log an AssetMaterialization event within
. Can you please let me know if my understanding is correct? Or, is there a better/canonical way to achieve what I need? Ideally I would love runtime asset partitions to get all the benefits of software-defined assets for my pipeline.
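The sensor step described above (one run request per new <some_id>.txt file) can be sketched in plain Python. This is only a toy model under stated assumptions, not Dagster's sensor API: the function name `run_requests_for_new_files`, the dict layout, and the in-memory `seen_ids` set are all hypothetical stand-ins for a real sensor, RunRequest objects, and a persisted cursor.

```python
from pathlib import PurePosixPath


def run_requests_for_new_files(blob_names, seen_ids):
    """Return one run-request-like dict per unseen <some_id>.txt file.

    `seen_ids` is mutated so that a later call skips files already handled.
    (Hypothetical helper; a real sensor would persist this state elsewhere.)
    """
    requests = []
    for name in sorted(blob_names):
        path = PurePosixPath(name)
        if path.suffix != ".txt":
            continue  # ignore files that do not match the <some_id>.txt pattern
        some_id = path.stem
        if some_id in seen_ids:
            continue  # already requested a run for this id
        seen_ids.add(some_id)
        # The id is only known here, at run time, so it travels in the run config.
        requests.append({"run_key": some_id, "config": {"some_id": some_id}})
    return requests


seen = set()
first = run_requests_for_new_files(["a1.txt", "b2.txt", "notes.md"], seen)
second = run_requests_for_new_files(["a1.txt", "b2.txt", "c3.txt"], seen)
```

On the second call only c3 yields a request, because a1 and b2 were recorded by the first call.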
I think you've hit the nail on the head here. This is on our current roadmap: we hope to have it done within the next six weeks.
If you'd be up for writing up your use case as a comment on that issue (basically what you wrote here), it would help us make sure we get the feature right.
Thanks for getting back to me, Sandy. And great to hear that it’s on the roadmap! I have added my use-case to the GH issue.
Hi @sandy just wondering if there’s been progress on this? Are you still planning to release this in the next few weeks?
hey Alvin - @claire is actively working on this, and she's got a couple of PRs in review:
• https://github.com/dagster-io/dagster/pull/11994
• https://github.com/dagster-io/dagster/pull/12000
we're still aiming to release it in the next few weeks
Great! Thank you for the update. You all are fantastic.
Hello! I have a use case very similar to what Alvin described. I saw the PRs were merged, and there is a dynamic_partitioned_config decorator documented. Was this released, and is the decorator related? I was wondering if the
function could use resources of the project to fetch the list of partitions?
Hi Louis, the
decorator existed prior to new functionality introduced on
. We don't currently have built-in support for
to fetch the list of partitions, though you could create your own
object that accepts a
instance. Something to be aware of is that this functionality is experimental, so still subject to change.
Just to be sure I understood: my use case is that we have files arriving in an S3 folder, and I would like to have one partition per file. As I understand it, I should use DynamicPartitionsDefinition and call
each time a sensor detects a new file? Is this function idempotent?
Yep, that's right! There's an example in the unreleased documentation here that implements the sensor pattern you mentioned: https://docs.dagster.io/master/concepts/partitions-schedules-sensors/partitions#dynamically-partitioned-assets The function is idempotent, so you can call it multiple times with the same partition key.
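To illustrate what idempotent means for the sensor pattern above, here is a minimal sketch of a partition registry in plain Python. It is a toy model, not Dagster's implementation: the class `PartitionRegistry` and its methods are hypothetical names chosen for illustration.

```python
class PartitionRegistry:
    """Toy in-memory stand-in for a dynamic partition set (hypothetical)."""

    def __init__(self):
        self._keys = []  # kept in insertion order

    def add_partitions(self, keys):
        """Add keys, silently skipping any that already exist.

        Returns the keys that were actually new.
        """
        added = []
        for key in keys:
            if key not in self._keys:
                self._keys.append(key)
                added.append(key)
        return added

    def keys(self):
        return list(self._keys)


registry = PartitionRegistry()
first_added = registry.add_partitions(["file_a", "file_b"])
# A later sensor tick sees file_b again plus a new file_c.
second_added = registry.add_partitions(["file_b", "file_c"])
```

Because repeated keys are no-ops, a sensor can pass every file it currently sees on each tick without tracking which ones it registered before.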
Oh thanks, that’s exactly what I was looking for!
Hey @Alvin Yeap and @Louis Auneau - when you have the chance to try this out, I'm curious to hear how it goes for you
Thank you @sandy and @claire for working on this. I have been eagerly awaiting this feature. I also found this repo as a template (https://github.com/slopp/dagster-dynamic-partitions). I’m planning to give it a try this weekend and let you know how I go 🤘
Hello @sandy! I played with the feature lately (not yet in production). It works fine, and the master documentation helped a lot to get it working 🙂
awesome to hear
Hi @sandy and @claire I gave the feature a go as well and it works a treat. Thank you to you both for making it happen.
I did get an error when I tried to materialize more than one dynamic partition at a time:
dagster._check.CheckError: Failure condition: Tried to access partition key for output 'result' of step 'my_test_asset', but the step output has a partition range: '3ed0c08f-83e8-4cad-8e32-7023bcb412aa' to '87f3d90d-f5c9-445b-909b-1c6b521b0c03'.
I think you're probably running into this issue: https://github.com/dagster-io/dagster/issues/10490 which occurs because the IO manager you're using doesn't support executing on ranges of partitions
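As a rough illustration of the difference between handling a single partition key and handling a range, here is a plain-Python sketch. It does not use Dagster's IO manager API; `keys_in_range`, `write_partitions`, and the dict-backed store are hypothetical, and the range is assumed to be inclusive over an ordered list of keys.

```python
def keys_in_range(ordered_keys, start, end):
    """Return every key from start to end (inclusive) in the given order."""
    i, j = ordered_keys.index(start), ordered_keys.index(end)
    if i > j:
        raise ValueError(f"range start {start!r} comes after end {end!r}")
    return ordered_keys[i : j + 1]


def write_partitions(store, ordered_keys, start, end, value_for):
    """Write one entry per key in the range instead of assuming a single key."""
    written = keys_in_range(ordered_keys, start, end)
    for key in written:
        store[key] = value_for(key)
    return written


all_keys = ["k1", "k2", "k3", "k4"]
store = {}
written = write_partitions(store, all_keys, "k2", "k3", lambda k: k.upper())
```

Code that assumes exactly one key per run has nothing sensible to return when handed "k2" to "k3", which is the kind of situation the error message above describes.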