#dagster-support

Oliver

08/25/2022, 4:48 AM
hey all 🙂 I have a compute-heavy asset that needs to be partitioned such that each partition has `n` rows. The source asset will provide a pandas DataFrame with `m` rows, where `m >> n`. What is the best way to approach this? I can probably use a graph-backed asset with some fancy dynamic mappings, but I was thinking this might be a fairly common use case, so there might be an easier solution I'm missing. Thanks
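For reference, the batching asked about here amounts to slicing `m` rows into `ceil(m / n)` consecutive chunks. A minimal plain-Python sketch, assuming a list stands in for the DataFrame; `iter_batches` is a hypothetical helper name, not something from the thread or from Dagster:

```python
from math import ceil


def iter_batches(rows, batch_size):
    """Yield (index, batch) pairs; each batch holds at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    n_batches = ceil(len(rows) / batch_size)
    for idx in range(n_batches):
        # The last batch may be shorter than batch_size.
        yield idx, rows[idx * batch_size:(idx + 1) * batch_size]


rows = list(range(7))                  # m = 7 rows (illustrative data)
batches = list(iter_batches(rows, 3))  # n = 3 rows per batch
```

The index paired with each batch is what would later serve as a unique key per batch, for example a dynamic output's mapping key.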

chris

08/25/2022, 7:18 PM
when you say it needs to be partitioned, are you thinking of it as storing/processing each partition of data under a different partition key?

Oliver

08/25/2022, 9:43 PM
sorry partition was probably the wrong word. batching maybe better? all the results need to be dumped in a single athena table

chris

08/25/2022, 9:46 PM
I see - I think that dynamic mappings actually is exactly the right solution there - one big graph-backed asset that represents the athena table, and internally, the dynamic mappings handle batching the work

Oliver

08/25/2022, 9:54 PM
ok cool, I did go and implement that 🙂 Is there a way to push the batches through the asset's io_manager individually? I currently have a rejoin step that `pd.concat`s all my dataframes together, and I'm not sure how well that will work on the full dataset. fwiw, current code:
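from itertools import starmap
from math import ceil

import pandas as pd
from dagster import DynamicOut, DynamicOutput, asset, graph, op


@asset(
    io_manager_key='inferences_io'
)
def inference(model: FlairNerModel, dataset: list[object]):
    # FlairNerModel is defined elsewhere in the project (not shown here)
    inference = model.predict_many(dataset)

    return inference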

@op(out=DynamicOut())
def batch(context, dataset):

    batch_size = context.op_config['batch_size']

    n_batches = int(ceil(len(dataset)/batch_size))
    get_batch = lambda x: dataset.iloc[x*batch_size:(x+1)*batch_size]
    batches = map(get_batch, range(n_batches))
    indexed_batches = zip(batches, range(n_batches))

    wrap_batches = lambda data, idx: DynamicOutput(data, mapping_key=str(idx))
    outputs = starmap(wrap_batches, indexed_batches)


    yield from outputs

@op
def collect(results):
    return pd.concat(results, ignore_index=True)


@graph
def batch_inference(model, dataset):

    batch_size = 50
    batcher = batch.configured({'batch_size': batch_size}, f'batch_{batch_size}')

    batches = batcher(dataset)
    inferenced = batches.map(lambda x: inference(model, x))
    
    return collect(inferenced.collect())

chris

08/25/2022, 10:05 PM
What do you need the collect step for here exactly? Is it so that you can write everything to the athena table at once? I'm wondering if you can just set an io manager s.t. you write that batch's data immediately to athena, without having to have that expensive collect step
As in, in inferences_io, just set a particular table that you're always writing to? Not sure how athena works, but if that's an option, could save you from needing the collect step at all
Also, now noticing, this isn't exactly how I was envisioning the graph-backed asset would work. I think that you would want to structure this s.t. inference is an `op` (you can't interleave assets inside of graphs), and then wrap the entire graph in a call to `AssetsDefinition.from_graph`

Oliver

08/25/2022, 10:14 PM
> What do you need the collect step for here exactly? Is it so that you can write everything to the athena table at once?
I thought it's required that a graph returns a single `Output`, not a list of `DynamicOutput`
> As in, in inferences_io, just set a particular table that you're always writing to? Not sure how athena works, but if that's an option, could save you from needing the collect step at all
Athena is just Presto, so it's essentially writing some Parquet files to S3, so yeah, that would definitely work
> Also, now noticing, this isn't exactly how I was envisioning the graph-backed asset would work. I think that you would want to structure this s.t. inference is an `op` (you can't interleave assets inside of graphs), and then wrap the entire graph in a call to `AssetsDefinition.from_graph`
ah I was thinking this was pretty clever 😅 I envision data scientists developing the initial set of assets, and then a data engineer comes along and uses the existing assets as ops in more complex/scalable pipelines
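The idea of writing each batch straight to the table's storage location, instead of concatenating everything first, can be sketched in plain Python. This is only an illustration under stated assumptions: a local temporary directory stands in for the S3 prefix behind the Athena table, CSV stands in for Parquet, and `write_batch` and `read_table` are hypothetical helper names.

```python
import csv
import tempfile
from pathlib import Path


def write_batch(table_dir, batch_index, rows):
    """Write one batch as its own part file under the table directory."""
    part_path = Path(table_dir) / f"part-{batch_index:05d}.csv"
    with part_path.open("w", newline="") as handle:
        csv.writer(handle).writerows(rows)
    return part_path


def read_table(table_dir):
    """Read every part file back, in file-name order, as one list of rows."""
    rows = []
    for part_path in sorted(Path(table_dir).glob("part-*.csv")):
        with part_path.open(newline="") as handle:
            rows.extend(csv.reader(handle))
    return rows


with tempfile.TemporaryDirectory() as table_dir:
    # Each batch is persisted independently; nothing is concatenated in memory.
    write_batch(table_dir, 0, [["a", "1"], ["b", "2"]])
    write_batch(table_dir, 1, [["c", "3"]])
    part_names = sorted(p.name for p in Path(table_dir).iterdir())
    table_rows = read_table(table_dir)
```

Because every batch lands in its own file, only one batch needs to be held in memory at a time, and a reader that scans the whole directory still sees a single table.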

chris

08/25/2022, 10:19 PM
You can potentially get away with using the asset as an op, but it's generally considered bad practice, yeah. There's potentially a different way we can get to your last point there, though
> I thought it's required that a graph returns a single `Output`, not a list of `DynamicOutput`
The graph in this case wouldn't return anything - the io manager for each dynamic mapped step would handle just writing to athena
What you could do is write `inference` as an op, and then wrap it in an `AssetsDefinition` to expose asset information about it for usage elsewhere

Oliver

08/25/2022, 10:25 PM
ah cool, just overthinking it 🙂
big fan of the asset API though, and I would rather the data scientists not have to know about `AssetsDefinition`; a lot of them don't have a deep coding background and I think it's a bit more intimidating

chris

08/25/2022, 10:34 PM
I see - well feel free to start a new thread around best practices for organizing around your use case, can see if anything comes to mind. For this particular example, I think graph-backed is def the way to go
🌈 1

Oliver

08/29/2022, 6:54 AM
I tried returning nothing from the graph and I'm hitting this error
Invariant failed. Description: All leaf nodes within graph 'batched_pipeline' must generate outputs which are mapped to outputs of the graph, and produce assets. The following leaf node(s) are non-asset producing ops: {'postprocess'}. This behavior is not currently supported because these ops are not required for the creation of the associated asset(s).
postprocess looks like this (not actually saving anything yet)
@op(
    # io_manager_key='inferences_io'
)
def postprocess(context, model, inference):
    
    inference['entities'] = inference['sentence'].map(model.postprocess_one)
    
    results = inference[['time', 'patientid', 'entities']]
    normalised_results = results.explode('entities')

    from dagster import AssetMaterialization, AssetKey
    context.log_event(
        AssetMaterialization(
            asset_key=AssetKey('hahaha'),
            description="Persisted result to storage.",
        )
    )

chris

08/29/2022, 7:54 PM
what's the context in which you're using the graph? Is post-process an op within the graph, or is it consuming the graph in some way?

Oliver

08/29/2022, 11:04 PM
@graph()
def batched_pipeline(model, dataset):

    batch_size = 50
    batcher = batch.configured({'batch_size': batch_size}, f'batch_{batch_size}')

    dataset_batches = batcher(dataset)

    preprocessed = dataset_batches.map(lambda x: preprocess(model, x))
    inferences = preprocessed.map(lambda x: inference(model, x))
    postprocessed = inferences.map(lambda x: postprocess(model, x))

batch_inference_asset = AssetsDefinition.from_graph(
    batched_pipeline,
)
sorry, here is the graph definition, so postprocess is part of the graph
ah nvm, got it by adding a null collector
@op
def collect(results): return None
👍 1