# ask-community
Hey all, I'm trying to launch a backfill and it is failing. In the daemon logs I see:

```
2022-09-09 07:04:45 +0000 - dagster.daemon.BackfillDaemon - ERROR - Backfill failed for dimvhovl: dagster._core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: preprocessed_texts.preprocessed_texts_op
```

Not sure if this is related to https://github.com/dagster-io/dagster/issues/7899. I am trying to backfill a job with multiple assets, one of which is a graph-backed asset. Everything was working fine until I added the graph-backed asset. I am on …

As an aside, I'm gearing up to launch a fairly large job: NER on 500 million texts of varying length. My setup is a partitioned job with ~20k partitions running in k8s with the k8s step launcher. Any hints or tips appreciated ^^
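For a rough sense of scale, here is the arithmetic for this setup. It assumes the texts are spread evenly across partitions (an illustrative assumption, not a measured distribution) and uses the 1000-row `batch_size` from `get_batches`:

```python
# Back-of-the-envelope sizing, assuming texts are spread evenly over partitions
# (an illustrative assumption, not a measured distribution).
import math

TOTAL_TEXTS = 500_000_000
NUM_PARTITIONS = 20_000
BATCH_SIZE = 1_000  # rows per DynamicOutput in get_batches

texts_per_partition = TOTAL_TEXTS // NUM_PARTITIONS
batches_per_partition = math.ceil(texts_per_partition / BATCH_SIZE)
total_mapped_steps = batches_per_partition * NUM_PARTITIONS

print(texts_per_partition)    # 25000
print(batches_per_partition)  # 25
print(total_mapped_steps)     # 500000
```

Each mapped batch runs as its own step, so if every step gets its own pod, the full backfill would launch on the order of 500k pods for the mapped steps alone. Fewer, larger batches would cut that per-step overhead.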
Hi Oliver, would you mind sharing your code for the graph-backed asset?
```python
from dagster import AssetsDefinition, graph, op
import pandas as pd
from datarwe_nlp.pipelines.freetext_ner_inference.data import preprocessed_texts_op, partition
import dagster as dg


@op(out=dg.DynamicOut())
def get_batches(texts: pd.DataFrame):
    # Fan out: one DynamicOutput per 1000-row slice of the input frame
    batch_size = 1000
    starts = range(0, texts.shape[0], batch_size)
    ends = range(batch_size, texts.shape[0] + batch_size, batch_size)
    for i, (start, end) in enumerate(zip(starts, ends)):
        yield dg.DynamicOutput(texts.iloc[start:end], mapping_key=str(i))


@op
def join_batches(texts: list[pd.DataFrame]):
    # Fan in: concatenate the processed batches back into one frame
    return pd.concat(texts)


@graph
def preprocessed_texts(dataset):
    batches = get_batches(dataset)
    preprocessed_batches = batches.map(preprocessed_texts_op)

    return join_batches(preprocessed_batches.collect())


hyperscaled_preprocessed_texts_asset = AssetsDefinition.from_graph(
```
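The slicing in `get_batches` pairs `starts` with `ends` through `zip`. The minimal pure-Python sketch below (a hypothetical helper `batch_bounds`, with no Dagster or pandas involved) lists the row ranges that logic produces. The last range can nominally end past the last row, which `iloc` clips, so the sketch clips it explicitly. An empty input produces no batches at all.

```python
def batch_bounds(n_rows: int, batch_size: int = 1000) -> list[tuple[int, int]]:
    """Return the (start, end) row ranges get_batches would slice, with end clipped to n_rows."""
    starts = range(0, n_rows, batch_size)
    ends = range(batch_size, n_rows + batch_size, batch_size)
    # Both ranges have ceil(n_rows / batch_size) items, so zip pairs them one-to-one
    return [(start, min(end, n_rows)) for start, end in zip(starts, ends)]


print(batch_bounds(2500))  # [(0, 1000), (1000, 2000), (2000, 2500)]
print(batch_bounds(0))     # []
```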
And an excerpt of the op, including its k8s config:

```python
"dagster-k8s/config": {
    "container_config": {
        "resources": {
            "requests": {"cpu": "7", "memory": "10Gi"},
            "limits": {"cpu": "7", "memory": "10Gi"},
        },
    },
    "job_spec_config": {**COMMON_JOB_SPEC},
    "pod_spec_config": {
        "tolerations": [{
            "key": "dagster",
            "operator": "Exists",
            "effect": "NoSchedule",
        }],
    },
},
```

```python
import multiprocessing

import pandas as pd


def preprocessed_texts_op(context, dataset: pd.DataFrame):
    """Be careful, mutates dataset in place."""
    config = context.op_config or {}
    multiprocess = config.get("multiprocess", True)

    # with torch.device
    ## TODO bottleneck. these can be done in parallel
    # preprocess_tokenized = partial(preprocess_one, tokenizer=SpacyTokenizer())
    # preprocess_one is defined elsewhere in the project (not shown in this excerpt)
    if multiprocess:
        # The context manager shuts the worker processes down afterwards;
        # list() drains imap before the block exits
        with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
            dataset["sentence"] = list(p.imap(preprocess_one, dataset["text"]))
    else:
        dataset["sentence"] = list(map(preprocess_one, dataset["text"]))
    dataset = dataset.drop(columns=["text"])

    return dataset
```
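One thing worth checking with this op under the 7-CPU limit in the k8s config: `multiprocessing.cpu_count()` typically reports the CPUs on the node, not the container's CPU limit, so `Pool(multiprocessing.cpu_count())` can start far more workers than the pod may use and get throttled. Below is a minimal sketch, assuming a cgroup v2 node that exposes the limit at `/sys/fs/cgroup/cpu.max`. cgroup v1 uses `cpu.cfs_quota_us`/`cpu.cfs_period_us` instead, which this sketch does not handle. `parse_cpu_max` and `pool_size` are hypothetical helpers, not part of Dagster.

```python
import os
from typing import Optional

CGROUP_V2_CPU_MAX = "/sys/fs/cgroup/cpu.max"  # assumption: cgroup v2 layout


def parse_cpu_max(contents: str) -> Optional[int]:
    """Parse cgroup v2 cpu.max ("<quota> <period>" or "max <period>") into whole CPUs."""
    quota, period = contents.split()[:2]
    if quota == "max":
        return None  # no CPU limit set on this cgroup
    # Round down so the pool never exceeds the quota, but keep at least one worker
    return max(1, int(quota) // int(period))


def pool_size(path: str = CGROUP_V2_CPU_MAX) -> int:
    """Worker count for multiprocessing.Pool: the cgroup CPU limit if set, else os.cpu_count()."""
    fallback = os.cpu_count() or 1
    try:
        with open(path) as f:
            limit = parse_cpu_max(f.read())
    except (OSError, ValueError):
        return fallback  # not cgroup v2, or the file is unreadable or malformed
    return min(limit, fallback) if limit is not None else fallback


print(parse_cpu_max("700000 100000"))  # 7
print(parse_cpu_max("max 100000"))     # None
```

In the op, `multiprocessing.Pool(pool_size())` would then replace `multiprocessing.Pool(multiprocessing.cpu_count())`.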
- Tried in a k8s environment deployed with the Helm chart, with a single user code deployment server
- Tried with a local docker compose setup that uses the same image for the daemon and dagit (no user code deployment in that setup)
Bumping to … has resolved this.