王昊
05/15/2023, 10:49 AM
process_company_parquet_to_elastic
and its sub-ops in the lineage. Is there something wrong with my code?
from dagster import graph, op, OpExecutionContext, DynamicOutput, DynamicOut
@op
def load_parquet_from_s3(context: OpExecutionContext):
    """Placeholder op: ignores ``context`` and always returns ``1``."""
    return 1
@op(out=DynamicOut())
def get_company_s3_keys():
    """Yield one dynamic output per hard-coded company parquet key.

    Each output carries the full key as its value. The mapping key is the
    same string with every character outside ``[A-Za-z0-9_]`` replaced by
    ``_`` (``'data/key1.pq'`` -> ``'data_key1_pq'``), because Dagster only
    accepts letters, digits and underscores in a mapping key.
    """
    # Imported locally because this snippet's import line only pulls in dagster.
    import re

    keys = ['data/key1.pq', 'data/key2.pq', 'data/key3.pq']
    for key in keys:
        yield DynamicOutput(key, mapping_key=re.sub(r'[^A-Za-z0-9_]', '_', key))
@op()
def transfer_to_elastic(data, index):
    """Stub op: accepts ``data`` and ``index`` and does nothing with them."""
@op
def process_company_parquet_to_elastic(key: str):
    """Pass ``key`` to ``load_parquet_from_s3`` and forward the result.

    The loaded value is handed to ``transfer_to_elastic`` together with the
    fixed index name ``'companies'``. Nothing is returned.

    NOTE(review): this op calls two other ops from inside its own body.
    Dagster presumably will not treat those calls as separate steps, so they
    would not appear in the lineage -- composing ops belongs in a ``@graph``.
    Confirm against the Dagster docs.
    """
    loaded = load_parquet_from_s3(key)
    transfer_to_elastic(loaded, 'companies')
@graph()
def sync_company():
    """Fan out over the company S3 keys, processing each one independently.

    ``get_company_s3_keys`` emits one dynamic output per key, and
    ``process_company_parquet_to_elastic`` is mapped over those outputs.
    The mapped result is not used, so it is not bound to a name.
    """
    get_company_s3_keys().map(process_company_parquet_to_elastic)
# Build a job from the sync_company graph, using to_job()'s defaults.
sync_company_job = sync_company.to_job()
Jakub Zgrzebnicki
05/15/2023, 10:57 AM
@graph()
def sync_company():
    """Fan out over the company keys, loading and transferring each one."""
    keys = get_company_s3_keys()
    # map() needs a callable that receives each mapped value; `key` exists
    # only as the lambda's parameter.
    # NOTE(review): 'companies' is a plain literal passed as an op input.
    # Dagster graph composition may only accept upstream outputs here --
    # confirm, or supply the index through op config instead.
    keys.map(lambda key: transfer_to_elastic(load_parquet_from_s3(key), 'companies'))
Ops can't have other ops inside.
Jakub Zgrzebnicki
05/15/2023, 10:58 AM
王昊
05/15/2023, 11:27 AM
import os
from dagster import Config, graph, op, OpExecutionContext, DynamicOutput, DynamicOut
@op
def load_parquet_from_s3(context: OpExecutionContext, key: str):
    """Placeholder op: returns ``key`` unchanged; ``context`` is not used."""
    return key
@op(out=DynamicOut())
def get_company_s3_keys():
    """Yield one dynamic output per hard-coded company parquet key.

    The output value is the full key (e.g. ``'data/key1.pq'``). The mapping
    key is the file name without its directory or extension (``'key1'``).
    """
    keys = ['data/key1.pq', 'data/key2.pq', 'data/key3.pq']
    for value in keys:
        # Two keys sharing a base name in different directories would yield
        # the same mapping key; the hard-coded list above has no such pair.
        file_name = os.path.basename(value)
        stem, _extension = os.path.splitext(file_name)
        yield DynamicOutput(value, mapping_key=stem)
class TransferToElasticConfig(Config):
    """Config schema for the ``transfer_to_elastic`` op."""

    # The only config field; required, since no default is declared.
    index: str
@op()
def transfer_to_elastic(data, config: TransferToElasticConfig):
    """Stub op: accepts ``data`` and a ``TransferToElasticConfig``; does nothing."""
@graph()
def sync_company():
    """Fan out over the company S3 keys: load each one, then transfer it.

    ``get_company_s3_keys`` emits one dynamic output per key. For each,
    ``load_parquet_from_s3`` feeds ``transfer_to_elastic``. The mapped result
    is not used, so it is not bound to a name.
    """

    def _load_then_transfer(key):
        # Per-key chain applied to every dynamic output.
        return transfer_to_elastic(load_parquet_from_s3(key))

    get_company_s3_keys().map(_load_then_transfer)
# Build the job with config baked in: the transfer_to_elastic op gets
# index="companies".
sync_company_job = sync_company.to_job(
    config={
        "ops": {
            "transfer_to_elastic": {
                "config": {"index": "companies"},
            },
        },
    },
)