sk4la 02/24/2021, 8:29 PM
I have a question about DynamicOutput assignment, and I find the example in the documentation a bit vague in this regard.
Quick example from the documentation:
@pipeline
def process_directory():
    files_in_directory().map(process_file)
How would one pass the DynamicOutput from files_in_directory as a specific keyword parameter to process_file, as in this:
@pipeline
def process_directory():
    files_in_directory().map(process_file(file_path=mapped_value, other_arg="fixed value"))
Does the mapping rely on the name given to the DynamicOutput? Is something like this even possible at the moment?
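
For context, the documentation snippet above assumes solids roughly like the following. This is a minimal, hypothetical sketch rather than code from the thread; the dagster.experimental import path matches the 0.10/0.11 releases current at the time and may differ in later versions.

from dagster import pipeline, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition

@solid(output_defs=[DynamicOutputDefinition(str)])
def files_in_directory(context):
    # Stand-in for a real directory listing: emit one DynamicOutput per file,
    # each with a unique mapping_key (alphanumerics and underscores only).
    for path in ["a.csv", "b.csv"]:
        yield DynamicOutput(value=path, mapping_key=path.replace(".", "_"))

@solid
def process_file(context, file_path: str) -> str:
    context.log.info(f"processing {file_path}")
    return file_path

@pipeline
def process_directory():
    files_in_directory().map(process_file)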
alex 02/24/2021, 9:01 PM
@pipeline
def process_directory():
    other_arg = other_solid()
    files_in_directory().map(
        lambda mapped_value: process_file(file_path=mapped_value, other_arg=other_arg)
    )
sk4la 02/24/2021, 9:06 PM
def example_pipeline():
    resolve_source_path = compose_string.alias("resolve_source_path")
    resolve_destination_path = compose_string.alias("resolve_destination_path")

    # Gather some files from the local filesystem
    # and return them as a `DynamicOutput` instance.
    #
    gathered_files = gather_files(
        source_path=resolve_source_path(),
    )

    # Compute an ephemeral (output) file path to
    # each of these files for the upcoming task.
    #
    transcient_files = gathered_files.map(
        lambda gathered_file: compose_output_path(
            file_path=gathered_file,
            destination_directory_path=resolve_destination_path(),
        ),
    ).map(
        # This step is supposed to federate two upstream `DynamicOutput`:
        # * `gathered_file` from the `gathered_files` sub-pipeline
        # * `transcient_file` from the current (implicit) sub-pipeline
        #
        lambda gathered_file, transcient_file: encode_file(
            input_file_path=gathered_file,
            output_file_path=transcient_file,  # <--- This is wrong, since the `map`
                                               #      method of `DynamicOutput` only
                                               #      takes one parameter.
        ),
    )

    # Finally, compute the hash of every encoded file
    # by iterating over the `transcient_files` sub-pipeline.
    #
    file_hashes = transcient_files.map(
        lambda transcient_file: computing.compute_file_hash(
            file_path=transcient_file,
        ),
    )
As DynamicOutput can only map around its own iterative value, I cannot manage to find a suitable solution for my problem. Based on this, is there currently a way to “iterate over” multiple DynamicOutput instances, as shown here?
alex 02/25/2021, 8:23 PM
def example_pipeline():
    source_path = compose_string.alias("resolve_source_path")()
    destination_path = compose_string.alias("resolve_destination_path")()

    # Gather some files from the local filesystem
    # and return them as a `DynamicOutput` instance.
    gathered_files = gather_files(
        source_path=source_path,
    )

    def _hash(file):
        output_path = compose_output_path(
            file_path=file,
            destination_directory_path=destination_path,
        )
        encoded_file = encode_file(
            input_file_path=file,
            output_file_path=output_path,
        )
        # need to establish some data dependency to
        # sequence hash after encode
        # assume just returns output_path again
        computing.compute_file_hash(encoded_file)

    gathered_files.map(_hash)
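
A brief aside on the “data dependency” comment in the snippet above: the sequencing works because encode_file returns the output path and compute_file_hash consumes it. A rough sketch of what such solids could look like (hypothetical bodies, not the thread author’s actual code):

from dagster import solid
import hashlib

@solid
def encode_file(context, input_file_path: str, output_file_path: str) -> str:
    # ... encode input_file_path into output_file_path here ...
    # Returning the output path creates the data dependency: for each mapped
    # file, compute_file_hash consumes this value and therefore cannot start
    # until the corresponding encode_file has finished.
    return output_file_path

@solid
def compute_file_hash(context, file_path: str) -> str:
    with open(file_path, "rb") as handle:
        return hashlib.sha256(handle.read()).hexdigest()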
sk4la 02/25/2021, 8:24 PM
Out of curiosity, is there a reason for mapping the iterative over the function (cf. output.map(func)) instead of mapping the function over the iterative (cf. func.map(output))? Coming from Prefect, I’m more familiar with the latter, and for the current example I think it’s a bit more straightforward.
alex 02/25/2021, 8:35 PM
The api isnt set in stone since this is still new and experimental - so we could consider adding func.map. In the case func needs other non-mapped inputs you need a solution for that.
sk4la 02/25/2021, 8:42 PM
Maybe with some kind of marker (e.g. func.map(will_map=iterative, will_not_map=mapper_ignore(non_iterative))) that instructs the mapping engine to interpret the parameter as a simple non-iterative value?
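
For readers who have not used Prefect, the marker being described looks roughly like this in Prefect 1.x (a sketch of the Prefect API, shown only for comparison):

from prefect import Flow, task, unmapped

@task
def encode(path: str, destination: str) -> str:
    return f"{destination}/{path}.enc"

with Flow("encode-files") as flow:
    # `destination` is wrapped in unmapped(), so it is passed as-is to every
    # mapped task run instead of being iterated over.
    encoded = encode.map(path=["a.bin", "b.bin"], destination=unmapped("/tmp/out"))

In the Dagster API discussed in this thread, the equivalent effect comes from closing over the non-mapped value inside the lambda or helper function handed to .map, as in alex’s snippets above.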
Josh Lloyd 06/15/2021, 4:10 PM
> the api isnt set in stone since this is still new and experimental - so we could consider adding func.map. In the case func needs other non-mapped inputs you need a solution for that
This seems like a critical ability for DynamicOutputs. I’d like to see a way to set DynamicOutputs as specific inputs to downstream solids.
alex 06/15/2021, 4:20 PM
> I’d like to see a way to set DynamicOutputs as specific inputs to downstream solids
Can you provide more details on what you are trying to do and what is not working? Does the snippet above not apply to your case?
Josh Lloyd 06/16/2021, 8:04 PM
I tried:
gathered_file_results = gathered_files.map(_hash)
next_solid(gathered_file_results.collect())
but I get an attribute error saying that gathered_file_results is a NoneType object, even though that’s what the docs say to do.
I also tried:
next_solid(gathered_files.collect())
which actually ran, but it ran before any of the solids within the defined function completed.
It turned out that my _hash function was not returning anything. As soon as I added the return statement to it, it worked as expected (my first attempt from above). The function docs should probably use the same example code provided at this link because it’s more complete.
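
Putting the resolution together, a minimal sketch of the working pattern described above (hypothetical solid bodies and names; .collect() requires a Dagster release that includes it, and the dagster.experimental import path may differ by version):

from dagster import pipeline, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
import hashlib

@solid(output_defs=[DynamicOutputDefinition(str)])
def gather_files(context):
    # Stand-in for the real gathering logic.
    for index, path in enumerate(["a.bin", "b.bin"]):
        yield DynamicOutput(value=path, mapping_key=f"file_{index}")

@solid
def compute_file_hash(context, file_path: str) -> str:
    with open(file_path, "rb") as handle:
        return hashlib.sha256(handle.read()).hexdigest()

@solid
def next_solid(context, file_hashes):
    context.log.info(f"received {len(file_hashes)} hashes")

@pipeline
def example_pipeline():
    def _hash(file):
        # The mapped function must return a value: without the return,
        # .map() hands back None (the NoneType error above) and there is
        # nothing for .collect() to fan in.
        return compute_file_hash(file_path=file)

    gathered_file_results = gather_files().map(_hash)
    # Collecting the mapped results makes next_solid depend on every
    # compute_file_hash run. Collecting the gather_files output directly would
    # only depend on gather_files, which is why that variant ran too early.
    next_solid(gathered_file_results.collect())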