
Jorge Lima

10/31/2022, 10:22 AM
Hey all!! Was dependency management changed in recent versions of Dagster? We’re facing a weird issue with dynamic outputs and the ops that depend on the outcome of that op. If the dynamic output op is skipped, we expected the ops downstream of it to be skipped too, but this is not the case (it was in past versions). Any help/insight would be appreciated. Thanks!

sean

10/31/2022, 7:53 PM
Hi Jorge, would you mind sharing some code (maybe a job def)? When you say “skipped”, what exactly do you mean?

Jorge Lima

10/31/2022, 8:08 PM
Hi Sean, thanks for the reply. The op in grey in the image above was skipped because its upstream ops were also skipped:
Skipping step fetch_..._request due to skipped dependencies: ['fetch_..._other_op'].
We expected the op in green, since it depends on the one in grey, to be skipped as well, but it wasn’t. The op in grey yields DynamicOutputs as follows:
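import re
from datetime import date, datetime
from typing import Tuple

import dagster
from dagster import DynamicOut, DynamicOutput, In
from dateutil.rrule import MONTHLY, rrule

# timed_op and Subsidiaries are project-specific helpers (not shown here).
@timed_op(
    ins={"until_date": In(datetime)},
    out={"result": DynamicOut(Tuple[str, str])},
)
def build_report_request(_context: dagster.OpExecutionContext, until_date: datetime):
    # Fan out one DynamicOutput per (month, subsidiary) pair.
    for dt in rrule(MONTHLY, dtstart=date(2018, 1, 1), until=until_date):
        period = dt.strftime("%b %Y")
        for sub in Subsidiaries:
            key = re.sub(r"[. ]+", "_", f"{period}_{sub}")
            yield DynamicOutput((period, sub), mapping_key=key)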
And the definition for the green op, which shouldn’t have run:
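from typing import List, Optional, Tuple

import dagster
from dagster import DynamicOut

@timed_op(
    out={"result": DynamicOut(Optional[str])},
)
def fetch_reports(
    context: dagster.OpExecutionContext,
    report_requests: List[Tuple[str, str]],
):
    ...  # body omitted in the original message; it emits its results as DynamicOutput objects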
Partial graph setup:
report_requests = build_report_request(
    until_date=process_time,
).collect()
reports = fetch_reports(report_requests).collect()
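As an illustration of what build_report_request fans out before .collect() gathers it, here is a stdlib-only sketch that computes the same (mapping_key, (period, subsidiary)) pairs without Dagster. It assumes a hypothetical SUBSIDIARIES list in place of the real Subsidiaries (not shown), and a hand-rolled month_starts in place of dateutil's rrule(MONTHLY, ...), which matches it only when the start date is the 1st of a month. VALID_KEY encodes the assumption that Dagster mapping keys may only contain letters, digits and underscores, which would explain why the op replaces dots and spaces.

```python
import re
from datetime import date
from typing import Iterator, List, Tuple

# Hypothetical stand-in for the user's Subsidiaries (not shown in the thread).
SUBSIDIARIES = ["Acme Inc.", "Acme Ltd"]

# Assumed mapping-key rule: letters, digits and underscores only.
VALID_KEY = re.compile(r"^[A-Za-z0-9_]+$")


def month_starts(start: date, until: date) -> Iterator[date]:
    """Yield the 1st of each month from start's month while it is <= until.

    Mirrors rrule(MONTHLY, dtstart=start, until=until) when start is a 1st.
    """
    year, month = start.year, start.month
    while date(year, month, 1) <= until:
        yield date(year, month, 1)
        month += 1
        if month > 12:
            year, month = year + 1, 1


def report_requests(until: date) -> List[Tuple[str, Tuple[str, str]]]:
    """Return (mapping_key, (period, subsidiary)) pairs in the order the op yields them."""
    pairs = []
    for dt in month_starts(date(2018, 1, 1), until):
        period = dt.strftime("%b %Y")
        for sub in SUBSIDIARIES:
            # Same normalization as the op: runs of dots/spaces become "_".
            key = re.sub(r"[. ]+", "_", f"{period}_{sub}")
            pairs.append((key, (period, sub)))
    return pairs
```

For example, report_requests(date(2018, 2, 10)) yields four keys, from Jan_2018_Acme_Inc_ to Feb_2018_Acme_Ltd; the trailing underscore comes from the dot in "Inc.".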

sean

10/31/2022, 8:20 PM
I see, I’m looking into this. What version of Dagster are you on, btw?

Jorge Lima

10/31/2022, 8:21 PM
1.0.14
thanks, appreciate that!

sean

11/01/2022, 3:57 PM
I was able to repro with this (gamma should be skipped but isn’t):
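from dagster import DynamicOut, DynamicOutput, Out, Output, repository, op, job


# Both outputs are optional; alpha always takes the first branch,
# so "bar" is never emitted and beta should be skipped.
@op(out={"foo": Out(is_required=False), "bar": Out(is_required=False)})
def alpha():
    if True:
        yield Output(5, "foo")
    else:
        yield Output(5, "bar")


# Fans out one dynamic output per integer in range(bar).
@op(out=DynamicOut())
def beta(bar):
    for i in range(bar):
        yield DynamicOutput(i, mapping_key=str(i))


# Consumes the collected dynamic outputs of beta; expected to be skipped with beta.
@op
def gamma(context, values):
    # context.log is a logger, not a callable, so log through one of its methods.
    context.log.info(str(values))


@job
def my_job():
    _foo, bar = alpha()
    dyn_results = beta(bar)
    gamma(dyn_results.collect())


@repository
def repo():
    return [my_job]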
Creating an issue and we will aim to fix ASAP. @Dagster Bot issue [bug] ops downstream of skipped dynamic outputs are not skipped
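To make the expected behavior concrete, here is a toy, stdlib-only model of the skip propagation described in this thread. It illustrates the rule as Jorge and Sean state it, not Dagster's implementation; skipped_steps, Deps and the REPRO_* names are hypothetical, and a .collect() input is treated like any other input from the dynamic op's default "result" output.

```python
from typing import Dict, List, Set, Tuple

# step name -> list of (upstream step, output name) pairs the step consumes
Deps = Dict[str, List[Tuple[str, str]]]


def skipped_steps(order: List[str], deps: Deps, produced: Dict[str, Set[str]]) -> Set[str]:
    """Return the steps that should be skipped.

    Steps are visited in topological order (`order`). A step is skipped when an
    upstream step was skipped, or when the upstream step ran but did not emit
    the output this step consumes. `produced` lists the outputs each step emits
    when it runs.
    """
    skipped: Set[str] = set()
    for step in order:
        for upstream, output in deps.get(step, []):
            if upstream in skipped or output not in produced.get(upstream, set()):
                skipped.add(step)
                break
    return skipped


# The repro above: alpha only emits "foo", so beta (which needs "bar") is
# skipped, and gamma (which collects beta's "result") should be skipped too.
REPRO_ORDER = ["alpha", "beta", "gamma"]
REPRO_DEPS: Deps = {"beta": [("alpha", "bar")], "gamma": [("beta", "result")]}
REPRO_PRODUCED = {"alpha": {"foo"}, "beta": {"result"}, "gamma": set()}

print(sorted(skipped_steps(REPRO_ORDER, REPRO_DEPS, REPRO_PRODUCED)))  # ['beta', 'gamma']
```

Under this rule gamma lands in the skipped set; in the version discussed here it runs instead, which is the behavior being filed as a bug.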

Dagster Bot

11/01/2022, 3:57 PM