Said Mancouri
01/25/2023, 2:05 AMMulti-Assets
.
I want to keep the atomicity of the operations so I can retry a single operation instead of retrying the whole thing when using @multi_asset
decorator for a function
i.e: This graph will have as input 2 assets, and 3 assets as output.Stephen Bailey
01/25/2023, 12:06 PMgraph
and the AssetsDefinition.from_graph(...)
functionality as noted here. But, you might also just want to define the assets and select them directly?
@asset
def my_triggering_asset():
return "baz"
@asset
def my_first_asset(my_triggering_asset):
return "foo"
@asset
def my_second_asset(my_first_asset):
return my_first_asset + "bar"
selection =. AssetSelection.assets([my_first_asset, my_second_asset])
custom_asset_job = define_asset_job(selection=selection, name="my_multi_asset_job")
# create a sensor that checks for AssetMaterialization of `my_triggering_asset` and kicks off the custom_asset_job
Said Mancouri
01/25/2023, 12:10 PMSaid Mancouri
01/25/2023, 12:11 PMStephen Bailey
01/25/2023, 12:38 PM@op
def compute_op():
return "foo"
@op
def validate_op(compute_output: str):
return compute_output
@graph
def full_graph():
output_1 = validate_op(compute_op())
return output
my_asset = AssetsDefintion.from_graph(full_graph, asset_keys_by_output={"output": AssetKey(...)})
That's pseudocode based on memory, but that's the idea. FWIW, I have moved towards building in that sort of validation logic directly into the asset logic. I find it to be more readable, and although you don't get the dopamine hit of seeing all the individual steps dispalyed as a graph, I've found it to basically amount to the same process in practice and align more mentally with the actual job-being-done. Definitely just my loose opinion though:
def compute_fxn():
return "foo"
def validate_fxn(compute_output: str):
return compute_output
@asset
def my_asset():
output_1 = compute_fxn()
if some_config:
validated=validate_fxn(output_1)
return output