# ask-community

Bryan Wood

04/16/2022, 12:28 AM
I feel like I might be fighting against using Dagster idiomatically and would really appreciate some advice from the pros. Say my primary job looks like the following:
```python
@graph
def main_graph():
    document = echo_filepath()

    # Each family of ops fans out over the document; `collect` merges their result dicts.
    general_ops = [md5filehash, filetype, extract_exif, extract_text]
    general_document = collect.alias('collect_general_ops')([op(document) for op in general_ops])

    cv_ops = [yolov5_op, deepface_op]
    cv_document = collect.alias('collect_cv_ops')([op(general_document) for op in cv_ops])

    nlp_ops = [huggingface_ner, spacy_ner, text_summarization, sentiment, keyphrase_extraction]
    nlp_document = collect.alias('collect_nlp_ops')([op(general_document) for op in nlp_ops])

    final_document = collect.alias('collect_final')([general_document, cv_document, nlp_document])

    save(final_document)
```
and it's kicked off from a sensor like this:
```python
@sensor(job=main_in_process_job)
def folder_watch():
    for file in Path(FOLDER_WATCH_ROOT).iterdir():
        # run_key keeps the same file from being submitted twice.
        yield RunRequest(
            run_key=str(file),
            run_config={'ops': {'echo_filepath': {'inputs': {'filepath': str(file)}}}},
        )
```
Most ops look something like this:
```python
@op
def huggingface_ner(context, document):
    results = {}
    if tika_text := document.get('tika_text'):
        try:
            tokenizer = AutoTokenizer.from_pretrained(HUGGINGFACE_NER_MODEL)
            model = AutoModelForTokenClassification.from_pretrained(HUGGINGFACE_NER_MODEL)
            nlp = pipeline('ner', model=model, tokenizer=tokenizer)
            ner_results = [{**ner_result, 'score': float(ner_result['score'])} for ner_result in nlp(tika_text)]
            for ner_result in ner_results:
                context.log.info(f'Named Entity Recognition (NER) results {ner_result}')
            results[HUGGINGFACE_NER_MODEL] = {'results': ner_results}
        except Exception as e:
            context.log.info(f'Named Entity Recognition (NER) failed to run on {document["filepath"]}')
            context.log.warn(e)
    return results
```
Meaning each op takes some previous `dict` as a param and produces a new one. The `collect` op basically just stitches them together:
```python
@op
def collect(context, ops_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Collate the results of all the upstream ops into a single dictionary.
    """
    collected_result = {}
    for op_results in ops_results:
        collected_result.update(op_results)
    return collected_result
```
It's working okay, but I feel like I'm fighting against the way this tool is supposed to be used, both in how to build up something you'd push into a database after going through a bunch of ops, and in how to avoid executing ops when the data from previous ops indicates you shouldn't (e.g., above, `huggingface_ner` should not be called if the upstream text extraction op did not produce any extracted text).

Aabir Abubaker Kar

04/16/2022, 1:06 AM
High-level thoughts:
1. I'm no pro, but I actually really like the way you've designed things! The use of `op_name.alias()` is particularly cool to me.
2. IMO the condition you mentioned (the `huggingface_ner` call depending on whether there was any input text in the first place) should be implemented at op level, within `def huggingface_ner()`. You still want your graph to include those parts of the pipeline, with op-internal logic to skip or run things. This keeps your jobs modular and easy to read.
3. One suggestion for modularity might be to define each set of `collect.alias(x)` steps as its own subgraph (see the sketch below). From the `dagster.graph` docs:
> ... allows you to build up a dependency graph by writing a function that invokes ops (or other graphs) and passes the output to subsequent invocations.
So a graph can invoke other graphs.

Dagster offers deep and wide customizability on how to think about and solve each problem. It's for us community members to share knowledge and build best practices, so thanks for sharing your case. But my final evaluation criteria would just be: does it work, does it make sense to me, and does it make sense to someone else? You seem to be good on all 3!
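A rough sketch of the subgraph idea, reusing the op names from your graph above (the `nlp_graph` name is just a placeholder I'm introducing here):
```python
from dagster import graph


@graph
def nlp_graph(general_document):
    # Sub-graph wrapping the NLP branch; returns the collected NLP results.
    nlp_ops = [huggingface_ner, spacy_ner, text_summarization, sentiment, keyphrase_extraction]
    return collect.alias('collect_nlp_ops')([op(general_document) for op in nlp_ops])


@graph
def main_graph():
    document = echo_filepath()
    general_ops = [md5filehash, filetype, extract_exif, extract_text]
    general_document = collect.alias('collect_general_ops')([op(document) for op in general_ops])

    # The outer graph invokes the sub-graph just like it would invoke an op.
    nlp_document = nlp_graph(general_document)

    save(collect.alias('collect_final')([general_document, nlp_document]))
```
Each family of ops could then live in its own module, which should make adding new branches cheaper.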

Bryan Wood

04/18/2022, 7:08 PM
@Aabir Abubaker Kar thanks for the reply!
On number 2, I do agree that it needs to be at the op level. I recall seeing the section of the documentation on conditional processing, and it looked like the canonical way is to have an op return multiple outputs, some of which are optional; any downstream op that depends on an optional output as input gets skipped. That's instead of what I did, which is to bundle up everything into a dictionary that always gets returned. I think it's easy enough to split out the bits that are always returned from the bits that are input dependencies for downstream ops, but I didn't want to do that refactor until I got some feedback saying, yep, that's the best way to do it. This will be more important when I move away from sequential in-process job running, to help minimize the number of processes/pods that get spun up just to receive an input, check it, and do nothing.
On number 3, the subgraph idea is a good one; I will look into it.
So it does work, but right now it definitely isn't fast, and it won't scale well to dropping 10s or 100s of files (much less, say, millions) into the sensor folder, so I'm looking for speed-ups for sure. I'm also trying to figure out how I should be thinking about writing a pipeline in Dagster that serves something like the following purposes:
• Fast local iteration for adding new processes into the pipeline (this can be slow/inefficient) that can be optimized to run at scale (via a K8s deployment, and this should be fast/efficient).
• Drop files in a watched location, do stuff to them through a DAG of operations, and write out results to multiple locations (e.g., some data goes to Elasticsearch, some to a Postgres DB, some to disk, etc.).

johann

04/18/2022, 8:56 PM
What is your primary concern with scaling/speed at this point? Process spin-up time? To your point about creating unnecessary processes once you're using an executor other than in-process: we offer conditional branching. You would need to add an output for the specific property you want to branch on.
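For concreteness, a minimal sketch of the optional-output pattern, using a reworked `extract_text` as the example (`run_tika` stands in for whatever extraction call you make today, and the output names are placeholders):
```python
from dagster import Out, Output, op


@op(out={'document': Out(), 'tika_text': Out(is_required=False)})
def extract_text(document):
    text = run_tika(document['filepath'])  # hypothetical extraction helper
    if text:
        document = {**document, 'tika_text': text}
        # Only emitted when extraction actually produced text.
        yield Output(text, 'tika_text')
    # Always emitted, so the rest of the pipeline still runs.
    yield Output(document, 'document')


@op
def huggingface_ner(context, tika_text):
    # Skipped automatically if the optional 'tika_text' output above was never yielded.
    ...
```
In the graph, the `extract_text` invocation returns one handle per output, so only the ops that genuinely need the text take the optional `tika_text` handle.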
> fast local iteration for adding new processes into the pipeline (this can be slow / inefficient) that can be optimized to run at scale (via K8s deployment and this should be fast / efficient)
Most users have a local setup where they can run dev versions of jobs (mocked or modified resources, multiprocess or in-process executor), then deploy their code somewhere like K8s, where runs will be in separate K8s Jobs (and optionally each op too, if the `k8s_job_executor` is configured).
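As a rough sketch of that split, assuming the `main_graph` from above and the `dagster-k8s` library, the same graph can be built into different jobs:
```python
from dagster import in_process_executor
from dagster_k8s import k8s_job_executor

# Local dev: run everything sequentially inside one process.
main_in_process_job = main_graph.to_job(
    name='main_in_process_job',
    executor_def=in_process_executor,
)

# K8s deployment: launch each op in its own Kubernetes Job
# (requires the Dagster K8s/Helm deployment to be set up).
main_k8s_job = main_graph.to_job(
    name='main_k8s_job',
    executor_def=k8s_job_executor,
)
```
Resources can be swapped the same way (e.g., mocked resources locally) via `resource_defs` on `to_job`.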

Bryan Wood

04/18/2022, 9:03 PM
@johann thanks for the reply. Conditional branching was what I was talking about above, so I think you confirmed it: if I split what comes out of my ops into `Output` objects for the bits that will always come out, and an optional `Output` for the bits that might not, that's how I can avoid executing unnecessary downstream ops, correct?
So it kind of feels like building up a JSON document the way I am, where each op adds its bit to the document, maybe isn't how I should be thinking about doing what I'm trying to do, which is ultimately to get through all the processing and have a JSON document I can push to something like Elasticsearch? Should each op instead write its deltas to that eventual JSON document directly to Elasticsearch (say) as document updates?
And on the second bit, fast iteration locally and efficient processing in production, I think you verified that I am thinking about it right: get everything correct locally with the in-process executor (say), and then work through the K8s portion of the Dagster documentation for the production capability. Which is what I thought, but I wanted to make sure that was correct and likely to be how I would scale to production workloads.

johann

04/18/2022, 10:00 PM
> So it kind of feels like building up a JSON document the way I am, where each op adds its bit to the document, maybe isn't how I should be thinking about doing what I'm trying to do
Dagster isn't super opinionated about the specifics of your compute. Using multiple ops makes sense when you want to be able to observe them, retry them, etc. separately. Maybe removing the `collect` step makes sense if the storage overhead is causing problems. You can also always bundle things into one op and manage your own thread pool or something similar.
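A rough sketch of that last option, assuming the enrichment steps were refactored into plain Python functions that take the document dict and return a partial result dict (so they're no longer ops themselves):
```python
from concurrent.futures import ThreadPoolExecutor

from dagster import op


@op
def run_nlp_bundle(general_document):
    # Run all NLP enrichments inside one op, fanning out over a thread pool,
    # then merge the partial results into a single dictionary.
    nlp_funcs = [huggingface_ner, spacy_ner, text_summarization, sentiment, keyphrase_extraction]
    collected = dict(general_document)
    with ThreadPoolExecutor(max_workers=len(nlp_funcs)) as pool:
        for partial in pool.map(lambda fn: fn(general_document), nlp_funcs):
            collected.update(partial)
    return collected
```
The trade-off is that you lose per-op observability and retries for the individual enrichments, which is why splitting them into ops still makes sense when those matter.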