Hello! My team is building a nested dynamic graph ...
# ask-community
c
Hello! My team is building a nested dynamic graph that:
• Fans out to process multiple files in parallel
• Splits each file into multiple parts, then processes each file part in parallel
🧵 (See thread for code example)
The number of files and the number of parts are both dynamic. Our nested dynamic graph definition appears to be valid (we can load and explore the graph in the Dagit UI), but when we try to launch a run we get the following error:
dagster._core.errors.DagsterInvariantViolationError: In job 'parse_files' op 'count_all_files', input file_subtotals must get a value either (a) from a dependency or (b) from the inputs section of its configuration.
from dagster import DynamicOut, DynamicOutput, graph, op


# Parsing subgraph
@op
def split_file(filename: str) -> list[str]:
    # Dynamic number of file parts
    return ["file_part_1.csv", "file_part_2.csv", "file_part_3.csv"]


@op(out=DynamicOut(str))
def fan_out_file_parts(parts: list[str]):
    for index, file_part in enumerate(parts):
        yield DynamicOutput(file_part, mapping_key=str(index))


@op
def parse_file_part(file_part: str) -> int:
    # Parse file_part and return subtotal
    return 5


@op
def count_file(part_subtotals: list[int]) -> int:
    return sum(part_subtotals)


@graph
def parse_file(filename: str) -> int:
    file_parts = split_file(filename)
    dynamic_outs = fan_out_file_parts(file_parts)
    results = dynamic_outs.map(parse_file_part).collect()
    return count_file(results)


# Main parsing graph
@op
def get_files() -> list[str]:
    # Dynamic number of files
    return ["file_1.csv", "file_2.csv", "file_3.csv"]


@op(out=DynamicOut(str))
def fan_out_files(filenames: list[str]):
    for index, filename in enumerate(filenames):
        yield DynamicOutput(filename, mapping_key=str(index))


@op
def count_all_files(file_subtotals: list[int]) -> int:
    return sum(file_subtotals)


@graph
def parse_files() -> int:
    filenames = get_files()
    dynamic_outs = fan_out_files(filenames)
    results = dynamic_outs.map(parse_file).collect()
    total_count = count_all_files(results)
    return total_count


# Job
parse_files_job = parse_files.to_job()
A couple of questions:
1. Is nested dynamic fan-out/fan-in like this supported?
2. Can DynamicOut.map() accept a graph, or does DynamicOut.map() only support ops?
Thanks in advance for your support!
j
hi @Carlton Duffett!
for 1 - right now only one level of dynamic fan out is supported
for 2 - let me check and get back to you
i think the main issue mapping to a graph is that graphs don’t really accept input other than through config. So you’d have a hard time passing the outputs to the mapped graph. However, you can still compose ops in the map function. Here’s an example
from dagster import DynamicOut, DynamicOutput, Field, graph, op


@op
def multiply_by_two(context, y):
    context.log.info("echo_again is returning " + str(y * 2))
    return y * 2


@op(config_schema={"fail_on_first_try": Field(bool, default_value=False)})
def multiply_inputs(context, y, ten):
    if context.op_config["fail_on_first_try"]:
        current_run = context.instance.get_run_by_id(context.run_id)
        if y == 2 and current_run.parent_run_id is None:
            raise Exception()
    context.log.info("echo is returning " + str(y * ten))
    return y * ten


@op
def emit_ten():
    return 10


@op
def sum_numbers(base, nums):
    return base + sum(nums)


@op(out=DynamicOut())
def emit():
    for i in range(3):
        yield DynamicOutput(value=i, mapping_key=str(i))


@graph
def dynamic():
    # pylint: disable=no-member
    result = emit().map(lambda num: multiply_by_two(multiply_inputs(num, emit_ten())))
    multiply_by_two.alias("double_total")(sum_numbers(emit_ten(), result.collect()))


dynamic_job = dynamic.to_job()
in this example, putting the lambda function in the map passes each dynamic output as the num param, and then there are nested ops that do a series of operations
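Since only one level of dynamic fan-out is supported, one possible workaround is to flatten both levels into a single list of (file, part) work items, fan out once, and regroup by file after the single fan-in. The sketch below shows only that flatten/regroup logic in plain Python. It is not Dagster code: flatten_work_items and regroup are hypothetical helper names, split_file and parse_file_part are stand-ins for the ops in the question, and wrapping them in ops is left out.

```python
from collections import defaultdict
from typing import Iterator


def split_file(filename: str) -> list[str]:
    # Hypothetical stand-in for the thread's split_file op.
    stem = filename.rsplit(".", 1)[0]
    return [f"{stem}_part_{i}.csv" for i in range(1, 4)]


def flatten_work_items(filenames: list[str]) -> Iterator[tuple[str, str, str]]:
    """Yield (mapping_key, filename, file_part) for every part of every file."""
    for file_index, filename in enumerate(filenames):
        for part_index, file_part in enumerate(split_file(filename)):
            # Keys are built from indices only (assumption: characters such
            # as "." in a filename may not be accepted in a mapping key).
            yield f"{file_index}_{part_index}", filename, file_part


def parse_file_part(file_part: str) -> int:
    # Placeholder subtotal, as in the thread's example.
    return 5


def regroup(results: list[tuple[str, int]]) -> dict[str, int]:
    """Sum part subtotals per file after the single fan-in."""
    totals: dict[str, int] = defaultdict(int)
    for filename, subtotal in results:
        totals[filename] += subtotal
    return dict(totals)


# One flat list of work items stands in for the single dynamic fan-out.
items = list(flatten_work_items(["file_1.csv", "file_2.csv"]))
results = [(filename, parse_file_part(part)) for _, filename, part in items]
per_file_totals = regroup(results)
```

The trade-off is that splitting every file happens in one step before the fan-out, so only the per-part parsing runs in parallel.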
c
Thanks, this example is helpful! I don't think chaining ops in the lambda function will work for our use case because we need that second level of fan-out. Some alternatives I'm considering:
1. Use a ProcessPoolExecutor to parallelize processing file parts within a single op
2. Execute the parse_file subgraph within a single op using execute_in_process()
3. Break this into two jobs
Do any of those stand out as better or worse approaches?
j
using execute_in_process to kick off another job within an op is not something we support, so you may run into some weird edge cases/bugs with it that could be pretty hard to debug. I’m not familiar with the ProcessPoolExecutor, so i can’t speak to whether that would work or not, but i think it’s worth experimenting with! splitting into two jobs would probably work pretty well! you could store the outputs of the first job in a known location and then use a sensor to kick off the second job. then the second job would be able to pull in any outputs from the known location. it’s maybe a little bit against the ideal dagster design principle of having jobs not depend on data from other jobs, but this is a complicated use case, so i think it’s a fine compromise
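For the ProcessPoolExecutor idea, here is a minimal sketch of what the body of a single op might do, written as plain Python using only the standard library. It is untested inside Dagster: count_file and parse_file_part reuse the names from the question, the executor_factory parameter is an assumption added so the pool type can be swapped, and how worker processes interact with Dagster's own executors is not covered.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable


def parse_file_part(file_part: str) -> int:
    # Placeholder subtotal, as in the thread's example. It is defined at
    # module level so it can be pickled and sent to worker processes.
    return 5


def count_file(
    file_parts: list[str],
    executor_factory: Callable[[], Executor] = ProcessPoolExecutor,
) -> int:
    """Parse all parts of one file in parallel and return the file total.

    Pass ThreadPoolExecutor instead if parsing is I/O-bound or if
    spawning processes is not practical in your environment.
    """
    with executor_factory() as executor:
        # executor.map returns results in the same order as file_parts.
        return sum(executor.map(parse_file_part, file_parts))
```

With the outer fan-out still handled by DynamicOut, each mapped op would call something like count_file on its own file's parts, so the second level of parallelism lives inside the op rather than in the graph. When using the default process pool in a standalone script, the call should sit under an `if __name__ == "__main__":` guard.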
z
+1 for being able to have multiple levels of fan out/in, I think that's something that would be super powerful