
Cris

08/21/2020, 3:54 PM
Hi! I was wondering how you would advise solving the following issue: I have a pipeline that takes a certain input of data (via a long SQL query) and does some heavy calculations and reporting on top. This pipeline is scheduled to run every day. We now have a requirement to run the same process on different subsets of the data (which may overlap). Ideally this would be dynamic: a tag marks which data corresponds to which partition and gets routed to the pipeline, but only for that particular subset. What mechanism in Dagster could let us implement this cleanly? One thing I'd rather not do is modify the current pipeline to put `for` loops inside every solid to execute each subset, both because the execution is quite heavy and because a failure in one subset would damage the state for all subsets.

alex

08/21/2020, 4:32 PM
So one path (until we have proper map / fan-out support) is to use skippable outputs. You would code the pipeline to support computing the full breadth of data, then in some controlling solid you would `yield Output` for each thing that should be computed in the current run; any downstream solids wired to the skipped `Output`s would then also skip
this assumes you know the full set of possibilities at pipeline-definition time
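Stripped of the Dagster API, the control flow being described looks roughly like this (all names and payloads here are illustrative, not Dagster calls): the controller emits only the outputs requested for this run, and each downstream step runs only if its named upstream output was actually produced.

```python
# Illustrative sketch of the skippable-output idea, without Dagster.

def control_node(what_to_execute):
    # Yield (output_name, value) pairs only for the requested subsets;
    # the rest are simply never produced ("skipped").
    for name in what_to_execute:
        yield name, f'data for {name}'

def run(what_to_execute):
    produced = dict(control_node(what_to_execute))
    results = {}
    for name in ('foo', 'bar', 'biz'):
        if name not in produced:
            # Downstream step skips because its upstream output was skipped.
            continue
        results[name] = produced[name].upper()
    return results
```

In Dagster the skip propagation is handled by the framework; this sketch only shows why no `for` loops are needed inside the heavy solids themselves.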

Cris

08/21/2020, 4:38 PM
I see, this would still be a single pipeline, right?

alex

08/21/2020, 4:41 PM
from dagster import Output, OutputDefinition, pipeline, solid

def example():
    @solid(
        output_defs=[
            OutputDefinition(name='foo', is_required=False),
            OutputDefinition(name='bar', is_required=False),
            OutputDefinition(name='biz', is_required=False),
        ],
        config_schema={'what_to_execute': [str]},
    )
    def control_node(context):
        for x in context.solid_config['what_to_execute']:
            yield Output(value=something, output_name=x)  # `something`: placeholder payload

    @pipeline
    def my_pipeline():
        outputs = control_node()
        compute_stuff.alias('compute_foo')(outputs.foo)
        compute_stuff.alias('compute_bar')(outputs.bar)
        compute_stuff.alias('compute_biz')(outputs.biz)
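For a run that should only compute two of the three subsets, the run config would look something like this (assuming the solid is named `control_node` as in the snippet above):

```yaml
solids:
  control_node:
    config:
      what_to_execute:
        - foo
        - biz
```

With this config, `outputs.bar` is never yielded, so `compute_bar` skips.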

Cris

08/21/2020, 4:41 PM
And I guess the most dynamic we can get is at solid-instantiation time, right? Like: `for subset in subsets: create_solid(subset)`
Do you know a good way to abstract a good chunk of the solids? This single pipeline is a mess haha

alex

08/21/2020, 4:42 PM
have you started using `@composite_solid` yet?
The other dimension of cleverness you can toy with is the `should_execute` function on `ScheduleDefinition`. Here is an example where we have a fast-ticking schedule that uses `should_execute` to progressively do a backfill: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster-test/dagster_test/toys/schedules.py#L9-L87

Cris

08/21/2020, 5:16 PM
ooooh okok
Lemme see these examples then. Thank you very much!
@alex in the example you have with the control node, I thought `outputs` would be a tuple; is there a way to access these output elements by name? Like `output['foo']`
Ended up doing something like this:
DATA_SUBSETS = ['subset_1', 'subset_2', 'subset_3']

@pipeline
def heavy_models_pipeline():
    # returns the outputs as a tuple, in the order of the DATA_SUBSETS elements
    subsets_outputs = data_subset_dispatcher()
    subsets_dict = {subset_name: output for subset_name, output in zip(DATA_SUBSETS, subsets_outputs)}

    for subset_name in subsets_dict:
        single_dataset_heavy_model.alias(subset_name)(subsets_dict[subset_name])
It seems pretty hacky haha, so lemme know if you have a cleaner solution
This is the dispatcher
@solid(
    output_defs=[OutputDefinition(name=n, is_required=False) for n in DATA_SUBSETS],
    config_schema={
        'data_subsets': Field([str], description='list names of subdatasets to process at runtime')
    })
def data_subset_dispatcher(context):
    # data_subsets could be read from a DB for initialization instead of config;
    # however, all the subsets in the OutputDefinitions must be fixed beforehand.

    for subset_name in context.solid_config['data_subsets']:
        yield Output(value=subset_name, output_name=subset_name)

alex

08/21/2020, 8:19 PM
it's a `namedtuple` so you should be able to call `._asdict()` on it
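That is standard-library `namedtuple` behavior, so something like `data_subset_dispatcher()._asdict()['subset_1']` should work in place of the `zip` trick. The stdlib behavior itself, with illustrative field names:

```python
from collections import namedtuple

# Stand-in for the named outputs a multi-output solid returns.
Outputs = namedtuple('Outputs', ['foo', 'bar', 'biz'])
outs = Outputs(foo=1, bar=2, biz=3)

# _asdict() gives a dict keyed by output name, so elements can be
# looked up by name instead of by position.
by_name = outs._asdict()
```

This keeps the lookup order-independent, unlike zipping against a separately maintained list of names.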