This message was deleted.
# ask-community
s
This message was deleted.
u
Copy code
from dagster import graph, op, OpExecutionContext, DynamicOutput, DynamicOut


@op
def load_parquet_from_s3(context: OpExecutionContext):
    """Placeholder loader op: returns the constant ``1``.

    The ``context`` argument is accepted but not used by the current body.
    """
    stub_payload = 1
    return stub_payload


@op(out=DynamicOut())
def get_company_s3_keys():
    """Yield one ``DynamicOutput`` per company parquet key.

    The S3 key itself is the output value. The ``mapping_key`` is derived
    from it by replacing every character outside ``[A-Za-z0-9_]`` with an
    underscore, because Dagster rejects mapping keys that contain other
    characters (the raw keys contain ``/`` and ``.``).
    """
    import re

    keys = ['data/key1.pq', 'data/key2.pq', 'data/key3.pq']
    for key in keys:
        # Mapping keys must be unique among the outputs of one run; the
        # three keys above stay distinct after sanitising.
        safe_mapping_key = re.sub(r'[^A-Za-z0-9_]', '_', key)
        yield DynamicOutput(key, mapping_key=safe_mapping_key)


@op
def transfer_to_elastic(data, index):
    """Placeholder transfer op: accepts ``data`` and ``index`` and does nothing."""
    return None


@op
def process_company_parquet_to_elastic(key: str):
    """Load the parquet for ``key`` and hand the result to ``transfer_to_elastic``.

    Calls ``load_parquet_from_s3(key)`` and forwards its result, together
    with the literal index name ``'companies'``, to ``transfer_to_elastic``.
    Nothing is returned.
    """
    # NOTE(review): both callees are @op-decorated and are invoked here from
    # inside another op's body. load_parquet_from_s3 declares only a
    # ``context`` parameter, so ``key`` is passed in that position - confirm
    # this is intended; composing these ops in a @graph may be what is wanted.
    loaded = load_parquet_from_s3(key)
    transfer_to_elastic(loaded, 'companies')


@graph()
def sync_company():
    """Fan out over the company S3 keys and process each one.

    ``get_company_s3_keys`` emits one dynamic output per key, and ``map``
    applies ``process_company_parquet_to_elastic`` to each of them.
    """
    keys = get_company_s3_keys()
    # The mapped result is not consumed downstream, so it is not bound to a name.
    keys.map(process_company_parquet_to_elastic)


# Build a job from the sync_company graph using to_job()'s default arguments.
sync_company_job = sync_company.to_job()