# ask-community
o
hey all, trying to launch a backfill and it is failing. If I look in the daemon logs I see:
```
2022-09-09 07:04:45 +0000 - dagster.daemon.BackfillDaemon - ERROR - Backfill failed for dimvhovl: dagster._core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: preprocessed_texts.preprocessed_texts_op
```
Not sure if it's related to https://github.com/dagster-io/dagster/issues/7899. I am trying to backfill a job with multiple assets, one of which is a graph-backed asset. Everything was working fine until I added the graph-backed asset. I am on 1.0.4.
---- As an aside, I'm gearing up to launch a fairly large job: NER on 500 million texts of varying length. My setup is a partitioned job with ~20k partitions running in k8s with the k8s step launcher. Any hints/tips appreciated ^^
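For reference, a minimal sketch of how such a partitioned asset job could be wired to run each step in its own Kubernetes pod (the partition scheme, the names, and the choice of `dagster_k8s.k8s_job_executor` are assumptions for illustration, not the project's actual code):
```python
from dagster import AssetSelection, StaticPartitionsDefinition, define_asset_job
from dagster_k8s import k8s_job_executor

# Hypothetical partitioning: one partition per shard of the ~500M texts.
text_shards = StaticPartitionsDefinition([f"shard-{i:05d}" for i in range(20_000)])

# Hypothetical asset job; with k8s_job_executor each op/step runs as its own
# Kubernetes job, so per-op "dagster-k8s/config" tags (resources, tolerations)
# apply to each step individually.
ner_backfill_job = define_asset_job(
    name="freetext_ner_inference_job",
    selection=AssetSelection.all(),
    partitions_def=text_shards,
    executor_def=k8s_job_executor,
)
```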
c
Hi Oliver, would you mind sharing your code for the graph-backed asset?
o
sure
```python
from dagster import AssetsDefinition
import pandas as pd
from datarwe_nlp.pipelines.freetext_ner_inference.data import preprocessed_texts_op, partition
import dagster as dg


@dg.op(out=dg.DynamicOut())
def get_batches(texts: pd.DataFrame):
    # Split the incoming texts into fixed-size chunks, one DynamicOutput per chunk.
    batch_size = 1000
    starts = range(0, texts.shape[0], batch_size)
    ends = range(batch_size, texts.shape[0] + batch_size, batch_size)

    for i, (start, end) in enumerate(zip(starts, ends)):
        yield dg.DynamicOutput(texts.iloc[start:end], mapping_key=str(i))


@dg.op
def join_batches(texts: list[pd.DataFrame]):
    # Recombine the preprocessed chunks into a single dataframe.
    return pd.concat(texts)


@dg.graph
def preprocessed_texts(dataset):
    batches = get_batches(dataset)
    preprocessed_batches = batches.map(preprocessed_texts_op)
    return join_batches(preprocessed_batches.collect())


hyperscaled_preprocessed_texts_asset = AssetsDefinition.from_graph(
    preprocessed_texts,
    partitions_def=partition,
)
```
and an excerpt of the `datarwe_nlp.pipelines.freetext_ner_inference.data` module:
```python
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "7", "memory": "10Gi"},
                    "limits": {"cpu": "7", "memory": "10Gi"},
                }
            },
            "job_spec_config": {**COMMON_JOB_SPEC},
            "pod_spec_config": {
                "tolerations": [{
                    "key": "dagster",
                    "operator": "Exists",
                    "effect": "NoSchedule",
                }]
            },
        }
    }
)
def preprocessed_texts_op(context, dataset: pd.DataFrame):
    """Preprocess each text into a 'sentence' column. Be careful: mutates `dataset` in place."""
    config = context.op_config or {}
    multiprocess = config.get("multiprocess", True)

    # TODO bottleneck: the texts can be preprocessed in parallel.
    if multiprocess:
        with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
            dataset["sentence"] = list(pool.imap(preprocess_one, dataset["text"]))
    else:
        dataset["sentence"] = list(map(preprocess_one, dataset["text"]))

    dataset = dataset.drop(columns=["text"])
    return dataset
```
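Because the asset is built from a graph, its inner ops get nested step keys like `preprocessed_texts.preprocessed_texts_op`, which is exactly the step the backfill daemon reports as unknown. For context, a minimal sketch of how the asset and a backfillable partitioned job might be registered together (the repository and job names are assumptions, and the real job selects multiple assets):
```python
import dagster as dg

# Sketch only: registers the graph-backed asset plus a partitioned asset job
# that a backfill can target. Names other than hyperscaled_preprocessed_texts_asset
# and partition are hypothetical.
@dg.repository
def freetext_ner_repository():
    return [
        hyperscaled_preprocessed_texts_asset,
        dg.define_asset_job(
            "preprocess_texts_job",
            selection=dg.AssetSelection.keys("preprocessed_texts"),
            partitions_def=partition,
        ),
    ]
```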
--- Tried with both:
• a k8s environment deployed with the Helm charts plus a single code deploy server
• a local docker compose setup that uses the same image for the daemon and dagit (no code deploy server in that setup)
bumping to 1.0.8 has resolved this