# ask-community
m
Hi team, when ingesting APIs which are partitioned by a key/id, are dynamic partitions the recommended way of structuring Dagster's assets? For example:
from typing import Dict, List

import pandas as pd
from dagster import DataVersion, DynamicPartitionsDefinition, MetadataValue, OpExecutionContext, Output, asset
from upp.api import ingest_api
from upp.utils import get_dict_hash, json_output

details_partition = DynamicPartitionsDefinition(name="details_partition")

@asset
def suppliers():
    return json_output(ingest_api("suppliers"))

@asset
def answers() -> Output[Dict]:
    answers: Dict = ingest_api("answers")
    return json_output(answers)

@asset
def answer_uids(context: OpExecutionContext, answers):
    # register one dynamic partition per answer UID
    answer_ids = [str(answer['UID']) for answer in answers['results']]
    context.instance.add_dynamic_partitions(details_partition.name, answer_ids)
    return json_output(answer_ids)

@asset(partitions_def=details_partition, non_argument_deps={"answer_uids"})
def answer_details(context: OpExecutionContext, answers):
    # each partition materializes the details of a single answer UID
    answer_results: List[Dict] = answers['results']
    answer_result = next(x for x in answer_results if str(x['UID']) == context.partition_key)

    return json_output(answer_result, data_version=DataVersion(get_dict_hash(answer_result)))

@asset(partitions_def=details_partition)
def answer_group_flattened(answer_details):
    # merge the nested answer groups into the top-level record, then flatten it
    answer_groups = answer_details.pop('Answer Groups')
    for group in answer_groups:
        answer_details.update(group['Answers'])

    result = pd.json_normalize(answer_details).to_dict(orient='records')[0]

    return Output(result, metadata={
        "preview": MetadataValue.json(result)
    })
👀 2
o
hi @Maitos! in this case, how many unique answers are you imagining encountering? if it's in the range of a few thousand, this seems fine, but if you are imagining 10s-100s of thousands, this probably would not scale well
m
Hi @owen! For this specific use case, there will only be a couple hundred. But I am evaluating Dagster to be used like this for a suite of data warehouse projects, where I am ingesting enterprise APIs with often hundreds of thousands of distinct entities. Often, API endpoints depend on the results of others. Partitioning by ID lets me give each “job” resiliency and write integrations on a row-by-row basis. For example, if an endpoint is paginated, I have found it useful to load each page as a separate job (which I’m representing as asset partitions in Dagster), so that a transient failure doesn’t restart the entire process from scratch without any extra code from me, and it also lets me refresh specific pages. Dagster allows me to define assets and build a data lake really easily for my customers, where the files are saved via the I/O manager. It’s a very powerful aspect of this tech! Generating a hierarchical data lake structure declaratively is so awesome. How should I adapt my approach to best work with Dagster? Am I approaching this in the wrong way? Thanks for your help.
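For reference, this is roughly the pagination pattern I described above (page_count and fetch_page are placeholder helpers, not real code):
from dagster import DynamicPartitionsDefinition, OpExecutionContext, asset

pages_partition = DynamicPartitionsDefinition(name="api_pages")

@asset
def page_keys(context: OpExecutionContext):
    # register one dynamic partition per page of the endpoint
    keys = [str(page) for page in range(page_count("answers"))]
    context.instance.add_dynamic_partitions(pages_partition.name, keys)
    return keys

@asset(partitions_def=pages_partition, non_argument_deps={"page_keys"})
def page_contents(context: OpExecutionContext):
    # each partition materializes exactly one page, so a transient failure
    # only retries that page, and specific pages can be refreshed later
    return fetch_page("answers", page=int(context.partition_key))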
o
Ah interesting -- another possible approach to consider would be to use Dynamic Graphs. You're still able to individually re-execute specific downstream steps (so you won't have to start over from scratch if just a few of the distinct operations fail), and it won't have the same scaling issues as the dynamic partitions approach
f
@owen sorry for bumping this old thread, but this is very similar to my use case. I've been really struggling to model it in a way that feels natural in Dagster.
That said, this is our setup: we have projects, and they are all based on the same pipeline. For each project we capture datasets and run each of them through the pipeline, but these datasets are actually composed of smaller parts: data files we can process individually
I have asked for nested dynamic partitions, but while we don't have that, I modelled it like this:
1. Projects each have a separate code location. All the assets are prefixed with a project ID to make them unique.
2. Datasets are modelled as partitions of the assets in our pipeline, so we can track progress and reprocess them individually if they fail.
3. Data files are not modelled yet, but we would very much like to model and track them through Dagster, especially for granular reprocessing. Making them our partitions and deriving dataset state from their state is possible, but we have millions of files per project, so it probably won't scale (I don't know about performance, but it certainly won't scale in Dagit).
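Roughly, (1) and (2) look like this in each project's code location (the project ID and asset below are made up for illustration):
from dagster import Definitions, DynamicPartitionsDefinition, OpExecutionContext, asset

PROJECT_ID = "project_123"  # placeholder; each project's code location sets its own

datasets_partition = DynamicPartitionsDefinition(name=f"{PROJECT_ID}_datasets")

@asset(key_prefix=[PROJECT_ID], partitions_def=datasets_partition)
def processed_dataset(context: OpExecutionContext):
    # one partition per dataset, so we can track progress and reprocess
    # individual datasets if they fail
    ...

defs = Definitions(assets=[processed_dataset])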
do you think Dynamic Graphs could come into play in this scenario?
sorry for pinging you @sandy, but I wonder if you could contribute here
o
My high-level view would be that data files would make sense to model as dynamic outputs of your graph. Essentially (pseudocode):
from dagster import DynamicOut, DynamicOutput, OpExecutionContext, graph_asset, op

@op(out=DynamicOut())
def get_all_files(context: OpExecutionContext):
    # do something based off of context.partition_key, then yield one
    # DynamicOutput(file, mapping_key=...) per file
    ...

@op
def process_file(context: OpExecutionContext, file):
    ...

@op
def merge_files(context: OpExecutionContext, processed_files):
    # if you don't have any specific merging you need to do, this could just be a no-op
    ...

@graph_asset
def datasets():
    all_files = get_all_files()
    return merge_files(all_files.map(process_file).collect())
f
thanks a lot for the update! Looks great and it is giving me more ideas
if we had two assets that depend on each other (a -> b) and they were both graph_assets, would I be able to chain them together and start parts of b without waiting for all the parts of a to finish?
looking at this in the docs: https://docs.dagster.io/concepts/assets/graph-backed-assets#advanced-subsetting-graph-backed-assets
This isn't quite what I'm looking for, but maybe close enough? 🤔
o
hi @Fabio Picchi -- that docs link you shared is the direction I'd recommend going in if a and b share any computation steps
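e.g. something along these lines (a rough, untested sketch; the op names are placeholders). Putting both assets behind one graph lets you materialize a and b together or subset just one, and within a run each part of b can start as soon as its part of a finishes:
from dagster import AssetOut, DynamicOut, DynamicOutput, graph_multi_asset, op

@op(out=DynamicOut())
def get_parts():
    # yield one DynamicOutput(part, mapping_key=...) per part
    ...

@op
def step_a(part):
    ...

@op
def step_b(part_a):
    ...

@op
def collect_a(parts):
    ...

@op
def collect_b(parts):
    ...

@graph_multi_asset(
    outs={"a": AssetOut(), "b": AssetOut()},
    can_subset=True,  # allows materializing just a or just b
)
def a_and_b():
    parts = get_parts()
    a_parts = parts.map(step_a)
    b_parts = a_parts.map(step_b)  # each part of b runs as soon as its part of a finishes
    return {"a": collect_a(a_parts.collect()), "b": collect_b(b_parts.collect())}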