Carlton Duffett
02/02/2023, 9:32 PM — I'm hitting this error: dagster._core.errors.DagsterInvariantViolationError: In job 'parse_files' op 'count_all_files', input file_subtotals must get a value either (a) from a dependency or (b) from the inputs section of its configuration.
Carlton Duffett
02/02/2023, 9:32 PM# Parsing subgraph
@op
def split_file(filename: str) -> list[str]:
    """Split *filename* into a dynamic number of part names."""
    # Placeholder for the example: a real op would split the file on disk.
    return ["file_part_1.csv", "file_part_2.csv", "file_part_3.csv"]
@op(out=DynamicOut(str))
def fan_out_file_parts(parts: list[str]):
    """Yield each file part as a DynamicOutput keyed by its position."""
    for idx, part in enumerate(parts):
        yield DynamicOutput(part, mapping_key=str(idx))
@op
def parse_file_part(file_part: str) -> int:
    """Parse one file part and return its subtotal (stubbed for the example)."""
    return 5
@op
def count_file(part_subtotals: list[int]) -> int:
    """Combine the per-part subtotals into a single file total."""
    total = 0
    for subtotal in part_subtotals:
        total += subtotal
    return total
@graph
def parse_file(filename: str) -> int:
    """Fan out over the parts of one file, parse each, and sum the subtotals."""
    parts = split_file(filename)
    parsed = fan_out_file_parts(parts).map(parse_file_part)
    return count_file(parsed.collect())
# Main parsing graph
@op
def get_files() -> list[str]:
    """Return the (dynamic) list of input files — stubbed for the example."""
    return ["file_1.csv", "file_2.csv", "file_3.csv"]
@op(out=DynamicOut(str))
def fan_out_files(filenames: list[str]):
    """Yield each filename as a DynamicOutput keyed by its position."""
    for idx, name in enumerate(filenames):
        yield DynamicOutput(name, mapping_key=str(idx))
@op
def count_all_files(file_subtotals: list[int]) -> int:
    """Combine the per-file subtotals into the grand total."""
    total = 0
    for subtotal in file_subtotals:
        total += subtotal
    return total
@graph
def parse_files() -> int:
    """Fan out over every file, run the parse_file subgraph per file, and
    sum the per-file subtotals into a grand total.

    NOTE(review): this is the graph that raised the
    DagsterInvariantViolationError quoted in the thread — the collected
    results of mapping a sub-graph apparently do not reach
    count_all_files as a dependency. TODO: confirm against the Dagster
    dynamic-mapping docs whether .map() over a @graph is supported.
    """
    files = get_files()
    per_file = fan_out_files(files).map(parse_file)
    return count_all_files(per_file.collect())
# Job: materialize the parse_files graph into an executable job.
parse_files_job = parse_files.to_job()
Carlton Duffett
02/02/2023, 9:35 PM — Can DynamicOut.map() accept a graph, or does DynamicOut.map() only support ops?
Thanks in advance for your support!

jamie
02/02/2023, 9:47 PMjamie
02/02/2023, 9:52 PMfrom dagster import DynamicOut, Field, graph, op
from dagster._core.definitions.events import DynamicOutput
@op
def multiply_by_two(context, y):
    """Return ``y * 2``, logging the result via the op context.

    Fix: the pasted line was mangled by Slack's auto-linker into
    ``<http://context.log.info|context.log.info>(...)``, which is not
    valid Python; restored to the plain attribute call.
    """
    context.log.info("echo_again is returning " + str(y * 2))
    return y * 2
@op(config_schema={"fail_on_first_try": Field(bool, default_value=False)})
def multiply_inputs(context, y, ten):
    """Return ``y * ten``; optionally fail once to demonstrate retries.

    When ``fail_on_first_try`` is set, the op raises on the run whose
    parent_run_id is None (i.e. the original, non-retry run) and only
    when y == 2 — so a retried run succeeds.

    Fix: the log line was mangled by Slack's auto-linker into
    ``<http://context.log.info|context.log.info>(...)``; restored to the
    plain attribute call.
    """
    if context.op_config["fail_on_first_try"]:
        current_run = context.instance.get_run_by_id(context.run_id)
        if y == 2 and current_run.parent_run_id is None:
            raise Exception()
    context.log.info("echo is returning " + str(y * ten))
    return y * ten
@op
def emit_ten():
    """Produce the constant base value 10 for downstream ops."""
    return 10
@op
def sum_numbers(base, nums):
    """Add *base* to the sum of the collected *nums*."""
    total = base
    for value in nums:
        total += value
    return total
@op(out=DynamicOut())
def emit():
    """Yield the integers 0..2 as DynamicOutputs keyed by their value."""
    for number in range(3):
        yield DynamicOutput(value=number, mapping_key=str(number))
@graph
def dynamic():
    """Map a chain of nested ops over each dynamic output, then fold the
    collected results (plus a base of ten) through an aliased doubler."""
    # pylint: disable=no-member
    mapped = emit().map(
        lambda num: multiply_by_two(multiply_inputs(num, emit_ten()))
    )
    doubler = multiply_by_two.alias("double_total")
    doubler(sum_numbers(emit_ten(), mapped.collect()))
# Executable job built from the dynamic graph above.
dynamic_job = dynamic.to_job()
jamie
02/02/2023, 9:53 PM — `map` passes each dynamic output as the `num` param, and then there are nested ops that do a series of operations.

Carlton Duffett
02/02/2023, 10:08 PM — Options I'm considering:
1. Use ProcessPoolExecutor to parallelize processing file parts within a single op
2. Execute the parse_file subgraph within a single op using execute_in_process()
3. Break this into two jobs
Do any of those stand out as better or worse approaches?

jamie
02/02/2023, 10:14 PMZachary Bluhm
04/11/2023, 12:00 PM