Frank
07/05/2022, 1:52 PMFrank
07/05/2022, 2:09 PMStephen Bailey
07/05/2022, 2:54 PM@op(out=DynamicOut)
def create_sensor_objects():
sensors = get_sensor_objects()
for s in sensors:
yield DynamicOutput(s)
@op(outs={"plan_a": Out(required=False), "plan_b": Out(required=False), "plan_c": Out(required=False))
def determine_sensor_plan(sensor_object: SensorObject):
if sensor_object.is_cool:
yield Output("plan_a)
elif sensor_boject.is_uncool:
yield Output("plan_b")
else:
yield Output("plan_c")
@graph
def generate_sensor_output(sensor_object):
plan_a, plan_b, plan_c = determine_sensor_plan(sensor_object)
# if plan_a is returned, do plan_a things
output = calculate_plan_a_output(sensor_object, plan_a)
# if plan_b is returned, do plan_b things
output = calculate_plan_b_output(sensor_object, plan_b)
# if plan_c is returned, do plan_c things
output = calculate_plan_c_output(sensor_object, plan_c)
return output
@job
def do_dynamic_things():
sensors = create_sensor_objects()
results = sensors.map(generate_sensor_output)
Frank
07/05/2022, 3:38 PMStephen Bailey
07/05/2022, 4:07 PM