# announcements
Hi! I was wondering how you would advise solving the following issue: I have a pipeline that takes a certain input of data (with a long SQL query) and does some heavy calculations and reporting on top. This pipeline is scheduled to run every day. We now have the requirement of running the same process on different subsets of the data (which may be overlapping). Ideally we would like this to be dynamic, e.g. we have a tag that marks which data corresponds to which partitions and gets redirected to the pipeline, but only for that particular subset. What mechanism could allow us to implement this cleanly in dagster? One thing I would rather not do is modify the current pipeline to put `for`s inside every solid to execute each subset of the data, as the execution is quite heavy and also because a failure in one subset would damage the state for all subsets.
So one path (until we have proper map / fan-out support) is to use skippable outputs. You would code the pipeline to support computing the full breadth of data, then in some controlling solid you would `yield Output` for each thing that should be computed in the current run, and any downstream solids on the skipped `Output`s would then also skip.
This assumes you know the full set of possibilities at pipeline generation time.
I see, this would still be a single pipeline right?
```python
from dagster import Output, OutputDefinition, pipeline, solid


def example():
    @solid(
        output_defs=[
            OutputDefinition(name='foo', is_required=False),
            OutputDefinition(name='bar', is_required=False),
            OutputDefinition(name='biz', is_required=False),
        ],
        config_schema={'what_to_execute': [str]},
    )
    def control_node(context):
        for x in context.solid_config['what_to_execute']:
            yield Output(value=something, output_name=x)

    @pipeline
    def pipeline():
        outputs = control_node()
```
And I guess the most dynamic we can get is in the solid instantiation, right? Like: `for subset in subsets: create_solid(subset)`
Do you know a good way to abstract a good chunk of the solids? This single pipeline is a mess haha
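A plain-Python sketch of that factory idea (no dagster here; in dagster the factory would return a `@solid`-decorated function, and `create_subset_processor` plus the row/tag shapes are purely illustrative):

```python
# Plain-Python sketch of the "create one solid per subset" factory idea.
# In dagster you would have the factory return a @solid-decorated function;
# create_subset_processor and the row/tag shapes are illustrative only.
def create_subset_processor(subset_name):
    def process(rows):
        # keep only this subset's rows, then do the heavy work on them
        selected = [r for r in rows if r["tag"] == subset_name]
        return {"subset": subset_name, "count": len(selected)}

    # give each generated function a distinct name, as a solid factory would
    process.__name__ = f"process_{subset_name}"
    return process


subsets = ["subset_1", "subset_2"]
processors = {s: create_subset_processor(s) for s in subsets}
```

Because each subset gets its own function (solid), a failure in one subset stays isolated from the others, which addresses the concern above.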
have you started using `composite_solid`?
The other dimension of cleverness you can toy with is the `should_execute` function on schedules. Here is an example where we have a fast-ticking schedule that uses `should_execute` to progressively do a backfill: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster-test/dagster_test/toys/schedules.py#L9-L87
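The idea behind `should_execute` is just a predicate the scheduler evaluates on every tick; ticks where it returns False are skipped. A plain-Python sketch of a progressive-backfill predicate (the `state` dict and `make_backfill_should_execute` are stand-ins for wherever you persist the cursor, not the dagster API):

```python
from datetime import date, timedelta


# A fast-ticking schedule evaluates should_execute on every tick; returning
# False skips that tick's run. Here we walk a backfill cursor forward one
# day per executed tick until it reaches the end date.
def make_backfill_should_execute(start, end, state):
    state.setdefault("cursor", start)

    def should_execute():
        if state["cursor"] >= end:
            return False  # backfill finished; skip all further ticks
        state["cursor"] = state["cursor"] + timedelta(days=1)
        return True

    return should_execute
```

Paired with a schedule that ticks much faster than daily, this lets old partitions catch up one run at a time without a separate backfill pipeline.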
ooooh okok
Lemme see these examples then. Thank you very much!
@alex in the example you have with the control node, I thought `outputs` would be a tuple; is there a way to access these output elements by name?
Ended up doing something like this:
```python
DATA_SUBSETS = ['subset_1', 'subset_2', 'subset_3']


@pipeline
def heavy_models_pipeline():
    # returns the outputs in the order of the DATA_SUBSETS elements
    subsets_outputs = data_subset_dispatcher()
    subsets_dict = {subset_name: output for subset_name, output in zip(DATA_SUBSETS, subsets_outputs)}

    for subset_name in subsets_dict:
        ...  # wire the downstream solids for this subset here
```
It seems pretty hacky haha, so lemme know if you have a cleaner solution
This is the dispatcher
```python
@solid(
    output_defs=[OutputDefinition(name=n, is_required=False) for n in DATA_SUBSETS],
    config_schema={
        'data_subsets': Field([str], description='list names of subdatasets to process at runtime')
    },
)
def data_subset_dispatcher(context):
    # data_subsets can be read from a DB for initialization instead of config;
    # however, all the subsets in the OutputDefinitions must be fixed beforehand.
    for subset_name in context.solid_config['data_subsets']:
        yield Output(value=subset_name, output_name=subset_name)
```
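For completeness, the run config to pick subsets at launch time would look something like this (a sketch assuming the solid's config key is `data_subsets` and the legacy `solids:` run-config layout):

```yaml
solids:
  data_subset_dispatcher:
    config:
      data_subsets:
        - subset_1
        - subset_3
```

Outputs for subsets not listed here are skipped, so only their downstream solids are skipped too.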
it's a `namedtuple`, so you should be able to access the outputs by name on it
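In plain Python terms: if the multi-output invocation returns a namedtuple keyed by output name, you get attribute access and `_asdict()` for free (`SolidOutputs` below is a stand-in, not a dagster type):

```python
from collections import namedtuple

# Stand-in for the object a multi-output solid invocation returns:
# a namedtuple whose fields are the output names.
SolidOutputs = namedtuple("SolidOutputs", ["foo", "bar"])
outputs = SolidOutputs(foo=1, bar=2)

by_attribute = outputs.foo    # access a single output by name
by_name = outputs._asdict()   # all outputs as a name -> value mapping
```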