Oliver
09/09/2022, 7:10 AM
2022-09-09 07:04:45 +0000 - dagster.daemon.BackfillDaemon - ERROR - Backfill failed for dimvhovl: dagster._core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: preprocessed_texts.preprocessed_texts_op
Not sure if this is related to https://github.com/dagster-io/dagster/issues/7899. I am trying to backfill a job with multiple assets, one of which is a graph-backed asset. Everything was working fine until I added the graph-backed asset. I am on 1.0.4.
----
As an aside, I'm gearing up to launch a fairly large job: NER on 500 million texts of varying length. My setup is a partitioned job with ~20k partitions running in k8s with the k8s step launcher. Any hints/tips appreciated ^^
claire
09/09/2022, 5:09 PM
Oliver
09/12/2022, 4:48 AM
from dagster import AssetsDefinition, graph, op
import pandas as pd
from datarwe_nlp.pipelines.freetext_ner_inference.data import preprocessed_texts_op, partition
import dagster as dg


@dg.op(out=dg.DynamicOut())
def get_batches(texts: pd.DataFrame):
    batch_size = 1000
    starts = range(0, texts.shape[0], batch_size)
    ends = range(batch_size, texts.shape[0] + batch_size, batch_size)
    for i, (start, end) in enumerate(zip(starts, ends)):
        yield dg.DynamicOutput(texts.iloc[start:end], mapping_key=str(i))


@dg.op
def join_batches(texts: list[pd.DataFrame]):
    return pd.concat(texts)


@dg.graph
def preprocessed_texts(dataset):
    batches = get_batches(dataset)
    preprocessed_batches = batches.map(preprocessed_texts_op)
    return join_batches(preprocessed_batches.collect())


hyperscaled_preprocessed_texts_asset = AssetsDefinition.from_graph(
    preprocessed_texts,
    partitions_def=partition,
)
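Editor's note: the slicing arithmetic in get_batches can be checked without Dagster or pandas. The sketch below is not the poster's code. It assumes plain Python lists in place of the DataFrame, and the names batch_bounds and split_into_batches are hypothetical. It pairs the two ranges the same way and relies on slicing to clamp the last end index, which list slices and DataFrame.iloc slices both do.

```python
from typing import Iterator, List, Sequence, Tuple


def batch_bounds(n_rows: int, batch_size: int) -> Iterator[Tuple[int, int]]:
    """Return (start, end) pairs, pairing two ranges as get_batches does."""
    starts = range(0, n_rows, batch_size)
    # The final end may overshoot n_rows; slicing clamps it later.
    ends = range(batch_size, n_rows + batch_size, batch_size)
    return zip(starts, ends)


def split_into_batches(rows: Sequence[str], batch_size: int) -> List[Sequence[str]]:
    """Slice rows into consecutive batches; the last one may be shorter."""
    return [rows[start:end] for start, end in batch_bounds(len(rows), batch_size)]


# Hypothetical input: 2500 rows split into batches of 1000.
rows = [f"text-{i}" for i in range(2500)]
batches = split_into_batches(rows, batch_size=1000)
sizes = [len(batch) for batch in batches]  # [1000, 1000, 500]
```

With the thread's batch size of 1000, an input of 2500 rows gives three batches, so three mapped copies of preprocessed_texts_op would run for that partition. An empty input gives no batches at all.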
Excerpt of the datarwe_nlp.pipelines.freetext_ner_inference.data module:
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "7", "memory": "10Gi"},
                    "limits": {"cpu": "7", "memory": "10Gi"},
                }
            },
            "job_spec_config": {**COMMON_JOB_SPEC},
            "pod_spec_config": {
                "tolerations": [{
                    "key": "dagster",
                    "operator": "Exists",
                    "effect": "NoSchedule",
                }]
            },
        }
    }
)
def preprocessed_texts_op(context, dataset: pd.DataFrame):
    """
    Be careful, mutates dataset in place
    """
    config = context.op_config or {}
    multiprocess = config.get('multiprocess', True)
    mapper = map
    if multiprocess:
        p = multiprocessing.Pool(multiprocessing.cpu_count())
        mapper = p.imap
    # with torch.device
    ## TODO bottleneck. these can be done in parallel
    # preprocess_tokenized = partial(preprocess_one, tokenizer=SpacyTokenizer())
    dataset['sentence'] = list(mapper(preprocess_one, dataset['text']))
    dataset = dataset.drop(columns=['text'])
    return dataset
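Editor's note: preprocessed_texts_op above creates a multiprocessing.Pool and never closes it. A minimal sketch of the same map-or-imap switch with the pool scoped to a with-block follows. It is an illustration and not the poster's code: apply_to_texts is a hypothetical helper, and normalize is a hypothetical stand-in for preprocess_one, which is not shown in the thread.

```python
import multiprocessing
from typing import Callable, Iterable, List


def apply_to_texts(
    func: Callable[[str], str],
    texts: Iterable[str],
    multiprocess: bool = True,
) -> List[str]:
    """Apply func to every text, optionally across worker processes."""
    if not multiprocess:
        return list(map(func, texts))
    # Leaving the with-block terminates the pool, so the lazy imap
    # result has to be fully consumed before the block ends.
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        return list(pool.imap(func, texts))


def normalize(text: str) -> str:
    # Hypothetical stand-in for preprocess_one. It is defined at module
    # level so that worker processes can pickle a reference to it.
    return text.strip().lower()


if __name__ == "__main__":
    print(apply_to_texts(normalize, ["  Foo ", "BAR"], multiprocess=False))
    # prints ['foo', 'bar']
```

Returning a new list instead of assigning into the input also sidesteps the in-place mutation that the docstring above warns about.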
---
• Tried with both a k8s environment that uses the Helm charts to deploy and a single code deploy server
• Tried with a local docker compose setup that uses the same image for the daemon and dagit (no code deploy in that setup)
Oliver
09/12/2022, 7:24 AM
1.0.8 has resolved this