https://dagster.io/ logo
#announcements
Title
# announcements
s

sk4la

02/24/2021, 8:29 PM
Hey everyone! Does anybody have some experience with the newly-added dynamic orchestration feature? I’m currently stuggling with
DynamicOutput
assignation and I find the example in the documentation a bit vague in this regard. Quick example from the documentation:
Copy code
@pipeline
def process_directory():
    files_in_directory().map(process_file)
How would one pass the
DynamicOutput
from
files_in_directory
as a specific keyword parameter to
process_file
, as in this:
Copy code
@pipeline
def process_directory():
    files_in_directory().map(process_file(file_path=mapped_value, other_arg="fixed value"))
Does the mapping rely on the name given to the
DynamicOutput
? Is something like this even possible at the moment?
a

alex

02/24/2021, 9:01 PM
so you cant specify fixed values in composition functions at this time but something like this should work
Copy code
@pipeline
def process_directory():
    other_arg = other_solid()
    files_in_directory().map(
        lambda mapped_value: process_file(file_path=mapped_value, other_arg=other_arg)
    )
s

sk4la

02/24/2021, 9:06 PM
Thanks for the tip, I’ll give this a shot ASAP.
Also thank you so much for this feature!
dagsir 1
OK, so I spent a bit of time trying to wrap my head around this concept, and here’s what I got. I have a basic example where I want to encode and then compute the hash of X files. It looks like this:
Copy code
def example_pipeline():
    resolve_source_path = compose_string.alias("resolve_source_path")
    resolve_destination_path = compose_string.alias("resolve_destination_path")

    # Gather some files from the local filesystem
    # and return them as a `DynamicOutput` instance.
    #
    gathered_files = gather_files(
        source_path=resolve_source_path(),
    )

    # Compute an ephemeral (output) file path to
    # each of these files for the upcoming task.
    #
    transcient_files = gathered_files.map(
        lambda gathered_file: compose_output_path(
            file_path=gathered_file,
            destination_directory_path=resolve_destination_path(),
        ),
    ).map(

        # This step is supposed to federate two upstream `DynamicOutput`:
        #   * `gathered_file` from the `gathered_files` sub-pipeline
        #   * `transcient_file` from the current (implicit) sub-pipeline
        #
        lambda gathered_file, transcient_file: encode_file(
            input_file_path=gathered_file,
            output_file_path=transcient_file, # <--- This is wrong, since the `map`
                                              #      method of `DynamicOutput` only
                                              #      takes one parameter.
        ),
    )

    # Finally, compute the hash of every encoded file
    # by iterating over the `transcient_files` sub-pipeline.
    #
    file_hashes = transcient_files.map(
        lambda transcient_file: computing.compute_file_hash(
            file_path=transcient_file,
        ),
    )
As
DynamicOutput
can only map around its own iterative value, I cannot manage to find a suitable solution for my problem. Based on this, is there currently a way to “iterate over” multiple
DynamicOutput
instances, as shown here?
a

alex

02/25/2021, 8:15 PM
we dont support generating permutations over two dynamic outputs, but I think what you are trying to do is not that so its just about structuring it right
s

sk4la

02/25/2021, 8:19 PM
The example is intentionally quirky, but I think this illustrate quite well the concept of mapping over multiple inputs
Do you think there will be support for this in the future?
a

alex

02/25/2021, 8:23 PM
Copy code
def example_pipeline():
    source_path = compose_string.alias("resolve_source_path")()
    destination_path = compose_string.alias("resolve_destination_path")()
    # Gather some files from the local filesystem
    # and return them as a `DynamicOutput` instance.
    gathered_files = gather_files(
        source_path=source_path,
    )
    
    def _hash(file):
        output_path = compose_output_path(
            file_path=file,
            destination_directory_path=destination_path,
        )
        encoded_file = encode_file(
            input_file_path=file,
            output_file_path=output_path
        )
        # need to establish some data dependency to 
        # sequence hash after encode
        # assume just returns output_path again
        computing.compute_file_hash(encoded_file) 

    gathered_files.map(_hash)
s

sk4la

02/25/2021, 8:24 PM
Also, I’m a bit curious, why mapping the iterative over the function (cf.
output.map(func)
) instead of mapping the function over the iterative (cf.
func.map(output)
) ? Coming from Prefect I’m more familiar with the latter, and for the current example I think it’s a bit more straightforward.
Thanks for the snippet 😊
a

alex

02/25/2021, 8:35 PM
the api isnt set in stone since this is still new and experimental - so we could consider adding
func.map
. In the case
func
needs other non-mapped inputs you need a solution for that
s

sk4la

02/25/2021, 8:42 PM
What about a simple decorator-based wrapper (e.g.
func.map(will_map=iterative, will_not_map=mapper_ignore(non_iterative))
) that instructs the mapping engine to interpret the parameter as a simple non-iterative value ?
j

Josh Lloyd

06/15/2021, 4:10 PM
I’m having a very similar issue to this one. I’d like to see a way to set DynamicOutputs as specific inputs to downstream solids.
the api isnt set in stone since this is still new and experimental - so we could consider adding 
func.map
 . In the case 
func
 needs other non-mapped inputs you need a solution for that
This seems like a critical ability for DynamicOutputs
a

alex

06/15/2021, 4:20 PM
I’d like to see a way to set DynamicOutputs as specific inputs to downstream solids
can you provide more details on what you are trying to do and what is not working? Does the snippet above not apply to your case?
j

Josh Lloyd

06/16/2021, 8:04 PM
you’re right, my bad. That snippet you posted above worked in my case. I would strongly recommend adding that or something similar to it in the docs.
👍 1
@alex do you know how to get the collect() method to work with the example script you provided above? I tried to modify the last line of your script like this
Copy code
gathered_file_results = gathered_files.map(_hash)
next_solid(gathered_file_results.collect())
but I get an attribute errors saying that
gathered_file_results
is a
noneType
object even thought that’s what the docs say to do. I also tried:
Copy code
next_solid(gathered_fiels.collect())
Which actually ran but it ran before any of the solids within the defined function completed.
nvm I just figured it out after finding this set of documentation. My mistake was that my
_hash
function was not returning anything. As soon as I added the
return
command to it, it worked as expected (my first attempt from above). the function docs should probably use the same example code provided at this link because it’s more complete
2 Views