Spencer Guy

09/23/2022, 8:08 PM
Hey Dagster team! Looking for some advice on how to structure a job, can you help? This job pulls data from multiple database shards and simply merges that data together. I'm currently accomplishing this by creating a different op for each shard. I would like to avoid repeating the same op code for each shard but also maintain the ability to fetch data concurrently from the different shards. Here's an example of my code:
@op(
    required_resource_keys={"db_resource1"},
    description="Get data shard1",
)
def get_data_shard1(context):
    query = "select * from table"
    db = context.resources.db_resource1
    df = db.fetch_data_by_query(query)
    return df

@op(
    required_resource_keys={"db_resource2"},
    description="Get data shard2",
)
def get_data_shard2(context):
    query = "select * from table"
    db = context.resources.db_resource2
    df = db.fetch_data_by_query(query)
    return df

@op(description="Merge cross shard data")
def merge_data(context, df1, df2):
    merged = pd.concat([df1, df2])
    return merged

@job(
    resource_defs={
        "db_resource1": db_resource.configured({"database": "shard1"}),
        "db_resource2": db_resource.configured({"database": "shard2"}),
    }
)
def multi_shard_data_job():
    df1 = get_data_shard1()
    df2 = get_data_shard2()
    merged = merge_data(df1, df2)
Thanks for reading!


09/23/2022, 8:32 PM
hi @Spencer Guy! it sounds like this might be a good use case for dynamic graphs. You could have a single op at the start that yields a dynamic output for each shard name, then a downstream op that runs the select (using one generic db_resource and taking the shard name as an input so it queries the given shard), and a final merge op at the end (taking the result of the `collect` call described in the dynamic graph docs) to merge everything back together

Spencer Guy

09/23/2022, 11:01 PM
Okay cool I'll check out this doc and let you know if I have any questions, thanks!