# ask-community
j
Hi Everyone, can anyone with experience throw in a couple of key concepts for the following use case? We want to store SEO results from a third-party API provider in Postgres. There is a table with keywords. Those keywords are sent via POST to the API and are appended to a queue on the provider side. The list of keywords can grow on our side. A request ID is returned for each request. There is a method to check a request's status by submitting the request ID at frequent intervals. As soon as the request has been processed, the status changes and a download ID is provided. JSON data can then be downloaded via a third method. These results should be stored in Postgres and should be cached (or checked against the database) so that subsequent identical keyword requests for an already-requested timeframe are not sent a second time, unless the results are older than one month. Would it be better to use sensors, or a connected pipeline of ops where the initial op is triggered and the user functions decide whether or not to proceed to subsequent ops? Is there any kind of caching facility in Dagster, or should we resort to something like requests-cache? Overall, how would you pros set up Dagster for something like this? Any design ideas/patterns would be greatly appreciated. Thx Jens
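For reference, the provider's three-step flow boils down to something like this (endpoint paths, field names, and the base URL are made up here for illustration, not the provider's real API):

import requests

BASE = "https://api.example-seo-provider.com"  # placeholder, not the real provider


def submit_keywords(keywords):
    # step 1: POST the keywords, get back a request ID
    resp = requests.post(f"{BASE}/tasks", json={"keywords": keywords})
    resp.raise_for_status()
    return resp.json()["request_id"]


def get_status(request_id):
    # step 2: check the request's status; once processed, a download ID appears
    resp = requests.get(f"{BASE}/tasks/{request_id}/status")
    resp.raise_for_status()
    return resp.json()  # e.g. {"status": "done", "download_id": "..."}


def download_results(download_id):
    # step 3: fetch the finished JSON payload
    resp = requests.get(f"{BASE}/downloads/{download_id}")
    resp.raise_for_status()
    return resp.json()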
o
Hi @Jens, interesting question. My intuition is that this is a good use case for a combination of dynamic mapping and conditional branching. The dynamic mapping bit lets you copy a subgraph n times during execution (so do X thing for each keyword). That X thing would in this case be something of the form "check if we already have recent data for this keyword, if so, stop executing, if not, submit request, poll for it to complete, download data, store in postgres". A rough sketch of this setup would look like:
from dagster import DynamicOut, DynamicOutput, GraphIn, Out, Output, graph, op


@op(out=DynamicOut())
def get_all_keywords():
    # yield an output for each keyword that we have -- this can change
    # between runs, so use a DynamicOut
    for kw in ["a", "b", "c"]:
        yield DynamicOutput(value=kw, mapping_key=kw)


@op
def submit_request(keyword):
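    # hypothetical: POST the keyword to the provider and return the request ID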
    ...


@op
def poll_for_completion(request_id):
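    # hypothetical: loop on the provider's status endpoint (with a sleep/backoff)
    # until the request has been processed, then return the download ID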
    ...


@op(out=Out(is_required=False))
def check_for_keyword(keyword):
    # query postgres...
    have_recent_data = ...
    if not have_recent_data:
        yield Output(keyword)


@graph(ins={"keyword": GraphIn()})
def update_keyword_data(keyword):

    # this output will only fire if keyword doesn't have recent data
    keyword = check_for_keyword(keyword)
    # these will not run if the above doesn't run
    request_id = submit_request(keyword)
    download_link = poll_for_completion(request_id)
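    # (remaining steps would download the JSON via the link and store it in postgres)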
    ...


@graph
def update_all_keywords():
    get_all_keywords().map(update_keyword_data)
this implementation uses the database itself as the "cache" just for simplicity (no need to maintain two sources of truth)
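A rough sketch of what that freshness check inside check_for_keyword might look like, assuming a hypothetical seo_results table with keyword and fetched_at columns (and a plain psycopg2 connection; a Dagster resource would also work):

import psycopg2


def have_recent_data_for(conn, keyword):
    # treat rows younger than one month as "fresh" and skip re-requesting them
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT EXISTS (
                SELECT 1 FROM seo_results
                WHERE keyword = %s
                  AND fetched_at > now() - interval '1 month'
            )
            """,
            (keyword,),
        )
        return cur.fetchone()[0]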
j
This is wonderful. Thanks for your effort. Will give it a try and report back in this forum.
j
Hi Owen, is the graph concept final already? It seems like an experimental feature. Would it be a good idea to build an implementation on it, with regard to future compatibility and breaking changes?