Bennett Norman

10/03/2022, 8:00 PM
I have a function that returns a dictionary of data frames where the key is the name of the table. The dictionary is passed to another function which passes the data frames to other functions based on the table name. How would you recommend I apply dagster concepts in this situation? DynamicOutputs doesn’t seem quite right because the outputs are handled by different functions. Specifying the dictionary as multiple outputs of an op doesn’t allow you to pass the outputs to other functions based on table names.

sandy

10/03/2022, 8:49 PM
hi Bennett - is the set of tables known before the run is launched?

Bennett Norman

10/03/2022, 8:50 PM
Yup!

sandy

10/03/2022, 8:52 PM
and are you using ops or software-defined assets? or open to either?

Bennett Norman

10/03/2022, 8:53 PM
I’ve been using mostly ops but I’m open to either.

sandy

10/03/2022, 10:25 PM
from dagster import op, Out, Output, job


@op(out={"table1": Out(), "table2": Out()})
def op1():
    table_dict = {"table1": ..., "table2": ...}
    for table_name, value in table_dict.items():
        yield Output(output_name=table_name, value=value)


@op
def process_table1(table1):
    ...


@job
def job1():
    op1_result = op1()
    table1 = op1_result.table1
    process_table1(table1)
how about this?
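
The attribute access on `op1_result` works because, as far as I know, calling a multi-output op inside a `@job` body returns a namedtuple-like object with one field per output name. Below is a plain-Python sketch of that access pattern with no Dagster involved. `Op1Outputs` and the string handles are hypothetical stand-ins, not Dagster objects:

```python
from collections import namedtuple

# Hypothetical stand-in for what op1() hands back inside a @job body:
# one field per declared output name.
Op1Outputs = namedtuple("Op1Outputs", ["table1", "table2"])
op1_result = Op1Outputs(table1="handle-for-table1", table2="handle-for-table2")

# Route a specific output by attribute, as in `op1_result.table1`.
by_attribute = op1_result.table1

# The same lookup driven by a table name held in a variable.
by_name = getattr(op1_result, "table2")
```

Because the set of tables is known before the run, each output name can be wired to its own downstream op this way.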

Bennett Norman

10/03/2022, 10:36 PM
Oh, that does work, thank you! I didn’t realize you could access outputs as an attribute of an op. Is it possible to then pass the outputs of the process_* ops to a single op without passing every single output as an input?
from dagster import op, Out, Output, job


@op(out={"table1": Out(), "table2": Out()})
def op1():
    table_dict = {"table1": ..., "table2": ...}
    for table_name, value in table_dict.items():
        yield Output(output_name=table_name, value=value)


@op
def process_table1(table1):
    ...

@op
def process_table2(table2):
    ...

@op
def process_all(table1, table2):
    ...

@job
def job1():
    op1_result = op1()
    table1 = op1_result.table1
    table2 = op1_result.table2
    processed_table_1 = process_table1(table1)
    processed_table_2 = process_table2(table2)
    
    # Is there a way to avoid listing all of the inputs here? We have dozens of process_table* functions. 
    process_all(processed_table_1, processed_table_2)

sandy

10/04/2022, 3:06 PM
how about this?
from dagster import op, Out, Output, job


@op(out={"table1": Out(), "table2": Out()})
def op1():
    table_dict = {"table1": ..., "table2": ...}
    for table_name, value in table_dict.items():
        yield Output(output_name=table_name, value=(table_name, value))


@op
def process_table1(table1):
    table_name, df = table1
    ...


@op
def process_all_tables(all_tables):
    dfs_by_name = {name: df for name, df in all_tables}
    ...


@job
def job1():
    op1_result = op1()
    table1 = op1_result.table1
    process_table1(table1)
    process_all_tables([*op1_result])
👍 1

Bennett Norman

10/18/2022, 5:32 PM
Ah this is great thank you!