Bennett Norman
10/03/2022, 8:00 PM
sandy
10/03/2022, 8:49 PM
Bennett Norman
10/03/2022, 8:50 PM
sandy
10/03/2022, 8:52 PM
Bennett Norman
10/03/2022, 8:53 PM
sandy
10/03/2022, 10:25 PM
from dagster import op, Out, Output, job
@op(out={"table1": Out(), "table2": Out()})
def op1():
    """Emit every table in the placeholder mapping as its own named output.

    The dictionary keys are used as output names, so they should match the
    names declared in ``out`` above.
    """
    tables_by_name = {"table1": ..., "table2": ...}
    yield from (
        Output(output_name=name, value=table)
        for name, table in tables_by_name.items()
    )


@op
def process_table1(table1):
    """Consume the ``table1`` output of ``op1`` (processing logic elided)."""
    ...


@job
def job1():
    """Run ``op1`` and feed only its ``table1`` output into ``process_table1``.

    The ``table2`` output is produced but no op in this job consumes it.
    """
    process_table1(op1().table1)
How about this?
Bennett Norman
10/03/2022, 10:36 PM
Is there a way to pass the outputs of all the `process_*` ops to a single op without passing every single output as an input?
from dagster import op, Out, Output, job
@op(out={"table1": Out(), "table2": Out()})
def op1():
    """Emit each table in the placeholder mapping as a separate named output."""
    table_dict = {"table1": ..., "table2": ...}
    for table_name, value in table_dict.items():
        yield Output(output_name=table_name, value=value)


@op
def process_table1(table1):
    """Process the ``table1`` output of ``op1`` (logic elided)."""
    ...


@op
def process_table2(table2):
    """Process the ``table2`` output of ``op1`` (logic elided)."""
    ...


@op
def process_all(table1, table2):
    """Combine the processed tables; takes one input per processed table."""
    ...


@job
def job1():
    """Split ``op1`` into per-table outputs, process each, then combine them."""
    op1_result = op1()
    table1 = op1_result.table1
    table2 = op1_result.table2
    processed_table_1 = process_table1(table1)
    # Each table goes through its own process op; table2 must not be routed
    # through process_table1, otherwise process_table2 never runs.
    processed_table_2 = process_table2(table2)
    # Is there a way to avoid listing all of the inputs here? We have dozens of process_table* functions.
    process_all(processed_table_1, processed_table_2)
sandy
10/04/2022, 3:06 PM
from dagster import op, Out, Output, job
@op(out={"table1": Out(), "table2": Out()})
def op1():
    """Emit each table as a named output whose value is a ``(name, table)`` pair.

    Bundling the name with the value lets a downstream op that receives all
    outputs as one list rebuild a name -> table mapping, instead of
    declaring one input per table.
    """
    table_dict = {"table1": ..., "table2": ...}
    for table_name, value in table_dict.items():
        yield Output(output_name=table_name, value=(table_name, value))


@op
def process_table1(table1):
    """Process a single table received as a ``(name, table)`` pair."""
    table_name, df = table1
    ...


@op
def process_all_tables(all_tables):
    """Process every table at once.

    ``all_tables`` is the list of ``(name, table)`` pairs emitted by ``op1``.
    ``dict`` turns the pairs directly into a name -> table mapping; if a name
    repeats, the last pair wins.
    """
    dfs_by_name = dict(all_tables)
    ...


@job
def job1():
    """Process ``table1`` on its own and also hand every ``op1`` output to one op."""
    op1_result = op1()
    table1 = op1_result.table1
    process_table1(table1)
    # Unpacking the multi-output result yields one handle per declared output.
    # Passing them as a list fans them all into the single ``all_tables`` input.
    process_all_tables([*op1_result])
Bennett Norman
10/18/2022, 5:32 PM