Spencer Guy
# 09/23/2022, 8:08 PM
@op(
    description="Get data shard1",
    required_resource_keys={"db_resource1"},
)
def get_data_shard1(context):
    """Run the full-table query against the ``db_resource1`` resource.

    Returns whatever ``fetch_data_by_query`` produces. This is presumably a
    pandas DataFrame, since ``merge_data`` passes it to ``pd.concat``.
    """
    shard_db = context.resources.db_resource1
    return shard_db.fetch_data_by_query("select * from table")
@op(
    description="Get data shard2",
    required_resource_keys={"db_resource2"},
)
def get_data_shard2(context):
    """Run the full-table query against the ``db_resource2`` resource.

    Returns whatever ``fetch_data_by_query`` produces. This is presumably a
    pandas DataFrame, since ``merge_data`` passes it to ``pd.concat``.
    """
    shard_db = context.resources.db_resource2
    return shard_db.fetch_data_by_query("select * from table")
@op(description="Merge cross shard data")
def merge_data(context, df1, df2):
    """Stack the two shard results row-wise into a single DataFrame.

    ``context`` is accepted but not used here.

    NOTE(review): ``pd.concat`` keeps each input's index by default. If the
    shard frames carry a default RangeIndex, the merged index will contain
    repeated labels. Consider ``ignore_index=True`` if that matters downstream.
    """
    shard_frames = [df1, df2]
    return pd.concat(shard_frames)
@job(
    resource_defs={
        # Both resource keys use the same db_resource definition, each
        # configured to point at a different database (shard).
        "db_resource1": db_resource.configured({"database": "shard1"}),
        "db_resource2": db_resource.configured({"database": "shard2"}),
    }
)
def multi_shard_data_job():
    """Load each shard and merge the results into one dataset."""
    # The two shard loads take no inputs from each other; merge_data
    # depends on both of their outputs.
    df1 = get_data_shard1()
    df2 = get_data_shard2()
    # The merged output is the job's final step. It was previously bound to
    # an unused local, which is dropped here.
    merge_data(df1, df2)
Thanks for reading!

owen — 09/23/2022, 8:32 PM
[…] `.collect()` statement in the above docs) to merge everything back together.

Spencer Guy — 09/23/2022, 11:01 PM