Bryan Wood
04/16/2022, 12:28 AM
from dagster import graph

@graph
def main_graph():
    document = echo_filepath()
    general_ops = [md5filehash, filetype, extract_exif, extract_text]
    general_document = collect.alias('collect_general_ops')([op(document) for op in general_ops])
    cv_ops = [yolov5_op, deepface_op]
    cv_document = collect.alias('collect_cv_ops')([op(general_document) for op in cv_ops])
    nlp_ops = [huggingface_ner, spacy_ner, text_summarization, sentiment, keyphrase_extraction]
    nlp_document = collect.alias('collect_nlp_ops')([op(general_document) for op in nlp_ops])
    final_document = collect.alias('collect_final')([general_document, cv_document, nlp_document])
    save(final_document)
and it's kicked off from a sensor like this
from pathlib import Path

from dagster import RunRequest, sensor

@sensor(job=main_in_process_job)
def folder_watch():
    for file in Path(FOLDER_WATCH_ROOT).iterdir():
        yield RunRequest(run_key=str(file), run_config={'ops': {'echo_filepath': {'inputs': {'filepath': str(file)}}}})
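The main_in_process_job the sensor targets isn't shown in the thread; presumably it's built from the graph with to_job, roughly like this sketch (the in-process executor choice is an assumption):

from dagster import in_process_executor

# Assumed wiring of the job the sensor targets (not shown in the thread)
main_in_process_job = main_graph.to_job(name='main_in_process_job', executor_def=in_process_executor)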
Most ops look something like this
from dagster import op
from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline

@op
def huggingface_ner(context, document):
    results = {}
    if tika_text := document.get('tika_text'):
        try:
            tokenizer = AutoTokenizer.from_pretrained(HUGGINGFACE_NER_MODEL)
            model = AutoModelForTokenClassification.from_pretrained(HUGGINGFACE_NER_MODEL)
            nlp = pipeline('ner', model=model, tokenizer=tokenizer)
            # Cast numpy float32 scores to plain floats so the dict stays JSON-serializable
            ner_results = [{**ner_result, 'score': float(ner_result['score'])} for ner_result in nlp(tika_text)]
            for ner_result in ner_results:
                context.log.info(f'Named Entity Recognition (NER) results {ner_result}')
            results[HUGGINGFACE_NER_MODEL] = {'results': ner_results}
        except Exception as e:
            context.log.info(f'Named Entity Recognition (NER) failed to run on {document["filepath"]}')
            context.log.warn(e)
    return results
Meaning, each op takes some previous dict as a param and produces a new one. The collect op basically just stitches them together:
from typing import Any, Dict, List

from dagster import op

@op()
def collect(context, ops_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Collate all the upstream ops' results into a single dictionary.
    """
    collected_result = {}
    for op_results in ops_results:
        collected_result.update(op_results)
    return collected_result
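Note that dict.update means a later op's keys silently overwrite an earlier op's on collisions, so namespacing each op's results (as huggingface_ner does with the model name) keeps the merge safe.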
It's working okay, but I feel like I'm fighting against the way this tool was meant to be used, in terms of how to build up something you'd push into a database after going through a bunch of ops, and how to avoid executing ops when the data from previous ops indicates you shouldn't (e.g., above, huggingface_ner should not be called if the upstream text extraction op did not produce any extracted text).
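One option on the skipping question, for reference: Dagster supports graph-level conditional branching via optional outputs, so downstream ops only execute when the output they depend on is actually yielded. A minimal sketch, with an illustrative op name that isn't from the thread:

from dagster import Out, Output, op

@op(out={'has_text': Out(is_required=False), 'no_text': Out(is_required=False)})
def route_on_text(document):
    # Only one output is yielded, so only that branch of the graph runs
    if document.get('tika_text'):
        yield Output(document, 'has_text')
    else:
        yield Output(document, 'no_text')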
04/16/2022, 1:06 AM
1. op_name.alias() is particularly cool to me.
2. IMO the condition you mentioned (the huggingface_ner call depending on whether there was any input text in the first place) should be implemented at op-level, within def huggingface_ner(). You still want your graph to include those parts of the pipeline, with op-internal logic to skip or run things. This keeps your jobs modular and easy to read.
3. One suggestion for modularity might be to define each set of collect.alias(x) steps as its own subgraph (see the sketch after this list). From the dagster.graph docs: "...allows you to build up a dependency graph by writing a function that invokes ops (or other graphs) and passes the output to subsequent invocations." So a graph can invoke other graphs.
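A minimal sketch of that suggestion, reusing names from Bryan's graph (the nlp_stage name is made up):

from dagster import graph

@graph
def nlp_stage(general_document):
    nlp_ops = [huggingface_ner, spacy_ner, text_summarization, sentiment, keyphrase_extraction]
    return collect.alias('collect_nlp_ops')([op(general_document) for op in nlp_ops])

# main_graph can then invoke the subgraph like any op:
#     nlp_document = nlp_stage(general_document)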
Dagster offers deep and wide customizability on how to think about and solve each problem. It's for us community members to share knowledge and build best practices, so thanks for sharing your case.
But my final evaluation criteria would just be: does it work, does it make sense to me, and does it make sense to someone else? You seem to be good on all 3!
Bryan Wood
04/18/2022, 7:08 PM
johann
04/18/2022, 8:56 PM
> fast local iteration for adding new processes into the pipeline (this can be slow / inefficient) that can be optimized to run at scale (via K8s deployment and this should be fast / efficient)
Most users have a local setup where they can run dev versions of jobs (mocked or modified resources, multiprocess or in-process executor), then deploy their code somewhere like K8s, where runs will be in separate K8s Jobs (and optionally each op too, if the k8s_job_executor is configured).
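A minimal sketch of that deploy-time switch (the K8s job name is made up; main_in_process_job is the one the sensor targets):

from dagster_k8s import k8s_job_executor

# Same graph, different executor: each op runs in its own K8s Job
main_k8s_job = main_graph.to_job(name='main_k8s_job', executor_def=k8s_job_executor)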
Bryan Wood
04/18/2022, 9:03 PM
johann
04/18/2022, 10:00 PM
> So it kind of feels like building up a JSON document the way I am, where each op adds its bit to the JSON document, maybe isn't how I should be thinking about doing what I'm trying to do
Dagster isn't super opinionated about how you do the specifics of your compute. Using multiple ops makes sense when you want to be able to observe them, retry them, etc. separately. Maybe removing the collect step makes sense if the storage overhead is causing problems. You can also always bundle things into one op and manage your own thread pool or something similar (see the sketch below).
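A minimal sketch of that last option, assuming the model calls are refactored into plain functions that each return a partial results dict (all names here are made up):

from concurrent.futures import ThreadPoolExecutor

from dagster import op

@op
def nlp_bundle(context, document):
    # Hypothetical plain functions extracted from the existing ops
    analyses = [run_huggingface_ner, run_spacy_ner, run_sentiment]
    results = {}
    with ThreadPoolExecutor(max_workers=len(analyses)) as pool:
        for partial in pool.map(lambda fn: fn(document), analyses):
            results.update(partial)
    return results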