Jorge Lima
10/31/2022, 10:22 AMsean
10/31/2022, 7:53 PMJorge Lima
10/31/2022, 8:08 PMSkipping step fetch_..._request due to skipped dependencies: ['fetch_..._other_op'].
We expected that the op in green, as it’s a dependency on the one in grey to be skipped, but it wasn’t.
The op in grey returns as a DynamicOutput as follows:
@timed_op(
ins={"until_date": In(datetime)},
out={"result": DynamicOut(Tuple[str, str])},
)
def build_report_request(_context: dagster.OpExecutionContext, until_date: datetime):
for dt in rrule(MONTHLY, dtstart=date(2018, 1, 1), until=until_date):
period = dt.strftime("%b %Y")
for sub in Subsidiaries:
key = re.sub(r"[. ]+", "_", f"{period}_{sub}")
yield DynamicOutput((period, sub), mapping_key=key)
@timed_op(
out={"result": DynamicOut(Optional[str])},
)
def fetch_reports(
context: dagster.OpExecutionContext,
report_requests: List[Tuple[str, str]],
):
report_requests = build_report_request(
until_date=process_time,
).collect()
reports = fetch_reports(report_requests).collect()
sean
10/31/2022, 8:20 PMJorge Lima
10/31/2022, 8:21 PM1.0.14
thanks, appreciate that!sean
11/01/2022, 3:57 PMgamma
should be skipped but isn’t):
from dagster import DynamicOut, DynamicOutput, Out, Output, repository, op, job
@op(out={"foo": Out(is_required=False), "bar": Out(is_required=False)})
def alpha():
if True:
yield Output(5, "foo")
else:
yield Output(5, "bar")
@op(out=DynamicOut())
def beta(bar):
for i in range(bar):
yield DynamicOutput(i, mapping_key=str(i))
@op
def gamma(context, values):
context.log(values)
@job
def my_job():
_foo, bar = alpha()
dyn_results = beta(bar)
gamma(dyn_results.collect())
@repository
def repo():
return [my_job]
Creating an issue and we will aim to fix ASAP.
@Dagster Bot issue [bug] ops downstream of skipped dynamic outputs are not skippedDagster Bot
11/01/2022, 3:57 PM