Avinash Varma Kanumuri
02/07/2021, 5:27 PMdagster.core.errors.DagsterSubprocessError: dagster.check.CheckError: Member of list mismatches type. Expected <class 'dagster.core.execution.plan.inputs.StepInput'>. Got UnresolvedStepInput(name='region', dagster_type_key='String', source=FromPendingDynamicStepOutput(step_output_handle=StepOutputHandle(step_key='get_regions', output_name='result', mapping_key=None), solid_handle=SolidHandle(name='execute_per_region', parent=None), input_name='region')) of type <class 'dagster.core.execution.plan.inputs.UnresolvedStepInput'>.
from dagster import solid, pipeline, composite_solid
from dagster import String, InputDefinition
from dagster.experimental import DynamicOutputDefinition, DynamicOutput
vehicle_config = {
"JPN": ['DYZ', 'DYR', 'NTE'],
"USA": ['ROG', 'ALT', 'MAX']
}
@solid(
output_defs=[DynamicOutputDefinition(String)]
)
def get_regions(_):
for region in vehicle_config:
yield DynamicOutput(value=region, mapping_key=f"region_{region}")
@solid(
input_defs=[InputDefinition("region", String)],
output_defs=[DynamicOutputDefinition(String)]
)
def get_vehicles_in_the_region(_, region):
vehicles = vehicle_config[region]
for vehicle in vehicles:
yield DynamicOutput(value=vehicle, mapping_key=f"vehicle_{vehicle}")
@solid(
input_defs=[InputDefinition("region", String), InputDefinition("vehicle", String)]
)
def execute_complex_process(context, region, vehicle):
<http://context.log.info|context.log.info>(f"Executing complex process for {region} {vehicle}")
pass
@composite_solid(
input_defs=[InputDefinition("region", String)]
)
def execute_per_region(region):
get_vehicles_in_the_region(region).map(lambda x: execute_complex_process(region, x))
@pipeline
def ml_pipeline():
get_regions().map(execute_per_region)
sandy
02/08/2021, 4:36 PMalex
02/08/2021, 4:49 PMfrom dagster import solid, pipeline, composite_solid
from dagster import String, InputDefinition
from dagster.experimental import DynamicOutputDefinition, DynamicOutput
vehicle_config = {
"JPN": ['DYZ', 'DYR', 'NTE'],
"USA": ['ROG', 'ALT', 'MAX']
}
@solid(
config_schema={'region': str, 'vehicle': str}
)
def execute_complex_process(context, region, vehicle):
<http://context.log.info|context.log.info>(f"Executing complex process for {region} {vehicle}")
pass
@pipeline
def ml_pipeline():
for region, vehicles in vehicle_config.values():
for vehicle in vehicles:
# create pre-configured solid
complex = execute_complex_process.configured(
{'vehicle': vehicle, 'region': region},
name=f"complex_{region}_{vehicle}"
)
# add it to the pipeline
complex()
Avinash Varma Kanumuri
02/08/2021, 5:09 PMDynamicOutput
feature. And also on the sidenote are there any docs or one liner kind of thing explaining which component can go into what component. For ex: solids do not trigger other solids. What ever runs in the pipeline has to be returned by a solid. This can be handy. Great library by the way. loving it.