Tom
12/22/2022, 3:25 PM
... organisations) has a dependency on an additional upstream asset?
The upstream asset (named organisations_stats) may also be part of a different multi_asset definition. I see that non_argument_deps is only supported as a decorator argument (not an AssetOut argument) and is applied to all assets. That produces a lineage showing all 4 assets as downstream of the dependency, which would be incorrect.
The docs on defining and constructing dependencies describe op definitions, but as I'm new to Dagster and have so far been working at the level of assets, I haven't been able to see how to apply that guidance to asset definitions.
The real-world sequence I'm trying to represent in Dagster is this:
1. A single API call is made to a data warehouse every 30 minutes to refresh 4 materialised views, represented by the outs shown below
2. One of those views, organisations, depends on another materialised view, organisations_stats, which is refreshed only once daily because it is expensive to compute, but remains valid in between
3. I would like to use Dagster to manage both of these refresh schedules, and to display the dependency between these two views
4. At the same time, it would be convenient to be able to define the other views (there may be several more) together using a multi_asset, or another approach, because they are all materialised as part of the same operation / API call
What do you recommend? Many thanks for your help! 😀
from dagster import AssetOut, asset, multi_asset

@multi_asset(
    outs={
        "opportunities": AssetOut(),
        "contacts": AssetOut(),
        "projects": AssetOut(),
        "organisations": AssetOut(),  # <- has an upstream dependency...
    },
    # non_argument_deps={"organisations_stats"},  # ...which is a dep of organisations only, so this doesn't work
)
def multi_asset_example(context):
    # function continued...
    ...

@asset
def organisations_stats():
    """This is the additional asset upstream of organisations. It might be defined like this, or as part of a different multi_asset."""
sandy
12/27/2022, 5:41 PM
... internal_asset_deps argument. Example:
from dagster import asset, multi_asset, AssetOut, AssetKey, Nothing

@asset
def asset1():
    ...

@multi_asset(
    outs={"asset2": AssetOut(dagster_type=Nothing), "asset3": AssetOut(dagster_type=Nothing)},
    non_argument_deps={"asset1"},
    internal_asset_deps={"asset2": set(), "asset3": {AssetKey("asset1")}},
)
def multi():
    ...
Tom
12/28/2022, 9:56 AM
... non_argument_deps, internal_asset_deps and perhaps also the explicit dagster_type=Nothing, those have all helped. I've now been able to achieve the workflow I was after by using these along with AssetMaterialization events in ops. Thank you!