Chris Evans
04/19/2022, 5:02 PMdagster.check.CheckError: Invariant failed.
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/plan/inputs.py", line 607, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/storage/fs_io_manager.py", line 152, in load_input
context.add_input_metadata({"path": MetadataValue.path(os.path.abspath(filepath))})
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/context/input.py", line 325, in add_input_metadata
if self.asset_key:
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/context/input.py", line 216, in asset_key
check.invariant(len(matching_input_defs) == 1)
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/check/__init__.py", line 1167, in invariant
raise CheckError("Invariant failed.")
johann
04/19/2022, 5:25 PMowen
04/19/2022, 5:37 PMChris Evans
04/19/2022, 5:49 PM@graph
def execute_x_di(
chunk: Any,
) -> bool:
apis = Factory.build(start_after=chunk)
Manager.execute(apis, chunk)
@graph(
description="",
)
def x_graph():
chunks = Manager.orchestrate()
chunks.map(execute_x_di)
Chris Evans
04/19/2022, 7:45 PMdagster.core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "chunk" of step "execute_x_di.build[0]":
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 232, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 306, in core_dagster_event_sequence_for_step
for event_or_input_value in ensure_gen(step_input.source.load_input_object(step_context)):
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/plan/inputs.py", line 295, in load_input_object
yield from _load_input_with_input_manager(input_manager, load_input_context)
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/plan/inputs.py", line 607, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/Users/chrisevans/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/chrisevans/Repositories/data-platform/dags/bi/.venv/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 73, in solid_execution_error_boundary
raise error_cls(
owen
04/19/2022, 9:04 PMdef get_dynamic_j():
@op(out=DynamicOut())
def things():
for i in range(3):
yield DynamicOutput(str(i), str(i))
@op
def do(thing):
return thing
@graph
def dos(thing):
x = do(thing)
do(x)
@job
def j():
ts = things()
ts.map(dos)
return j
def test_dynamic_with_file_manager():
from dagster.core.test_utils import instance_for_test
from dagster import reconstructable, execute_pipeline
with instance_for_test() as instance:
result = execute_pipeline(
reconstructable(get_dynamic_j),
instance=instance,
)
assert result.success
I'm thinking whatever's going on might have to do with this Factory.build() thing. Does this construct an op, or is build just a property that returns some static op definition?owen
04/19/2022, 9:05 PMowen
04/19/2022, 9:06 PMowen
04/19/2022, 9:06 PMChris Evans
04/19/2022, 10:10 PMowen
04/19/2022, 10:26 PMowen
04/19/2022, 10:28 PMChris Evans
04/20/2022, 12:50 PMFactory.build(start_after=chunk)
to Factory.build()
in the inner graph. start_after
is an ins=In(Nothing)
kwarg. This resulted in the Factory.build running top level and only the Manager.execute
op running dynamically. The Manage.execute
op failed w/ the same error.owen
04/20/2022, 10:04 PMowen
04/20/2022, 10:04 PMChris Evans
04/20/2022, 10:10 PM