Hi, I am getting the following error while using D...
# announcements
a
Hi, I am getting the following error while using DynamicOutput
dagster.core.errors.DagsterSubprocessError: dagster.check.CheckError: Member of list mismatches type. Expected <class 'dagster.core.execution.plan.inputs.StepInput'>. Got UnresolvedStepInput(name='region', dagster_type_key='String', source=FromPendingDynamicStepOutput(step_output_handle=StepOutputHandle(step_key='get_regions', output_name='result', mapping_key=None), solid_handle=SolidHandle(name='execute_per_region', parent=None), input_name='region')) of type <class 'dagster.core.execution.plan.inputs.UnresolvedStepInput'>.
Copy code
from dagster import solid, pipeline, composite_solid
from dagster import String, InputDefinition
from dagster.experimental import DynamicOutputDefinition, DynamicOutput

vehicle_config = {
    "JPN": ['DYZ', 'DYR', 'NTE'],
    "USA": ['ROG', 'ALT', 'MAX']
}

@solid(
    output_defs=[DynamicOutputDefinition(String)]
)
def get_regions(_):
    for region in vehicle_config:
        yield DynamicOutput(value=region, mapping_key=f"region_{region}")

@solid(
    input_defs=[InputDefinition("region", String)],
    output_defs=[DynamicOutputDefinition(String)]
)
def get_vehicles_in_the_region(_, region):
    vehicles = vehicle_config[region]
    for vehicle in vehicles:
        yield DynamicOutput(value=vehicle, mapping_key=f"vehicle_{vehicle}")

@solid(
    input_defs=[InputDefinition("region", String), InputDefinition("vehicle", String)]
)
def execute_complex_process(context, region, vehicle):
    <http://context.log.info|context.log.info>(f"Executing complex process for {region} {vehicle}")
    pass

@composite_solid(
    input_defs=[InputDefinition("region", String)]
)
def execute_per_region(region):
    get_vehicles_in_the_region(region).map(lambda x: execute_complex_process(region, x))

@pipeline
def ml_pipeline():
    get_regions().map(execute_per_region)
the process is to get available regions, and vehicles for that particular region and run individual task for region & vehicle combined
complex process can take anywhere between 5 mins to 30 mins, I want to see the status at region+vehicle level. If I can't use DynamicOutput, can anyone explain how to solve this kind of process in dagster
New to dagster*
any updates on this?
s
Hi @Avinash Varma Kanumuri - I believe we currently only support one "level" of dynamic DAGs. I.e. a solid that takes a dynamic output as input can't have it's own dynamic output. This may change in the future. However, in the meantime, would it make sense to have a single solid with a dynamic output that emits a set of (vehicle, region) pairs?
a
If these configurations are known before computation starts, you can also just create the initial pipeline dynamically
Copy code
from dagster import solid, pipeline, composite_solid
from dagster import String, InputDefinition
from dagster.experimental import DynamicOutputDefinition, DynamicOutput
vehicle_config = {
    "JPN": ['DYZ', 'DYR', 'NTE'],
    "USA": ['ROG', 'ALT', 'MAX']
}


@solid(
    config_schema={'region': str, 'vehicle': str}
)
def execute_complex_process(context, region, vehicle):
    <http://context.log.info|context.log.info>(f"Executing complex process for {region} {vehicle}")
    pass

@pipeline
def ml_pipeline():
    for region, vehicles in vehicle_config.values():
        for vehicle in vehicles:
            # create pre-configured solid
            complex = execute_complex_process.configured(
                {'vehicle': vehicle, 'region': region}, 
                name=f"complex_{region}_{vehicle}"
            )
            
            # add it to the pipeline
            complex()
a
Hi @sandy, yeah that would work just checking out on how I can use
DynamicOutput
feature. And also on the sidenote are there any docs or one liner kind of thing explaining which component can go into what component. For ex: solids do not trigger other solids. What ever runs in the pipeline has to be returned by a solid. This can be handy. Great library by the way. loving it.