# ask-community
c
Hey folks 👋, I’ve implemented the following DAG (see screenshot below). The assets should have the following materialization behaviour:
• asset1 is to be updated on a cron schedule (every minute)
• asset2 should lazily incorporate upstream data that is no more than 2 minutes old
I’m having issues with the auto-materialization behaviour of asset2. In the screenshot below, asset2 does not auto-materialize even though the dashboard indicates that there is newer upstream data and asset2 is overdue. My suspicion is that because the materialization policy is lazy, observations on the asset are not being triggered to evaluate whether it needs to be materialized. Any idea how I can get asset2 to materialize automatically? Thanks a lot!
Here is the code for reproduction:
from dagster import (
    AssetSelection,
    AutoMaterializePolicy,
    DefaultScheduleStatus,
    Definitions,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
)


@asset
def asset1():
    ...


@asset(
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset2(asset1):
    ...


refresh_job = define_asset_job("refresh_asset1", AssetSelection.keys("asset1"))

refresh_schedule = ScheduleDefinition(
    job=refresh_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(assets=[asset1, asset2], schedules=[refresh_schedule])
I’m currently using dagster==1.3.13
c
Hey Christopher - checking with folks on the team about this who are more familiar with auto-materialization
j
Just to double check, have you enabled auto materializing?
Never mind, I’m able to reproduce 🙂 Seems like a bug while the downstream asset is still missing. Once I materialize it once, it continues to materialize. Is that the case for you?
c
I forgot to enable auto-materializing on the “Daemon” tab 😅 Once that’s enabled, asset2 auto-materializes as expected! It might be worthwhile adding that condition to the list of conditions under an asset’s “Auto-materialize policy” as a reminder. That, and/or expose an API to enable auto-materialization from the get-go 🙂
As I’m building up the toy problem towards my original use case, asset2 no longer auto-materializes when I add asset3, which materializes in an ad hoc manner, even though asset2 is 2 minutes stale relative to asset1. Attaching the code once again for reproducibility.
from dagster import (
    AssetSelection,
    AutoMaterializePolicy,
    DefaultScheduleStatus,
    Definitions,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
)


@asset(description="Refreshes every minute")
def asset1():
    ...


@asset(
    description="Should be no more than 2 minutes stale. Refreshes lazily",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset2(asset1, asset3):
    ...


@asset(description="Materialized ad hoc or by external processes")
def asset3():
    ...


refresh_job = define_asset_job("refresh_asset1", AssetSelection.keys("asset1"))

refresh_schedule = ScheduleDefinition(
    job=refresh_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(assets=[asset1, asset2, asset3], schedules=[refresh_schedule])
I assume this is a bug, or am I misinterpreting the auto-materialization behaviour?
o
Hi @Christopher Tee! For the first behavior you were experiencing, we've located the source of that issue and will have a fix out (it impacts lazy assets which are missing and whose parents do not have auto-materialize policies).
For this issue though, this is somewhat expected behavior. By default, assets with no dependencies are assumed to be reading from some always-updating source, so if asset3 hasn't been executed in (e.g.) 5 minutes, it's assumed to be 5 minutes out of date. This in turn means that if asset2 were executed right now, even with the most recent available version of asset3 as input, its data would still be 5 minutes out of date and would not satisfy the freshness policy, so it will not be executed.
This default assumption isn't always accurate though, and you can change this behavior by making asset3 downstream of an observable source asset. This lets Dagster know whether asset3 is as up to date as possible (i.e. has been materialized after the newest version of that observable source). An updated version of your code would be:
import datetime

from dagster import (
    AssetSelection,
    AutoMaterializePolicy,
    DataVersion,
    DefaultScheduleStatus,
    Definitions,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
    observable_source_asset,
)


@asset(description="Refreshes every minute")
def asset1():
    ...


@asset(
    description="Should be no more than 2 minutes stale. Refreshes lazily",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset2(asset1, asset3):
    ...


@observable_source_asset
def my_observable_source():
    # this version will always be different
    return DataVersion(str(datetime.datetime.now()))


@asset(
    description="Materialized ad hoc or by external processes",
    non_argument_deps={my_observable_source.key},
)
def asset3():
    ...


refresh_job = define_asset_job("refresh_asset1", AssetSelection.keys("asset1"))

refresh_schedule = ScheduleDefinition(
    job=refresh_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(
    assets=[asset1, asset2, asset3, my_observable_source], schedules=[refresh_schedule]
)
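To make the reasoning above concrete, here is a minimal sketch of the freshness arithmetic in plain Python. It is a simplified model of the explanation in this thread, not Dagster's actual implementation; the helper names (effective_data_time, would_satisfy_policy) and the timestamps are hypothetical.

```python
from datetime import datetime, timedelta


def effective_data_time(parent_data_times):
    """An asset is only as fresh as its stalest input (simplified assumption)."""
    return min(parent_data_times)


def would_satisfy_policy(now, parent_data_times, maximum_lag_minutes):
    """Would materializing right now meet a maximum_lag_minutes freshness policy?"""
    lag = now - effective_data_time(parent_data_times)
    return lag <= timedelta(minutes=maximum_lag_minutes)


now = datetime(2023, 6, 1, 12, 0)
asset1_time = now - timedelta(seconds=30)  # refreshed by the every-minute schedule
asset3_time = now - timedelta(minutes=5)   # last ad hoc run; treated as stale since then

# With asset1 as the only parent, a run now would satisfy the 2-minute policy.
print(would_satisfy_policy(now, [asset1_time], 2))               # True
# With asset3 as a second parent, a run now would still yield 5-minute-old data.
print(would_satisfy_policy(now, [asset1_time, asset3_time], 2))  # False
```

In this model, a lazy asset2 is never kicked off in the second case because no run could bring it within its policy, which matches the behaviour described above.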
c
Glad that we’ve quashed a 🐛 there! 🙌 I’ve tried out the modified code and it fits my use case! Thanks so much for that. So fundamentally, Dagster assumes that an asset with no dependencies reads from a continuously updating source, so an ancestor that isn’t refreshed is treated as out of date (relative to now). By attaching that ancestor to an observable_source_asset, Dagster knows that it (asset3 in this case) is not 2 minutes stale (relative to the observable source asset), allowing asset2 to auto-materialize.
Also, I noticed that asset2 won’t re-materialize automatically after refreshing/observing the observable_source_asset. To get asset2 to continue to auto-materialize, asset3 needs to have a lazy auto_materialize_policy. I’m attaching the final code here for others & posterity:
import datetime

from dagster import (
    AssetSelection,
    AutoMaterializePolicy,
    DataVersion,
    DefaultScheduleStatus,
    Definitions,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
    observable_source_asset,
)


@asset(description="Refreshes every minute")
def asset1():
    ...


@asset(
    description="Should be no more than 2 minutes stale. Refreshes lazily",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset2(asset1, asset3):
    ...


@observable_source_asset
def my_observable_source():
    # this version will always be different
    return DataVersion(str(datetime.datetime.now()))


@asset(
    description="Materialized ad hoc or by external processes",
    non_argument_deps={my_observable_source.key},
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset3():
    ...


refresh_job = define_asset_job("refresh_asset1", AssetSelection.keys("asset1"))

refresh_schedule = ScheduleDefinition(
    job=refresh_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(
    assets=[asset1, asset2, asset3, my_observable_source], schedules=[refresh_schedule]
)
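One way to picture why asset3 also needs a lazy policy is the sketch below. It models the rule stated earlier in the thread (asset3 counts as up to date only if it was materialized after the newest version of the observable source) in plain Python. The function name is_current and the timestamps are hypothetical, and this is an illustration of that rule rather than Dagster's internals.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_current(last_materialized: Optional[datetime],
               newest_version_observed_at: datetime) -> bool:
    """True if the asset ran at or after the newest observed source version."""
    return (
        last_materialized is not None
        and last_materialized >= newest_version_observed_at
    )


t0 = datetime(2023, 6, 1, 12, 0)
asset3_run = t0
observation_1 = t0 - timedelta(minutes=1)  # version seen before asset3 ran
observation_2 = t0 + timedelta(minutes=3)  # a new version seen after asset3 ran

print(is_current(asset3_run, observation_1))    # True
print(is_current(asset3_run, observation_2))    # False

# Re-running asset3 after the new observation makes it current again.
asset3_rerun = observation_2 + timedelta(seconds=10)
print(is_current(asset3_rerun, observation_2))  # True
```

Because my_observable_source returns a different version on every observation, each observation puts asset3 back into the "not current" state in this model, so something has to re-run asset3 before asset2 can be refreshed; giving asset3 a lazy policy is what allows that to happen automatically.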
o
that's exactly right yep! we've considered making it possible to encode the nature of the asset (constantly going out of date or not) in some way other than creating an upstream observable source asset, but it's not currently high priority
c
Gotcha. I did wonder if there’s a way to encode this pattern / use case. Hope to hear of it soon in the coming releases! 😄
And thanks once again everyone for the help!!