Cris
08/21/2020, 3:54 PM

alex
08/21/2020, 4:32 PM
`yield Output` for each thing that should be computed in the current run, and then any downstream solids on the skipped `Output`s would just also skip.

Cris
08/21/2020, 4:38 PM

alex
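08/21/2020, 4:41 PM
```python
from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


@solid(
    output_defs=[
        OutputDefinition(name='foo', is_required=False),
        OutputDefinition(name='bar', is_required=False),
        OutputDefinition(name='biz', is_required=False),
    ],
    config_schema={'what_to_execute': [str]},
)
def control_node(context):
    # Only the outputs named in config are yielded; the others are skipped,
    # and so is every solid downstream of a skipped output.
    for x in context.solid_config['what_to_execute']:
        yield Output(value=x, output_name=x)  # placeholder value


@solid(input_defs=[InputDefinition('value')])
def compute_stuff(context, value):
    # Hypothetical stand-in for the real computation.
    context.log.info('computing {}'.format(value))


@pipeline
def example_pipeline():
    outputs = control_node()
    compute_stuff.alias('compute_foo')(outputs.foo)
    compute_stuff.alias('compute_bar')(outputs.bar)
    compute_stuff.alias('compute_biz')(outputs.biz)
```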
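To make the skip behavior alex describes concrete without depending on a particular Dagster version, here is a plain-Python toy model. It is an assumption for illustration, not how Dagster implements skipping: `control_node` yields only the requested named outputs, and the hypothetical `run` calls a `compute` step only for outputs that were actually produced (one level of downstream skipping).

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple

# Toy model only: these are plain functions, not Dagster solids.
OUTPUT_NAMES = ['foo', 'bar', 'biz']


def control_node(what_to_execute: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (output_name, value) pairs only for the requested outputs."""
    for name in what_to_execute:
        yield name, name  # placeholder value: the output name itself


def run(what_to_execute: Iterable[str], compute: Callable[[str], str]) -> Dict[str, str]:
    """Run compute_<name> only when output <name> was yielded upstream."""
    produced = dict(control_node(what_to_execute))
    results = {}
    for name in OUTPUT_NAMES:
        if name in produced:
            results['compute_' + name] = compute(produced[name])
        # else: compute_<name> is skipped, like a solid fed by a skipped output
    return results


print(run(['foo', 'biz'], str.upper))  # → {'compute_foo': 'FOO', 'compute_biz': 'BIZ'}
```

Requesting only `foo` and `biz` means the `bar` branch never runs, which is the same effect the optional `OutputDefinition`s have in the pipeline.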

Cris
08/21/2020, 4:41 PM

alex
08/21/2020, 4:42 PM
`@composite_solid` yet?
`should_execute` function on `ScheduleDefinition`. Here is an example where we have a fast-ticking schedule that uses `should_execute` to progressively do a backfill:
https://github.com/dagster-io/dagster/blob/master/python_modules/dagster-test/dagster_test/toys/schedules.py#L9-L87

Cris
08/21/2020, 5:16 PM
`output['foo']`
```python
from dagster import pipeline

DATA_SUBSETS = ['subset_1', 'subset_2', 'subset_3']


# Assumes data_subset_dispatcher (defined below) and single_dataset_heavy_model
# are defined before this pipeline is declared.
@pipeline
def heavy_models_pipeline():
    # The dispatcher's outputs come back in the order of the DATA_SUBSETS elements.
    subsets_outputs = data_subset_dispatcher()
    subsets_dict = dict(zip(DATA_SUBSETS, subsets_outputs))
    for subset_name, output in subsets_dict.items():
        single_dataset_heavy_model.alias(subset_name)(output)
```
It seems pretty hacky haha, so lemme know if you have a cleaner solution.
```python
from dagster import Field, Output, OutputDefinition, solid


@solid(
    output_defs=[OutputDefinition(name=n, is_required=False) for n in DATA_SUBSETS],
    config_schema={
        'data_subsets': Field([str], description='list of names of sub-datasets to process at runtime')
    },
)
def data_subset_dispatcher(context):
    # data_subsets could be read from a DB at initialization instead of config;
    # however, all the subsets in output_defs must be fixed beforehand.
    for subset_name in context.solid_config['data_subsets']:
        yield Output(value=subset_name, output_name=subset_name)
```
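Because the output definitions are fixed when the solid is defined, a subset list read at runtime (from config or a DB) can only select among `DATA_SUBSETS`. A small guard such as the hypothetical `subsets_to_dispatch` below (plain Python, not a Dagster API) is one way to check that list before yielding, so an unknown subset is rejected with a clear message.

```python
from typing import Iterable, List, Sequence

DATA_SUBSETS = ['subset_1', 'subset_2', 'subset_3']


def subsets_to_dispatch(requested: Iterable[str], declared: Sequence[str] = DATA_SUBSETS) -> List[str]:
    """Return requested subsets in declared order, rejecting undeclared names."""
    wanted = set(requested)
    unknown = wanted.difference(declared)
    if unknown:
        # Outputs cannot be added at runtime, so an undeclared name is an error.
        raise ValueError('no OutputDefinition for: {}'.format(sorted(unknown)))
    # Declared order is kept and duplicates are dropped.
    return [name for name in declared if name in wanted]
```

Inside the solid, the loop would then iterate over `subsets_to_dispatch(context.solid_config['data_subsets'])` instead of the raw config list.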

alex
08/21/2020, 8:19 PM
`namedtuple`, so you should be able to call `_asdict()` on it.
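Here is alex's suggestion in plain Python. `SubsetOutputs` is a standard-library stand-in, assumed for illustration, for the namedtuple returned when a multi-output solid is invoked inside a pipeline; in the real pipeline the field values would be output handles rather than strings.

```python
from collections import namedtuple

DATA_SUBSETS = ['subset_1', 'subset_2', 'subset_3']

# Stand-in for the outputs object; fields are named after the output definitions.
SubsetOutputs = namedtuple('SubsetOutputs', DATA_SUBSETS)
subsets_outputs = SubsetOutputs('handle_1', 'handle_2', 'handle_3')

# _asdict() pairs each field name with its value, so no zip with DATA_SUBSETS is needed.
subsets_dict = subsets_outputs._asdict()

# getattr with a name held in a variable covers the output['foo'] use case.
second = getattr(subsets_outputs, 'subset_2')

for subset_name, output in subsets_dict.items():
    print(subset_name, output)
```

In the pipeline, the loop body would be `single_dataset_heavy_model.alias(subset_name)(output)`, as in Cris's snippet.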