Rohil Badkundri
01/05/2023, 6:00 PMmy_asset = AssetsDefinition.from_graph(my_graph, ...)
# alternatively
another_asset = AssetDefinition.from_op(my_op, ...)
Allowing you to re-use my_graph
and my_op
across different assetsMatthew Cloney
01/05/2023, 6:37 PMBaseIntegration
, the pattern I have can be expressed as:
@op
def five_step(obj: BaseIntegration) -> BaseIntegration:
download_from_source(obj)
validate_source_data(obj)
sync_to_object_store(obj)
add_partition(obj)
remove_local_dir(obj)
each of those functions is also an @op
. I can do this:
op_asset1 = AssetsDefinition.from_op(op_def=five_step, keys_by_input_name=dict(obj=AssetKey("first_item")))
but if I try to do this, it complains that Asset key AssetKey(['fiver']) is defined multiple times
op_asset2 = AssetsDefinition.from_op(op_def=five_step, keys_by_input_name=dict(obj=AssetKey("second_item")))
How would I best reuse that five_step
function?key_prefix
in from_op()
may be the way?Rohil Badkundri
01/05/2023, 8:06 PMfive_step
op? Is it returning obj
? If you want to return multiple things you could something analogous to:
@graph(out = { "output1" : GraphOut(), ... "output_5" : GraphOut()}
def five_step(obj: BaseIntegration) -> BaseIntegration:
...
return {"output1": ..., ..., "output5": ...}
fiver_assets = AssetsDefinition.from_graph(five_step, keys_by_output = {...})
In general you want to connect/nest ops within a graphMatthew Cloney
01/05/2023, 8:14 PMRohil Badkundri
01/05/2023, 8:17 PMMatthew Cloney
01/05/2023, 8:22 PM