I can’t display the relationship between `process_...
# ask-community
u
I can’t display the relationship between
process_company_parquet_to_elastic
and its sub-ops in the lineage. Is there something wrong with my code?
Copy code
from dagster import graph, op, OpExecutionContext, DynamicOutput, DynamicOut


@op
def load_parquet_from_s3(context: OpExecutionContext):
    """Placeholder loader op: ignores its context and always returns ``1``."""
    # NOTE(review): this op declares no ``key`` input, yet it is invoked with
    # one further down in this snippet; the revised version later in the
    # thread adds a ``key: str`` parameter.
    return 1


@op(out=DynamicOut())
def get_company_s3_keys():
    """Yield one ``DynamicOutput`` per hard-coded S3 key."""
    keys = ['data/key1.pq', 'data/key2.pq', 'data/key3.pq']
    for key in keys:
        # The raw S3 key doubles as the mapping key here.
        # NOTE(review): the revised snippet later in this thread derives the
        # mapping key from the base file name instead -- presumably because
        # '/' and '.' are not accepted in mapping keys; confirm.
        yield DynamicOutput(key, mapping_key=key)


@op()
def transfer_to_elastic(data, index):
    """Stub op taking ``data`` and a target ``index``; the body is a no-op."""
    pass


@op
def process_company_parquet_to_elastic(key: str):
    """Try to load one parquet key and transfer it to the 'companies' index."""
    # This calls two other ops from inside an op body. Per the reply in this
    # thread, ops cannot contain other ops, which is the reported reason the
    # sub-ops do not appear in the lineage view.
    transfer_to_elastic(load_parquet_from_s3(key), 'companies')


@graph()
def sync_company():
    """Fan out over the dynamic S3 keys, mapping each to the processing op."""
    keys = get_company_s3_keys()
    # ``results`` is assigned but not used afterwards in this snippet.
    results = keys.map(process_company_parquet_to_elastic)


# Turn the graph into a job; no extra arguments are passed.
sync_company_job = sync_company.to_job()
dagster bot responded by community 1
j
you should remove process_company_parquet_to_elastic and do:
Copy code
@graph()
def sync_company():
    """Map each dynamic S3 key through load -> transfer directly in the graph."""
    keys = get_company_s3_keys()
    # ``map`` needs a callable that receives each dynamic output. Invoking the
    # ops eagerly here would reference an undefined ``key`` and hand ``map``
    # a result instead of a function.
    # NOTE(review): the revised version later in this thread supplies the
    # index through op config rather than a literal argument -- presumably
    # because graph composition only accepts node outputs as op inputs;
    # confirm against the Dagster docs.
    keys.map(
        lambda key: transfer_to_elastic(load_parquet_from_s3(key), 'companies')
    )
Ops can't contain other ops; compose them in a graph instead.
u
@Jakub Zgrzebnicki I made some modifications and resolved the issue. Thank you.
Copy code
import os
from dagster import Config, graph, op, OpExecutionContext, DynamicOutput, DynamicOut


@op
def load_parquet_from_s3(context: OpExecutionContext, key: str):
    """Stand-in loader op: returns ``key`` unchanged; ``context`` is unused."""
    return key


@op(out=DynamicOut())
def get_company_s3_keys():
    """Yield one ``DynamicOutput`` per hard-coded S3 key.

    Each output's value is the full key; its mapping key is the file's base
    name without the extension (e.g. ``data/key1.pq`` -> ``key1``).
    """
    keys = ['data/key1.pq', 'data/key2.pq', 'data/key3.pq']
    for key in keys:
        # Strip the directory and extension so the mapping key holds no '/'
        # or '.' characters -- presumably required by Dagster; confirm.
        mapping_key = os.path.splitext(os.path.basename(key))[0]
        yield DynamicOutput(key, mapping_key=mapping_key)


class TransferToElasticConfig(Config):
    """Config schema consumed by the ``transfer_to_elastic`` op."""

    # Name of the target index; the job definition in this snippet sets it
    # to "companies".
    index: str


@op()
def transfer_to_elastic(data, config: TransferToElasticConfig):
    """Stub op: takes ``data`` and a ``TransferToElasticConfig``; does nothing yet."""
    pass


@graph()
def sync_company():
    """Load and transfer every dynamically emitted S3 key."""

    def _load_then_transfer(s3_key):
        # Chain the two ops for a single mapped key.
        return transfer_to_elastic(load_parquet_from_s3(s3_key))

    get_company_s3_keys().map(_load_then_transfer)


# Build the job, supplying the ``index`` config value for the
# ``transfer_to_elastic`` op up front.
sync_company_job = sync_company.to_job(
    config={"ops": {"transfer_to_elastic": {"config": {"index": "companies"}}}}
)