Maitos
07/13/2023, 11:52 PM
from typing import Dict, List
from dagster import DataVersion, DynamicPartitionsDefinition, MetadataValue, OpExecutionContext, Output, asset
import pandas as pd
from upp.api import ingest_api
from upp.utils import get_dict_hash, json_output
# Dynamic partition set named "details_partition". Its keys are not declared
# here; answer_uids adds them at run time through
# context.instance.add_dynamic_partitions.
details_partition = DynamicPartitionsDefinition(name="details_partition")
@asset
def suppliers():
    """Fetch the "suppliers" endpoint via ingest_api and return it through json_output."""
    payload = ingest_api("suppliers")
    return json_output(payload)
@asset
def answers() -> Output[Dict]:
    """Fetch the "answers" endpoint via ingest_api and return it through json_output."""
    # Named `payload` rather than `answers` so the local does not shadow the asset function.
    payload: Dict = ingest_api("answers")
    return json_output(payload)
@asset
def answer_uids(context: OpExecutionContext, answers):
    """Register one dynamic partition per answer UID.

    Reads ``answers['results']``, converts each entry's ``'UID'`` to ``str``,
    adds those strings as partition keys of ``details_partition`` through
    ``context.instance``, and returns the same list through ``json_output``.
    """
    results = answers['results']
    partition_keys = [str(entry['UID']) for entry in results]
    context.instance.add_dynamic_partitions(details_partition.name, partition_keys)
    return json_output(partition_keys)
@asset(partitions_def=details_partition, non_argument_deps=[answer_uids.asset_key])
def answer_details(context: OpExecutionContext, answers):
    """Return the single answer record that belongs to the current partition.

    Args:
        context: Execution context; ``context.partition_key`` identifies the
            answer to extract.
        answers: Mapping whose ``'results'`` entry is a list of answer dicts,
            each carrying a ``'UID'`` key.

    Returns:
        The result of ``json_output`` for the matching record, with a
        ``DataVersion`` built from ``get_dict_hash`` of that record.

    Raises:
        IndexError: If no entry in ``answers['results']`` has a UID matching
            the partition key.
    """
    partition_key = context.partition_key
    answer_results: List[Dict] = answers['results']

    # answer_uids registers partition keys as str(UID), so compare in string
    # form. Converting the key with int() would raise for non-integer UIDs and
    # would never match UIDs that are stored as strings.
    record = next(
        (row for row in answer_results if str(row['UID']) == partition_key),
        None,
    )
    if record is None:
        # Same exception type the previous `[...][0]` lookup produced, but with
        # a message that says which partition was missing.
        raise IndexError(
            f"No answer with UID {partition_key!r} found in answers['results']"
        )

    return json_output(record, data_version=DataVersion(get_dict_hash(record)))
@asset(partitions_def=details_partition)
def answer_group_flatened(answer_details):
    """Flatten one answer record into a single-level dict.

    Removes the ``'Answer Groups'`` entry, merges each group's ``'Answers'``
    mapping into the top level (later groups overwrite earlier keys), then
    runs the result through ``pd.json_normalize`` and takes the first record.

    Args:
        answer_details: Dict for one answer; must contain ``'Answer Groups'``,
            an iterable of dicts that each have an ``'Answers'`` mapping.

    Returns:
        ``Output`` wrapping the flattened dict, with the same dict attached as
        JSON under the ``"preview"`` metadata key.
    """
    # Shallow copy so the caller's dict is not modified by pop()/update().
    flattened = dict(answer_details)
    answer_groups = flattened.pop('Answer Groups')

    # Plain loop: update() is called for its side effect, not for a result list.
    for group in answer_groups:
        flattened.update(group['Answers'])

    result = pd.json_normalize(flattened).to_dict(orient='records')[0]
    return Output(result, metadata={
        "preview": MetadataValue.json(result)
    })
owen
07/14/2023, 10:49 PM
Maitos
07/15/2023, 11:29 PM
owen
07/18/2023, 9:16 PM
Fabio Picchi
08/02/2023, 5:43 PM
Fabio Picchi
08/02/2023, 5:45 PM
Fabio Picchi
08/02/2023, 5:47 PM
Fabio Picchi
08/02/2023, 5:47 PM
Fabio Picchi
08/03/2023, 9:32 AM
owen
08/03/2023, 8:41 PM
@op
def get_all_files(context):
    """Placeholder for the step that lists the files to process.

    The body is a stub (``...``) and currently returns ``None``.

    NOTE(review): ``datasets`` calls ``.map(...)`` on this step's result; in
    Dagster that normally requires the op to declare a dynamic output.
    Confirm when filling in the implementation.
    """
    # do something based off of context.partition
    ...
@op
def process_file(context, file):
    """Placeholder op that handles a single ``file``; the body is a stub (``...``)."""
    ...
@op
def merge_files(context, processed_files):
    """Placeholder op that receives the collected ``processed_files``; the body is a stub (``...``)."""
    # if you don't have any specific reprocessing you need to do, this could just be a no-op
    ...
@graph_asset
def datasets():
    """Wire the three ops together: list files, process each one, merge the results.

    ``process_file`` is mapped over the output of ``get_all_files``; the mapped
    results are collected and handed to ``merge_files``, whose output is the
    value of this graph asset.
    """
    processed = get_all_files().map(process_file)
    return merge_files(processed.collect())
Fabio Picchi
08/07/2023, 5:32 PM
Fabio Picchi
08/07/2023, 5:33 PM
Fabio Picchi
08/07/2023, 5:42 PM
owen
08/07/2023, 9:58 PM