Hiya, I've got a question about dynamically buildi...
# ask-community
f
Hiya, I've got a question about dynamically building graphs/jobs. We're investigating using a DAG based approach to perform various calculations on time series sensor data that we're collecting from 100+ installations. These installations aren't really consistent. As the hardware can vary on an installation by installation basis. So some sensors might be available at installation A, but not at installation B. And in that case we need to calculate that value ourselves. What we have at the moment (still all outside of Dagster) is a clear list of all calculations we need (let's call them ops/tasks), what sensors each calculation requires, and what values they produce. We're also able to dynamically connect all of these ops based on an installation's hardware specifications. This produces a (anonymised) DAG as seen in the attached example image. Where yellow is a specific sensor that's either read from, or written to, a database, orange is the actual op/task, and blue is the data sent between tasks. Note that for a different installation, entire sections of this graph might disappear, and other nodes might be added. At the moment I'm struggling with how to do something similar in Dagster. As I'm not really planning on making a job method per installation. I've been looking through the documentation, but most of the references to "dynamic" I find are largely about DynamicOut, and scaling a specific op. But maybe I've simply been searching for the wrong terms, so I'll keep looking. In the meantime, is there maybe someone who could maybe point me in the right direction?
dagster bot responded by community 1
Hm, just found the GraphDefinition and the GraphDSL sections. I'll also give those a closer look. Guess the right term was "programmatically".
s
the logic that comes to mind for me is: 1. for each installation, you create an object with attributes (dynamicout) 2. you pass this object into a subgraph. that subgraph contains all the "if else" logic to evaluate any sensor. 3. it returns some output it would look like this (pseudocode0
Copy code
@op(out=DynamicOut)
def create_sensor_objects():
    sensors = get_sensor_objects()
    for s in sensors:
          yield DynamicOutput(s)

@op(outs={"plan_a": Out(required=False), "plan_b": Out(required=False), "plan_c": Out(required=False))
def determine_sensor_plan(sensor_object: SensorObject):
    if sensor_object.is_cool:
          yield Output("plan_a)
    elif sensor_boject.is_uncool:
          yield Output("plan_b")
    else:
          yield Output("plan_c")

@graph
def generate_sensor_output(sensor_object):
     plan_a, plan_b, plan_c = determine_sensor_plan(sensor_object)

     # if plan_a is returned, do plan_a things
     output = calculate_plan_a_output(sensor_object, plan_a)

     # if plan_b is returned, do plan_b things
     output = calculate_plan_b_output(sensor_object, plan_b)

     # if plan_c is returned, do plan_c things
     output = calculate_plan_c_output(sensor_object, plan_c)

     return output

@job
def do_dynamic_things():
     sensors = create_sensor_objects()
     results = sensors.map(generate_sensor_output)
f
Thanks Stephen! I'll read up on that approach 👍
s
totally. a lot going on there!