# announcements
a
Hi all. I’m interested in using the new `DynamicOutput`s but ran into an issue. Is there any way to “fan-in” the dynamic outputs? I’m interested in a pattern in which I create a variable number of outputs based on a config and then reduce them with a single solid. A workaround is to lift all solids to work with lists, but that’s not as nice.
a
> Is there any way to “fan-in” the dynamic outputs?
not yet
a
Is there a plan for how to do this? I could maybe help with the implementation.
a
the plan is fuzzy and the changes required are significant and invasive
a
fair enough
a
Thanks!
s
@antonl - out of curiosity, what are you using the reduce for?
j
I actually just implemented this (`DynamicOutput`) to test - no need for “fan-in” - so I hope it doesn’t go away any time soon. Use case: I have a JSON array with 5 objects, each containing different S3 bucket prefixes (among other keys); the file is stored in S3. Each array item is used to run a previously existing AWS Lambda, using the JSON as the payload. Instead of one function looping over the JSON array and invoking the Lambda, I have one function with a `DynamicOutput` per item, and downstream a function which invokes the Lambda. Next I’ll add a validation step to make sure the outputs (in S3) exist.
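For readers following along, the fan-out half of that pattern looks roughly like this. It is a minimal sketch against the legacy `solid`/`pipeline` API; the payload contents, names, and the Lambda call itself are placeholders rather than the poster’s actual code.

```python
from dagster import DynamicOutput, DynamicOutputDefinition, pipeline, solid
# On older releases, DynamicOutput/DynamicOutputDefinition live in dagster.experimental.


@solid(output_defs=[DynamicOutputDefinition(dict)])
def read_payloads(context):
    # Stand-in for reading the JSON array from S3; each object carries its own prefix.
    payloads = [{"bucket_prefix": "raw/a/"}, {"bucket_prefix": "raw/b/"}]
    for i, payload in enumerate(payloads):
        yield DynamicOutput(payload, mapping_key=f"payload_{i}")


@solid
def invoke_lambda(context, payload: dict) -> dict:
    # Placeholder for the boto3 Lambda invocation with `payload` as the event.
    context.log.info(f"Invoking the Lambda with {payload}")
    return payload


@pipeline
def fan_out_pipeline():
    # One invoke_lambda step is created per DynamicOutput at runtime.
    read_payloads().map(invoke_lambda)
```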
a
@sandy there are a few things I’d like to be able to do, but here’s an example. Currently we have a data-ingestion pipeline that loads a set of files (based on a config) and then applies a file-specific transformation. The resulting objects just get stored to the database, so there is no need for fan-in yet. However, the next part of the pipeline depends on each table existing in the database, so I’d like to mark the downstream solids as dependent on each ingestion step. There are a few workarounds:
• I lift each solid to work on lists: this isn’t that great, because I can’t (as far as I know) reuse the single-file processing functionality on multiple files.
• I split the pipeline in two and run them in sequence: not ideal because traceability information is split between two places, and there is some possibility of creating incompatible configurations.
• I use the input file to dynamically define the pipeline: probably not the best idea because the pipeline is ephemeral.
Other examples of applications:
• Run a hyperparameter sweep based on a grid to dynamically create tasks. After the sweep, apply a reduction step based on computed metrics to select an optimal model.
• Process all files in a directory, generating a summary.
I’ve worked with autograd/task-orchestration systems before, so I get why a dynamic graph is challenging. I was just curious if you had an idea for how this implementation would work. E.g. Snakemake defines the reduce step as a checkpoint during graph execution and re-evaluates the graph at that point, essentially doing the pipeline splitting automatically. You’d have to delay some of the config validation until after the checkpoint, but that’s the price to pay for dynamic graphs.
a
ya we need to:
* figure out how we want to model this “collect” / “checkpoint” in our `ExecutionPlan` (which will likely lead to revisiting how the current “unresolved” steps for mapping are modeled)
* thread through dynamic resolution state in all of our delegating `Executor` implementations
b
I also ran into this issue of needing some kind of resolve step w/ DynamicOutput. My use case is in an ELT pipeline: I'm piping table names through the Dynamic Orchestration to run an "extract" and "load" step on them, which basically copies the tables from my DB into the data warehouse. But then I need to run a "transform" step using dagster's `dbt_cli_run` feature, and this is dependent on all of the tables being created in the warehouse (meaning all the DynamicOutputs need to be resolved). I gave a 👍 on the GitHub issue comment above. For now I'm figuring out a suitable workaround. Thanks!
a
@Basil V My current workaround is to define the pipeline using a factory function that takes a few parameters. So in your case, you could define ahead of time the list of tables you need to ingest and construct a composite solid from those names. The output of that solid would be a sentinel value indicating that those tables are present in your data warehouse. Downstream solids can then depend on that sentinel and use the tables in the warehouse as normal. The downside is that you can’t configure the table names through dagster config; they’re part of your code.
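A minimal sketch of that factory / composite-solid workaround could look like the following, using the legacy `solid`/`composite_solid`/`pipeline` API. It is not the exact code from the issue; the table names, solid names, and warehouse logic are hypothetical stand-ins.

```python
from typing import List

from dagster import composite_solid, pipeline, solid


def make_ingest_solid(table_name):
    # Factory: one solid per table, built from the list defined in code.
    @solid(name=f"ingest_{table_name}")
    def _ingest(context) -> str:
        context.log.info(f"Copying {table_name} into the warehouse")
        # ... extract/load logic for this table would go here ...
        return table_name

    return _ingest


@solid
def tables_ready(context, ingested: List[str]) -> bool:
    # Sentinel output: every listed table now exists in the warehouse.
    context.log.info(f"Ingested tables: {ingested}")
    return True


def make_ingest_all(table_names):
    @composite_solid(name="ingest_all_tables")
    def _ingest_all():
        # Fan-in: collect the output of each per-table solid into one list input.
        return tables_ready([make_ingest_solid(name)() for name in table_names])

    return _ingest_all


@solid
def run_transforms(context, ready: bool):
    # Downstream work (e.g. a dbt run) only starts once the sentinel exists.
    context.log.info("All tables present; running transforms")


TABLES = ["orders", "customers", "invoices"]  # hypothetical, fixed in code


@pipeline
def elt_pipeline():
    run_transforms(make_ingest_all(TABLES)())
```

The trade-off, as noted above, is that `TABLES` is fixed when the pipeline is constructed rather than supplied via run config.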
b
Thanks @antonl, I've been playing around with `composite_solid`s to achieve something like this. So did you abandon using the DynamicOutput for now with that solution? Have any code snippets you could share to help me follow what you mean exactly? Much appreciated!
a
@Basil V Yeah, I abandoned it for now. I’ve written up an example workaround analogous to yours here https://github.com/dagster-io/dagster/issues/3646. See the additional info at the bottom.
b
Cool thanks so much! I'll check this out