Kevin Schaper

11/29/2021, 10:24 PM
Are there any examples of a configuration-driven fixed fan-in? What I’d like to do is analogous to the fixed fan-in example (https://docs.dagster.io/concepts/ops-jobs-graphs/jobs-graphs#fixed-fan-in), but rather than having return_one() always return 1, have it sum a set of values that come in via configuration. Is there a middle ground between hardcoding each op and the full dynamic map & collect example?

chris

11/30/2021, 1:32 AM
I could be misunderstanding your use case, but could you not just provide different configuration values to each invocation of the op? As in, if I have the following op:
from dagster import job, op

@op(config_schema=int)
def my_op(context):
    return context.op_config
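plus an op that sums its inputs and the following job:
@op
def sum_fan_in(nums):
    return sum(nums)

@job
def my_job():
    fan_outs = []
    # repeated un-aliased calls are auto-named my_op, my_op_2, ..., my_op_10
    for i in range(10):
        fan_outs.append(my_op())
    sum_fan_in(fan_outs)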
Then I can configure each op like so:
ops:
  my_op:
    config: 1
  my_op_2:
    config: 2
  ...

Kevin Schaper

12/06/2021, 11:53 PM
@chris Can this be taken another step so that the number of values to be summed isn’t hardcoded in the job code? Or, maybe a better way to ask: is it possible to define the pipeline so that I can pass either
ops:
  my_op:
    config: 1
  my_op_2:
    config: 2
or
ops:
  my_op:
    config: 1
  my_op_2:
    config: 2
  my_op_3:
    config: 3

chris

12/07/2021, 12:07 AM
It's tricky but possible. I think that instead of calling the same op a bunch of times, you want one op with a dynamic output that yields an arbitrary number of values based on config. To supply the value for each branch, put them in a config list of arbitrary length; within the body of the op you can verify that the list is the length you expect.
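from dagster import DynamicOut, DynamicOutput, job, op

@op(config_schema={"num_iters": int, "values": [int]}, out=DynamicOut())
def dynamic_outputs(context):
    num_iters = context.op_config["num_iters"]
    values = context.op_config["values"]  # list of ints
    # check that the config list has the expected length
    assert num_iters == len(values)

    # yield one dynamic output per value; each needs a unique string mapping_key
    for idx, val in enumerate(values):
        yield DynamicOutput(val, mapping_key=str(idx))

@job
def my_job():
    fan_outs = dynamic_outputs()
    # collect() gathers the dynamic outputs into one list for the fan-in op
    sum_fan_in(fan_outs.collect())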
Something like this should do the trick

Kevin Schaper

12/07/2021, 12:17 AM
OK, that boundary between fixed and dynamic was what I was trying to figure out. I thought dynamic was only required if the fan-out depends on something that isn’t known until the job is already running. Or maybe a better way for me to think about it is that configuration counts as something that isn’t known until the job is running.

chris

12/07/2021, 12:26 AM
That's exactly right: configuration isn't known by the job until it's running. The actual configuration is evaluated at runtime, because there could be default values, config mapping, etc.
Since in this case config changes the actual shape of the DAG, we need to use dynamic outputs.