https://dagster.io/ logo
f

Fran Sanchez

07/02/2020, 9:57 PM
I'm experimenting with the optional inputs/outputs of the solids but I got to something that I don't understand, in this example:
Copy code
@solid(output_defs=[OutputDefinition(name="num", dagster_type=int, is_required=False)])
def optional_input_output(_, num: Optional[int] = None):
    if num:
        yield Output(num, "num")


@solid
def takes_optional(_, num: Optional[int] = 333) -> int:
    return num*2


@pipeline
def optional_pipeline():
    takes_optional(optional_input_output())
If I don't provide any input to to
optional_input_output
then
takes_optional
is always skipped, regardless of the input marked as Optional and having a default value... how can I achieve this then?
a

alex

07/02/2020, 10:01 PM
We need to document this better, but optional outputs perform the behavior you encountered where their omission causes all downstream dependencies to skip.
the input type being
Optional
just means it accepts
None
or the type, in this case
int
f

Fran Sanchez

07/02/2020, 10:06 PM
So, my best chance is to make sure that every solid returns its outputs, even if its with a None value, right?
Also, it seems that the behaviour of using
is_required
in the
OutputDefinition
is different to the behaviour of declaring it
Optional
with the typing system...
Which I found non-intuitive initially
Do you know if there is any other way to achieve something like this?
Basically, I build my pipeline programatically. I have one particular solid that needs all the outputs from the rest of the solid, but some of these solids might be skipped or might not produce all their outputs, but I still want to run this fan-in solid regardless of that
a

alex

07/02/2020, 10:36 PM
Also, it seems that the behaviour of using is_required in the OutputDefinition is different to the behaviour of declaring it Optional with the typing system
Ya it used to be worse - the flag used to be called
is_optional
- we renamed it to
is_required
to try to help with this confusion.
Did you find that the fan-in style dependency demonstrates the same behavior? I believe that the skipping should not progress through
MultiDependency
s
👍 1
f

Fran Sanchez

07/02/2020, 10:40 PM
Oh, no I haven't seen this. I will have a look
a

alex

07/02/2020, 10:41 PM
let us know - that i think we can fix in the near term if it doesn’t work
f

Fran Sanchez

07/02/2020, 10:47 PM
So I can basically replace
DependencyDefinition
by
MultiDependencyDefinition
when building my
PipelineDefinition
dependencies argument, right?
I think it doesn't work
👍 1
Well, this is using the functional API, not the
MultiDependencyDefinition
one, but I guess it should be the same?
Copy code
@solid(output_defs=[OutputDefinition(name="num", dagster_type=int, is_required=False)])
def optional_input_output(_, num: Optional[int] = None):
    if num:
        yield Output(num, "num")


@solid(input_defs=[InputDefinition("nums", List[int])])
def takes_optional(_, nums) -> int:
    return sum(nums)



@pipeline
def optional_pipeline():
    takes_optional(
        [
            optional_input_output.alias("does_not_return_anything")(),
            optional_input_output.alias("returns_something")()
        ]
    )
a

alex

07/06/2020, 2:56 PM
alright I think i should be able to get a fix out for the release this week
2 Views