Oliver
08/25/2022, 4:48 AM
…n rows. The source asset will provide a pandas array with m rows, where m >> n. What is the best way to approach this? I can probably use a graph-backed asset with some fancy dynamic mappings, but I was thinking this might be a fairly common use case, so there might be an easier solution I'm missing. Thanks!

chris
08/25/2022, 7:18 PM

Oliver
08/25/2022, 9:43 PM

chris
08/25/2022, 9:46 PM

Oliver
08/25/2022, 9:54 PM

from math import ceil
from itertools import starmap

import pandas as pd
from dagster import DynamicOut, DynamicOutput, asset, graph, op


@asset(io_manager_key='inferences_io')
def inference(model: FlairNerModel, dataset: list[object]):
    # note: the original snippet called model.predict_many(preprocess), but
    # preprocess is undefined in this scope; dataset looks like the intended argument
    return model.predict_many(dataset)


# config_schema added so op_config['batch_size'] and the .configured(...) call validate
@op(out=DynamicOut(), config_schema={'batch_size': int})
def batch(context, dataset):
    # split the incoming dataframe into fixed-size batches, emitting each one
    # as a DynamicOutput keyed by its batch index
    batch_size = context.op_config['batch_size']
    n_batches = int(ceil(len(dataset) / batch_size))
    get_batch = lambda i: dataset.iloc[i * batch_size:(i + 1) * batch_size]
    batches = map(get_batch, range(n_batches))
    indexed_batches = zip(batches, range(n_batches))
    wrap_batch = lambda data, idx: DynamicOutput(data, mapping_key=str(idx))
    yield from starmap(wrap_batch, indexed_batches)


@op
def collect(results):
    # fan-in: stitch the per-batch results back into a single dataframe
    return pd.concat(results, ignore_index=True)


@graph
def batch_inference(model, dataset):
    batch_size = 50
    batcher = batch.configured({'batch_size': batch_size}, f'batch_{batch_size}')
    batches = batcher(dataset)
    inferenced = batches.map(lambda x: inference(model, x))
    return collect(inferenced.collect())
chris
08/25/2022, 10:05 PM
What do you need the collect step for here exactly? Is it so that you can write everything to the athena table at once? As in, in inferences_io, just set a particular table that you're always writing to? Not sure how athena works, but if that's an option, it could save you from needing the collect step at all.
Also, now noticing, this isn't exactly how I was envisioning the graph-backed asset would work. I think that you would want to structure this s.t. inference is an op (you can't interleave assets inside of graphs), and then wrap the entire graph in a call to AssetsDefinition.from_graph.
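A minimal sketch of the structure chris is describing, assuming only ops inside the graph (the names here are illustrative, not from the thread):

from dagster import AssetsDefinition, graph, op

@op
def inference_op(model, dataset):
    # plain op version of the inference step; ops (unlike assets)
    # can be composed freely inside a graph
    return model.predict_many(dataset)

@graph
def inference_graph(model, dataset):
    return inference_op(model, dataset)

# wrapping the whole graph exposes it as a single software-defined asset;
# the graph's unresolved inputs (model, dataset) become upstream asset deps
inference_asset = AssetsDefinition.from_graph(inference_graph)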
Oliver
08/25/2022, 10:14 PM
> What do you need the collect step for here exactly? Is it so that you can write everything to the athena table at once?
I thought it's required that a graph returns a single Output, not a list of DynamicOutput
> As in, in inferences_io, just set a particular table that you're always writing to? Not sure how athena works, but if that's an option, it could save you from needing the collect step at all.
athena is just presto, so essentially writing some parquets to s3 - so yeah, that would definitely work
> I think that you would want to structure this s.t. inference is an op (you can't interleave assets inside of graphs), and then wrap the entire graph in a call to AssetsDefinition.from_graph.
ah, I was thinking this was pretty clever 😅 I envision data scientists developing the initial set of assets, and then a data engineer comes along and uses the existing assets as ops in more complex/scalable pipelines
chris
08/25/2022, 10:19 PM
> I thought it's required that a graph returns a single Output, not a list of DynamicOutput
The graph in this case wouldn't return anything - the io manager for each dynamic mapped step would handle just writing to athena. You'd still have inference as an op, and then wrap it in an AssetsDefinition to expose asset information about it for usage elsewhere.
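A minimal sketch of what such an io manager could look like, writing each dynamically mapped batch to its own parquet file on S3 (the class name, the s3_prefix config, and the path scheme are illustrative, and calling to_parquet against an s3:// path assumes s3fs is installed):

import pandas as pd
from dagster import IOManager, io_manager

class ParquetOnS3IOManager(IOManager):
    """Writes each (possibly dynamically mapped) output to its own parquet file."""

    def __init__(self, s3_prefix: str):
        self._s3_prefix = s3_prefix

    def _path(self, step_key, mapping_key):
        # the mapping_key keeps each dynamic batch in a distinct file
        suffix = mapping_key if mapping_key is not None else "single"
        return f"{self._s3_prefix}/{step_key}_{suffix}.parquet"

    def handle_output(self, context, obj: pd.DataFrame):
        # each mapped step lands directly in object storage - no collect step needed
        obj.to_parquet(self._path(context.step_key, context.mapping_key))

    def load_input(self, context) -> pd.DataFrame:
        upstream = context.upstream_output
        return pd.read_parquet(self._path(upstream.step_key, upstream.mapping_key))

@io_manager(config_schema={"s3_prefix": str})
def inferences_io(init_context):
    return ParquetOnS3IOManager(init_context.resource_config["s3_prefix"])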
Oliver
08/25/2022, 10:25 PM

chris
08/25/2022, 10:34 PM

Oliver
08/29/2022, 6:54 AM
Invariant failed. Description: All leaf nodes within graph 'batched_pipeline' must generate outputs which are mapped to outputs of the graph, and produce assets. The following leaf node(s) are non-asset producing ops: {'postprocess'}. This behavior is not currently supported because these ops are not required for the creation of the associated asset(s).
postprocess looks like this (not actually saving anything yet)
from dagster import AssetKey, AssetMaterialization, op

@op(
    # io_manager_key='inferences_io'
)
def postprocess(context, model, inference):
    # explode the per-sentence entity lists into one row per entity
    inference['entities'] = inference['sentence'].map(model.postprocess_one)
    results = inference[['time', 'patientid', 'entities']]
    normalised_results = results.explode('entities')
    # nothing is returned yet - only a materialization event is logged
    context.log_event(
        AssetMaterialization(
            asset_key=AssetKey('hahaha'),
            description="Persisted result to storage.",
        )
    )
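Worth noting: logging an AssetMaterialization like this only records an event in Dagster's asset catalog; it doesn't persist normalised_results anywhere. A small illustrative variation that attaches metadata to the event (the asset key and metadata key are made up):

# hypothetical variation: same log_event call, with metadata attached so the
# materialization entry in Dagit shows how many rows were produced
context.log_event(
    AssetMaterialization(
        asset_key=AssetKey("postprocessed_inferences"),  # illustrative key
        description="Persisted result to storage.",
        metadata={"n_rows": len(normalised_results)},
    )
)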
chris
08/29/2022, 7:54 PM

Oliver
08/29/2022, 11:04 PM

@graph()
def batched_pipeline(model, dataset):
    batch_size = 50
    batcher = batch.configured({'batch_size': batch_size}, f'batch_{batch_size}')
    dataset_batches = batcher(dataset)
    preprocessed = dataset_batches.map(lambda x: preprocess(model, x))
    inferences = preprocessed.map(lambda x: inference(model, x))
    # note: postprocessed is never returned, so postprocess is a leaf node -
    # this is what the invariant error above is complaining about
    postprocessed = inferences.map(lambda x: postprocess(model, x))

batch_inference_asset = AssetsDefinition.from_graph(
    batched_pipeline,
)

sorry, here is the graph definition, so postprocess is part of the graph

@op
def collect(results):
    return None
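A hedged sketch of one way to satisfy that invariant, reusing the ops from the thread: fan the mapped postprocess outputs back in and return them from the graph, so the leaf node feeds an output that AssetsDefinition.from_graph can map to the asset. This assumes postprocess is changed to return its normalised_results instead of None; collect_results is an illustrative name, not from the thread.

import pandas as pd
from dagster import AssetsDefinition, graph, op

@op
def collect_results(results):
    # fan-in of the dynamically mapped postprocess outputs
    return pd.concat(results, ignore_index=True)

@graph
def batched_pipeline(model, dataset):
    batcher = batch.configured({'batch_size': 50}, 'batch_50')
    dataset_batches = batcher(dataset)
    preprocessed = dataset_batches.map(lambda x: preprocess(model, x))
    inferences = preprocessed.map(lambda x: inference(model, x))
    postprocessed = inferences.map(lambda x: postprocess(model, x))
    # returning the collected result means postprocess is no longer a leaf
    # node with unmapped outputs, which is what the invariant complains about
    return collect_results(postprocessed.collect())

batch_inference_asset = AssetsDefinition.from_graph(batched_pipeline)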