# ask-community
y
Hi, I'm attempting to use DynamicOutputs to call an external API (which may be flaky, hence moving it to an op so that I can apply retry logic). However, I'm facing a weird issue where I get the following error:
Compute for op "gather.out_dynamically" yielded a DynamicOutput with mapping_key "key_abc" multiple times.
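My code resembles the following:
```python
from typing import Iterator, List

from dagster import DynamicOut, DynamicOutput, op


@op(out=DynamicOut())
def out_dynamically(values: List[str]) -> Iterator[DynamicOutput[str]]:
    memoize = set()

    for item in values:
        if item not in memoize:
            # set.add stores the whole string; set.update(item) would add its individual characters
            memoize.add(item)

            yield DynamicOutput(mapping_key=item, value=item)

# downstream op calls the API with the value passed in
```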
I added the memoize step to ensure that the values are unique after encountering the issue the first time, though the upstream op already ensures that the values are unique.
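One pitfall worth ruling out in a filter like this: `set.update(item)` iterates over the string and adds its individual characters, while `set.add(item)` adds the whole string. With `update`, a membership check on a multi-character value never succeeds, so nothing actually gets filtered. The plain-Python sketch below (no Dagster involved; `unique_in_order` is a hypothetical helper name) contrasts the two:

```python
from typing import Iterable, List


def unique_in_order(values: Iterable[str]) -> List[str]:
    """Return values with exact duplicates removed, keeping first-seen order."""
    seen = set()
    result = []
    for item in values:
        if item not in seen:
            seen.add(item)  # adds the whole string as a single element
            result.append(item)
    return result


# set.update treats the string as an iterable of characters
chars = set()
chars.update("abc")
print(sorted(chars))   # ['a', 'b', 'c']
print("abc" in chars)  # False: the full string was never stored

print(unique_in_order(["abc", "def", "abc"]))  # ['abc', 'def']
```

With single-character values such as "a" and "b" both versions behave the same, which is one reason a toy example can pass while real data does not.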
s
Hi YH, that’s quite puzzling and possibly a bug. However, I just tried this:
```python
from typing import Iterator, List

from dagster import DynamicOut, DynamicOutput, OpExecutionContext, job, op


@op
def upstream() -> List[str]:
    return ["a", "b", "a", "b"]


@op(out=DynamicOut())
def out_dynamically(context: OpExecutionContext, values: List[str]) -> Iterator[DynamicOutput[str]]:
    memoize = set()

    for item in values:
        if item not in memoize:
            memoize.add(item)  # add the whole string, not its characters
            context.log.info(f"Yielding {item}")
            yield DynamicOutput(mapping_key=item, value=item)


@job
def dynamic_job():
    out_dynamically(upstream())
```
And it worked as expected. Can you:
• Confirm that the snippet I posted above executes as expected for you?
• Try to provide a self-contained repro?
One thing that might be going on is that you didn’t reload your code location after adding the unique-filtering code. Can you confirm you’ve done that?
y
Hi @sean, the list being passed in could contain more than 50k records, hence I’m not sure if a race condition could possibly occur. It runs well and randomly breaks at different values during my tests. In the toy example the code should work properly, even with my own unit test coverage (small subset of data). I’m calling this op from an asset_graph, which gets added to a definition, so there’s no need to reload the code repository.
s
> I’m not sure if a race condition could possibly occur. It runs well and randomly breaks at different values during my tests.
What do you mean by “runs well”? Does this op ever execute completely successfully? Does it only fail on random executions? Or are you saying that it fails on every execution, but fails on a random item/value?
> In the toy example the code should work properly, even with my own unit test coverage (small subset of data).
Are you referring to your example or my example? Did you try mine? I agree it should work properly in both examples, but the question is in which cases it actually does.
y
The op has never executed to completion on this data set, and it fails at random points. I will investigate further within Dagster’s codebase to see where mapping_key is used.
I looked through the codebase but I’m not familiar with its depths. I noticed that `context.observe_output` is called in `op_invocation`, and I’m just wondering out loud whether `context` could be mutated by different threads with the same value. From my logs:
```
10:57:36.036  gather_organizations  ↳output_shortha…es_dynamically  STEP_OUTPUT
Yielded output "result" mapping key "westcon_comstor_australia" of type "Any". (Type check passed).
```
Then:
```
10:59:49.225  gather_organizations  ↳output_shortha…es_dynamically  STEP_FAILURE
dagster._core.errors.DagsterInvariantViolationError: Compute for op "gather_organizations.output_shorthand_names_dynamically" yielded a DynamicOutput with mapping_key "westcon_comstor_australia" multiple times.
```
In the list we have:
```
'westcon---comstor-australia', 'westcon-comstor-australia',
```
I checked `observe_output`, where `_seen_outputs` is updated with the `mapping_key`, so the two values above would have been treated as unique. Hence we would need to check what happens when we generate the DynamicOutputs, i.e. whether the generator iterates through each value only once. I’m just not sure where to look for this.
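As a rough mental model of the duplicate check being discussed (a simplified plain-Python sketch, not Dagster's actual code; `DynamicOutputTracker`, `observe`, and `DuplicateMappingKeyError` are hypothetical names), the step records each mapping key it has yielded for a given output and fails when the same key shows up a second time. The comparison is on the final mapping-key strings, so two distinct raw values that end up with the same key count as duplicates:

```python
from typing import Set, Tuple


class DuplicateMappingKeyError(Exception):
    """Stand-in for the invariant violation raised on a repeated mapping key."""


class DynamicOutputTracker:
    """Hypothetical model of per-step bookkeeping for dynamic outputs."""

    def __init__(self) -> None:
        # (output_name, mapping_key) pairs observed so far in this step
        self._seen: Set[Tuple[str, str]] = set()

    def observe(self, output_name: str, mapping_key: str) -> None:
        key = (output_name, mapping_key)
        if key in self._seen:
            raise DuplicateMappingKeyError(
                f'yielded a DynamicOutput with mapping_key "{mapping_key}" multiple times'
            )
        self._seen.add(key)


tracker = DynamicOutputTracker()
tracker.observe("result", "westcon_comstor_australia")
try:
    tracker.observe("result", "westcon_comstor_australia")
except DuplicateMappingKeyError as exc:
    print(exc)
```

Under this model, the error depends only on the keys that reach the check, not on how many threads or records are involved.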
s
Is the snippet you posted above your exact code? If you pass a mapping key with a hyphen, it should fail:
```python
from dagster import DynamicOut, DynamicOutput, job, op

NAME = "westcon---comstor-australia"


@op(out=DynamicOut())
def my_op():
    for _ in range(3):
        # A hyphen makes this an invalid mapping key, so the first yield already fails
        yield DynamicOutput(mapping_key=NAME, value=NAME)


@job
def my_job():
    my_op()


my_job.execute_in_process()
```
The above produces an error. So to get the duplicate-mapping-key error you’re seeing, you must be normalizing 'westcon---comstor-australia' to 'westcon_comstor_australia' somewhere. Since you also have 'westcon-comstor-australia' getting normalized to the same value, that would explain why it complains about duplicates.
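A minimal sketch of how that collision can arise, assuming the normalization replaces each run of characters outside `[A-Za-z0-9_]` with a single underscore (the actual normalizing code isn't shown in the thread; `normalize_key` and `find_collisions` are hypothetical helpers). Grouping the raw values by their normalized key before yielding makes collisions visible up front instead of failing partway through the op:

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List

# Assumption: only letters, digits and underscores are kept in a mapping key
_INVALID_RUN = re.compile(r"[^A-Za-z0-9_]+")


def normalize_key(raw: str) -> str:
    """Hypothetical normalizer: collapse each run of disallowed characters to '_'."""
    return _INVALID_RUN.sub("_", raw)


def find_collisions(values: Iterable[str]) -> Dict[str, List[str]]:
    """Return normalized keys shared by two or more distinct raw values."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for raw in dict.fromkeys(values):  # drop exact duplicates, keep order
        groups[normalize_key(raw)].append(raw)
    return {key: raws for key, raws in groups.items() if len(raws) > 1}


names = ["westcon---comstor-australia", "westcon-comstor-australia", "acme"]
print(find_collisions(names))
# {'westcon_comstor_australia': ['westcon---comstor-australia', 'westcon-comstor-australia']}
```

Since only the mapping key has to be unique and valid, one option is to keep passing the original string as `value` and make colliding keys distinct (for example with an index or short hash suffix); whether that suits the downstream op is a judgment call.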