YH
06/27/2023, 8:44 AM
Compute for op "gather.out_dynamically" yielded a DynamicOutput with mapping_key "key_abc" multiple times.
My code resembles the following:
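from typing import Iterator, List
from dagster import DynamicOut, DynamicOutput, op

@op(out=DynamicOut())
def out_dynamically(values: List[str]) -> Iterator[DynamicOutput[str]]:
    memoize = set()
    for item in values:
        if item not in memoize:
            # set.add stores the whole string; set.update would add its characters
            memoize.add(item)
            yield DynamicOutput(mapping_key=item, value=item)
# downstream op calls api with the value passed in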
I added the memoize step after first encountering the issue, to ensure that the list contains only unique values, even though the upstream op already ensures that the values are unique.
sean
06/27/2023, 11:44 AM
from typing import Iterator, List
from dagster import DynamicOut, DynamicOutput, OpExecutionContext, job, op

@op
def upstream() -> List[str]:
    return ["a", "b", "a", "b"]

@op(out=DynamicOut())
def out_dynamically(context: OpExecutionContext, values: List[str]) -> Iterator[DynamicOutput[str]]:
    memoize = set()
    for item in values:
        if item not in memoize:
            # set.add stores the whole string; set.update would add its characters
            memoize.add(item)
            context.log.info(f"Yielding {item}")
            yield DynamicOutput(mapping_key=item, value=item)

@job
def dynamic_job():
    out_dynamically(upstream())
And it worked as expected. Can you:
• Confirm that the snippet I posted above executes as expected for you?
• Try to provide a self-contained repro?
One thing that might be going on is that you didn’t reload your code location after adding the unique-filtering code. Can you confirm you’ve done that?
YH
06/27/2023, 12:09 PM
sean
06/27/2023, 1:12 PM
> I’m not sure if a race condition could possibly occur. It runs well and randomly breaks at different values during my tests.
What do you mean by “runs well”? Does this op ever execute completely successfully? Does it only fail on random executions? Or are you saying that it fails on every execution, but fails on a random item/value?
> In the toy example the code should work properly, even with my own unit test coverage (small subset of data).
Are you referring to your example or my example? Did you try mine? I agree that both examples should work properly, but the question is in which cases it actually does.
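A side note on the set-based de-duplication used in the snippets in this thread, offered as a sketch rather than a diagnosis: `set.add` stores its argument as a single element, while `set.update` iterates over its argument, so passing it a string inserts the individual characters. Single-character values such as "a" and "b" behave the same under both calls, which can hide the difference in a toy example. The helper name `unique_in_order` is hypothetical and not part of Dagster.

```python
from typing import Iterable, List


def unique_in_order(values: Iterable[str]) -> List[str]:
    """Return the values with duplicates dropped, keeping first-seen order."""
    seen = set()
    result = []
    for item in values:
        if item not in seen:
            seen.add(item)  # add() stores the whole string as one element
            result.append(item)
    return result


# Contrast between the two set methods when given a multi-character string.
chars = set()
chars.update("ab")  # iterates the string: the set now holds "a" and "b"

whole = set()
whole.add("ab")  # the set now holds the single element "ab"
```

With `update`, a membership test such as `"ab" in chars` is false, so a filter built on it would let repeated multi-character values through.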
YH
06/27/2023, 4:16 PM
YH
06/28/2023, 3:25 AM
op_invocation, context.observe_output is called, just wondering out loud if context could possibly be mutated by different threads with the same value?
from my logs:
10:57:36.036
gather_organizations
↳output_shortha…es_dynamically
STEP_OUTPUT
Yielded output "result" mapping key "westcon_comstor_australia" of type "Any". (Type check passed).
then:
10:59:49.225
gather_organizations
↳output_shortha…es_dynamically
STEP_FAILURE
dagster._core.errors.DagsterInvariantViolationError: Compute for op "gather_organizations.output_shorthand_names_dynamically" yielded a DynamicOutput with mapping_key "westcon_comstor_australia" multiple times.
in the list we have:
'westcon---comstor-australia', 'westcon-comstor-australia',
I checked observe_output, where _seen_outputs is modified with the mapping_key, so even the values above would have been treated as unique.
Hence, we would need to check what happens when we generate DynamicOutputs, and whether the generator iterates through each value only once. I'm just not sure where to look for this.
sean
06/28/2023, 11:28 AM
from dagster import op, job, DynamicOutput, DynamicOut

NAME = "westcon---comstor-australia"

@op(out=DynamicOut())
def my_op():
    for i in range(3):
        yield DynamicOutput(mapping_key=NAME, value=NAME)

@job
def my_job():
    my_op()

my_job.execute_in_process()
The above produces an error. So to get the error you’re getting with duplicate mapping keys, you must be normalizing 'westcon---comstor-australia' to 'westcon_comstor_australia' somewhere. Since you also have 'westcon-comstor-australia' getting normalized to the same value, this would explain why it complains about dups.
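The collision described above can be checked before any output is yielded. The following is a minimal sketch, assuming a hypothetical `normalize_key` helper that replaces every run of non-alphanumeric characters with a single underscore. The actual normalization in YH's code is not shown in this thread and may differ. `find_key_collisions` is likewise an illustrative name, not a Dagster API.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List


def normalize_key(name: str) -> str:
    """Hypothetical normalization: collapse runs of non-alphanumerics to '_'."""
    return re.sub(r"[^A-Za-z0-9]+", "_", name)


def find_key_collisions(names: Iterable[str]) -> Dict[str, List[str]]:
    """Group raw names by normalized key; keep keys shared by several names."""
    groups = defaultdict(list)
    for name in names:
        groups[normalize_key(name)].append(name)
    return {key: raw for key, raw in groups.items() if len(raw) > 1}


# The two values from the thread, plus an unrelated made-up name.
names = ["westcon---comstor-australia", "westcon-comstor-australia", "acme"]
collisions = find_key_collisions(names)
```

Under this assumed normalization both raw names map to 'westcon_comstor_australia', so de-duplicating on the raw value alone would not prevent a repeated mapping key. De-duplicating on the normalized key, or failing fast when `collisions` is non-empty, would surface the problem before the op yields.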