# ask-community
Rob
hey folks - I want to use declarative scheduling everywhere. coming from Airflow land, it's a very refreshing concept. it seems to be working for most use cases, but there's one that I'm not sure how to solve. would love some assistance.

I have 3 assets in a graph:
• A: refreshes every hour (freshness 60m)
• B: refreshes when A updates (auto-materialize: eager)
• C: pulls data from an API every 15 minutes, but also depends on asset B to filter some data (understanding that it's slower-moving data)

how do I set the freshness and auto-materialize policies for C? I think I need a freshness of 15m and no auto-materialize policy, but according to the docs, this would force asset B to update every 15 minutes, which isn't desired. am I missing something? diagram in thread 👇 for convenience. thanks!
[image.png — diagram of the asset graph]
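(For reference, a minimal sketch of the setup described above — asset bodies elided, and giving A a lazy policy is just one assumed way to have the daemon honor its 60m freshness policy:)

```python
from dagster import AutoMaterializePolicy, FreshnessPolicy, asset


@asset(
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
    # assumption: a lazy policy lets the daemon materialize A to meet its freshness policy
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def A():
    ...


# B re-materializes eagerly whenever its parent A updates
@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def B(A):
    ...


# the open question: which policies should C carry so it refreshes
# every 15 minutes without forcing B to re-materialize?
@asset
def C(B):
    ...
```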
Vitalij
According to my very limited understanding (so far), you should be able to implement it using the following pattern:
1. Define a custom schedule on `C` running every 15 minutes.
2. Attach a schedule function that checks the materialization status of asset `B` using the Dagster instance.
3. If asset `B` is "stale" (was materialized longer than XXX minutes ago), yield `SkipReason` from the schedule. Otherwise, yield `RunRequest`.

An example of how to fetch the last materialization event: https://github.com/dagster-io/dagster/discussions/14311

I suspect `AssetSensor` will not work, since it uses a "cursor" internally and can only "consume" a materialization event once.
And... I suspect auto-materialize policies are not able to describe and process this use case yet.
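A rough, untested sketch of that pattern — the job name, cron string, and the 90-minute staleness threshold are all placeholders:

```python
import datetime

from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    RunRequest,
    ScheduleEvaluationContext,
    SkipReason,
    define_asset_job,
    schedule,
)

# job that materializes only asset C
refresh_c_job = define_asset_job("refresh_c_job", selection="C")


@schedule(job=refresh_c_job, cron_schedule="*/15 * * * *")
def refresh_c_schedule(context: ScheduleEvaluationContext):
    # fetch the most recent materialization event for asset B
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("B"),
        ),
        limit=1,
    )
    if not records:
        return SkipReason("asset B has never been materialized")

    age_minutes = (
        datetime.datetime.now().timestamp() - records[0].event_log_entry.timestamp
    ) / 60
    if age_minutes > 90:  # placeholder staleness threshold
        return SkipReason(f"asset B is stale ({age_minutes:.0f} min old)")
    return RunRequest()
```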
Rob
@Vitalij thanks for taking a look. I agree... I don't think any declarative scheduling concepts can represent this use case. would be great to know for sure.

I think if there were a configuration available, like "do not re-materialize based on downstream asset conditions", then this would work. a control like that seems pretty useful, since certain assets can be very expensive to run and they have no control over who sets them as dependencies.

in general it feels weird that a given asset needs to be synchronized with both upstream and downstream state. my intuition says that assets should only materialize relative to upstream conditions (information and execution state flow in one direction only)
sandy
Hey Rob - this isn't a super well-supported use case right now, though we'll chat internally about how to support it better. A couple of ideas:
• This isn't well-documented because we're still stabilizing these, but you can assign B an eager `AutoMaterializePolicy` that won't be triggered by downstream freshness: `AutoMaterializePolicy(on_missing=True, on_new_parent_data=True, for_freshness=False, time_window_partition_scope_minutes=datetime.timedelta.resolution.total_seconds() / 60)`. A might still execute, though? I'm not 100% sure.
• You could add an observable source asset upstream of C, have a schedule observe it every 15 minutes, and give C an eager auto-materialize policy (see the sketch below).
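A rough sketch of the second idea, with made-up names (`api_source`, the example URL). Instead of a separate observation schedule, this assumes the experimental `auto_observe_interval_minutes` parameter is available and the auto-materialize daemon is running:

```python
import hashlib
import urllib.request

from dagster import (
    AutoMaterializePolicy,
    DataVersion,
    asset,
    observable_source_asset,
)


# stand-in for the external API that C pulls from (URL is made up)
@observable_source_asset(auto_observe_interval_minutes=15)
def api_source():
    # version the source by hashing the response body, so the eager
    # policy on C only fires when the API data actually changes
    body = urllib.request.urlopen("https://example.com/api/data").read()
    return DataVersion(hashlib.sha256(body).hexdigest())


@asset(
    non_argument_deps={"api_source"},
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def C(B):
    # pull from the API and filter using the slower-moving data in B
    ...
```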
Rob
@sandy thanks a lot! I'll test this out today or tomorrow and report back. in general I think it's unnatural for a downstream asset's state to affect upstream assets. in execution graphs / DAGs it's natural to think of execution state flowing down, not bidirectionally, although I might be missing something
sandy
The idea behind it flowing backwards is probably best expressed by Benn Stancil in this blog post: https://benn.substack.com/p/down-with-the-dag. I.e., in theory, life can be simpler and scheduling can be more optimal if you express when you need data by, instead of when to execute things. That said, there are also difficulties inherent in this scheduling model, as you've pointed out, and we certainly don't want to force every Dagster user to use it
Rob
@sandy I appreciate the discussion. bidirectional data flow may be more optimal, I'm not 100% convinced, but for sure it's more difficult to reason about. it's simpler for the state of an object to be owned by the object itself, not by any N downstream objects that rely on it (and that it may not even be aware of). this spreads state and ownership all over the dependency graph and makes it hard to understand (e.g., why did this upstream object rematerialize?). either way, I'm glad Dagster can support both scheduling models
@sandy is the behavior you described available through `dagster-dbt`? I only see this in `dagster_dbt/asset_utils.py`:

```python
def default_auto_materialize_policy_fn(
    node_info: Mapping[str, Any]
) -> Optional[AutoMaterializePolicy]:
    auto_materialize_policy = node_info["config"].get("dagster_auto_materialize_policy", {})
    if auto_materialize_policy.get("type") == "eager":
        return AutoMaterializePolicy.eager()
    elif auto_materialize_policy.get("type") == "lazy":
        return AutoMaterializePolicy.lazy()
    return None
```
I think if the mapping from dbt accepted those custom arguments, we could just create a custom `AutoMaterializePolicy` in that method on line 91. happy to submit a PR to add this. chatting with my team, it's behavior that we would use broadly, since we don't want expensive assets to get triggered incidentally. ideally they should own their own materialization logic
sandy
`load_assets_from_dbt_manifest` accepts a `node_info_to_auto_materialize_policy_fn` that allows you to basically provide a custom implementation of that function. we also welcome a PR, but that might move a little slower as we iron out the exact names we want to use, etc.
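An untested sketch of what that wiring could look like — the manifest path is a placeholder, and the policy kwargs are the ones from earlier in the thread:

```python
import datetime
import json
from typing import Any, Mapping, Optional

from dagster import AutoMaterializePolicy
from dagster_dbt import load_assets_from_dbt_manifest


def custom_auto_materialize_policy_fn(
    node_info: Mapping[str, Any]
) -> Optional[AutoMaterializePolicy]:
    # same config key the built-in function reads from the dbt node
    policy = node_info["config"].get("dagster_auto_materialize_policy", {})
    if policy.get("type") == "eager":
        # eager, but never triggered by downstream freshness policies
        return AutoMaterializePolicy(
            on_missing=True,
            on_new_parent_data=True,
            for_freshness=False,
            time_window_partition_scope_minutes=(
                datetime.timedelta.resolution.total_seconds() / 60
            ),
        )
    elif policy.get("type") == "lazy":
        return AutoMaterializePolicy.lazy()
    return None


with open("target/manifest.json") as f:  # placeholder manifest path
    manifest_json = json.load(f)

dbt_assets = load_assets_from_dbt_manifest(
    manifest_json,
    node_info_to_auto_materialize_policy_fn=custom_auto_materialize_policy_fn,
)
```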
Rob
@sandy ok great - I will give that a shot. it can be a good workaround for now