Hey, Can I create a job that includes multiple ope...
# dagster-plus
s
Hey, Can I create a job that includes multiple operations and generates multiple assets, with the job being triggered when one of the assets is materialized? I wanted to have a workflow similar to what we have in airflow, multiple tasks in order to create a single or multiple Asset, but the workflow will be similar to
Multi-Assets
. I want to keep the atomicity of the operations so I can retry a single operation instead of retrying the whole thing when using
@multi_asset
decorator for a function i.e: This graph will have as input 2 assets, and 3 assets as output.
s
I think what you're describing is a
graph
and the
AssetsDefinition.from_graph(...)
functionality as noted here. But, you might also just want to define the assets and select them directly?
Copy code
@asset
def my_triggering_asset():
   return "baz"

@asset
def my_first_asset(my_triggering_asset):
    return "foo"

@asset
def my_second_asset(my_first_asset):
    return my_first_asset + "bar"

selection =. AssetSelection.assets([my_first_asset, my_second_asset])
custom_asset_job = define_asset_job(selection=selection, name="my_multi_asset_job")

# create a sensor that checks for AssetMaterialization of `my_triggering_asset` and kicks off the custom_asset_job
s
But what if we want an asset definition to be composed by two operation for example computing + validation
and to have the abilitiy to retry/skip validation without needing to retry computing
s
graph approach
Copy code
@op
def compute_op():
   return "foo"

@op
def validate_op(compute_output: str):   
    return compute_output

@graph
def full_graph():
    output_1 = validate_op(compute_op())
    return output

my_asset = AssetsDefintion.from_graph(full_graph, asset_keys_by_output={"output": AssetKey(...)})
That's pseudocode based on memory, but that's the idea. FWIW, I have moved towards building in that sort of validation logic directly into the asset logic. I find it to be more readable, and although you don't get the dopamine hit of seeing all the individual steps dispalyed as a graph, I've found it to basically amount to the same process in practice and align more mentally with the actual job-being-done. Definitely just my loose opinion though:
Copy code
def compute_fxn():
   return "foo"

def validate_fxn(compute_output: str):   
    return compute_output

@asset
def my_asset():
    output_1 = compute_fxn()

    if some_config:
        validated=validate_fxn(output_1)

    return output
❤️ 1