Christopher Tee
07/10/2023, 3:41 PM
• asset1 is to be updated on a cron schedule (every minute)
• asset2 should lazily incorporate upstream data no later than 2 minutes ago
I’m having issues with the auto-materialization behaviour of asset2. In the screenshot below, asset2 does not auto-materialize even though the dashboard indicates that there is newer upstream data and that asset2 is overdue.
My suspicion is that because the materialization policy is lazy, observations on the asset are not being triggered to evaluate whether it needs to be materialized or not.
Any idea how I can get asset2 to materialize automatically? Thanks a lot!

Christopher Tee
07/10/2023, 3:42 PM
from dagster import (
    AssetSelection,
    AutoMaterializePolicy,
    DefaultScheduleStatus,
    Definitions,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
)

@asset
def asset1():
    ...

@asset(
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset2(asset1):
    ...

refresh_job = define_asset_job("refresh_asset1", AssetSelection.keys("asset1"))
refresh_schedule = ScheduleDefinition(
    job=refresh_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(assets=[asset1, asset2], schedules=[refresh_schedule])
I’m currently using dagster==1.3.13
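A rough way to read the "overdue" status in this setup: with maximum_lag_minutes=2, asset2 counts as overdue once the newest asset1 data it has incorporated is more than 2 minutes old. The sketch below is a simplified model of that arithmetic, not Dagster's implementation; it assumes asset2's "data time" is just the timestamp of the asset1 materialization it last consumed, and the helper `is_overdue` is hypothetical.

```python
from datetime import datetime, timedelta


def is_overdue(data_time: datetime, now: datetime, maximum_lag_minutes: float) -> bool:
    """Toy freshness check (hypothetical helper, not a Dagster API).

    Returns True when the newest upstream data incorporated (`data_time`)
    is older than the allowed lag at time `now`.
    """
    lag = now - data_time
    return lag > timedelta(minutes=maximum_lag_minutes)


now = datetime(2023, 7, 10, 15, 41)

# asset2 last consumed an asset1 materialization from 3 minutes ago: overdue.
print(is_overdue(now - timedelta(minutes=3), now, maximum_lag_minutes=2))  # True

# Had it consumed the asset1 run from 1 minute ago, it would still be fresh.
print(is_overdue(now - timedelta(minutes=1), now, maximum_lag_minutes=2))  # False
```

Because asset1 refreshes every minute, a single asset2 run would bring the lag back under 2 minutes in this model, which is the situation where a lazy policy would be expected to launch a run.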

chris
07/11/2023, 12:43 AM

johann
07/11/2023, 12:08 PM

johann
07/11/2023, 12:22 PM

Christopher Tee
07/11/2023, 6:14 PM
asset2 auto-materializes as expected!
It might be worthwhile adding that condition to the list of conditions under an asset’s “Auto-materialize policy” as a reminder. That, and/or expose an API to enable auto-materialization from the get-go 🙂

Christopher Tee
07/11/2023, 6:19 PM
asset2 no longer auto-materializes when I add asset3, which is materialized ad hoc, even though asset2 is 2 minutes stale relative to asset1.
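One way to see why adding asset3 changes things is a simplified, hypothetical model of the freshness rule (an assumption based on how I understand FreshnessPolicy semantics, not Dagster's code): an asset must incorporate data from all of its root ancestors, here asset1 and asset3, from at least maximum_lag_minutes ago, so it is only as current as its stalest root. A root with no upstream, like asset3, only gets newer data when it is itself materialized. The helper names below are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict


def effective_data_time(root_data_times: Dict[str, datetime]) -> datetime:
    """Hypothetical model: an asset is only as current as its stalest root ancestor."""
    return min(root_data_times.values())


def is_fresh_after_rematerializing(
    root_data_times: Dict[str, datetime], now: datetime, maximum_lag_minutes: float
) -> bool:
    """Would re-materializing the downstream asset right now satisfy the policy?

    Re-running it pulls in the latest data of each root, but cannot make a
    root newer than that root's own last materialization.
    """
    lag = now - effective_data_time(root_data_times)
    return lag <= timedelta(minutes=maximum_lag_minutes)


now = datetime(2023, 7, 11, 18, 19)
roots = {
    "asset1": now - timedelta(minutes=1),  # refreshed by the every-minute schedule
    "asset3": now - timedelta(hours=5),    # last materialized ad hoc, long ago
}
print(is_fresh_after_rematerializing(roots, now, maximum_lag_minutes=2))  # False

# If asset3 had been materialized recently, re-running asset2 would help.
roots["asset3"] = now - timedelta(seconds=30)
print(is_fresh_after_rematerializing(roots, now, maximum_lag_minutes=2))  # True
```

Under these assumptions, re-running asset2 alone cannot make it fresh while asset3 is old, so a lazy policy has nothing to gain by launching it.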
Attaching the code once again for reproducibility.
from dagster import (
    AssetSelection,
    AutoMaterializePolicy,
    DefaultScheduleStatus,
    Definitions,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
)

@asset(description="Refreshes every minute")
def asset1():
    ...

@asset(
    description="Should be no more than 2 minutes stale. Refreshes lazily",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset2(asset1, asset3):
    ...

@asset(description="Materialized ad hoc or by external processes")
def asset3():
    ...

refresh_job = define_asset_job("refresh_asset1", AssetSelection.keys("asset1"))
refresh_schedule = ScheduleDefinition(
    job=refresh_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(assets=[asset1, asset2, asset3], schedules=[refresh_schedule])
I assume this is a bug, or am I misinterpreting the auto-materialization behaviour?

owen
07/11/2023, 8:31 PM
import datetime

from dagster import (
    AssetSelection,
    AutoMaterializePolicy,
    DefaultScheduleStatus,
    Definitions,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
    observable_source_asset,
)
from dagster._core.definitions.data_version import DataVersion

@asset(description="Refreshes every minute")
def asset1():
    ...

@asset(
    description="Should be no more than 2 minutes stale. Refreshes lazily",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset2(asset1, asset3):
    ...

@observable_source_asset
def my_observable_source():
    # this version will always be different
    return DataVersion(str(datetime.datetime.now()))

@asset(
    description="Materialized ad hoc or by external processes",
    non_argument_deps={my_observable_source.key},
)
def asset3():
    ...

refresh_job = define_asset_job("refresh_asset1", AssetSelection.keys("asset1"))
refresh_schedule = ScheduleDefinition(
    job=refresh_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(
    assets=[asset1, asset2, asset3, my_observable_source], schedules=[refresh_schedule]
)
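On the `# this version will always be different` comment: an observable source asset reports a data version each time it is observed, and the idea is that a version differing from the previous observation signals new upstream data. The plain-Python sketch below only illustrates why a timestamp-based version always counts as "changed"; the helpers `timestamp_version` and `has_new_data` are hypothetical, and a plain string comparison is a simplification of what Dagster actually tracks.

```python
from datetime import datetime
from typing import Optional


def timestamp_version(observed_at: datetime) -> str:
    """Mirrors DataVersion(str(datetime.datetime.now())) from the snippet,
    but takes the observation time as an argument so the example is deterministic."""
    return str(observed_at)


def has_new_data(previous_version: Optional[str], current_version: str) -> bool:
    """Simplified rule: any change in the reported version counts as new data."""
    return previous_version != current_version


first = timestamp_version(datetime(2023, 7, 11, 20, 31, 0))
second = timestamp_version(datetime(2023, 7, 11, 20, 32, 0))

print(has_new_data(None, first))     # True: nothing observed before
print(has_new_data(first, second))   # True: the timestamp moved on
print(has_new_data(second, second))  # False: same version, no new data
```

In a real deployment the version would more typically be derived from the external data itself (for example a modification time or a content hash), so that an observation only registers a change when something actually changed.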

Christopher Tee
07/11/2023, 9:36 PM
now). By attaching that ancestor to an observable_source_asset, Dagster knows that it (asset3 in this case) is not 2 minutes stale (relative to the observable source asset), allowing asset2 to auto-materialize.

Christopher Tee
07/11/2023, 9:38 PM
asset2 won’t re-materialize automatically after refreshing/observing the observable_source_asset. To get asset2 to continue to auto-materialize, asset3 needs to have a lazy auto_materialize_policy.
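Why asset3 also needs a policy can be illustrated with a toy planner. This is a hypothetical, heavily simplified model, not the auto-materialize daemon's real logic: it assumes only assets that carry an auto_materialize_policy can be launched automatically, and that a stale ancestor must be refreshed before its downstream asset can become fresh. The function `plan_refresh` and its arguments are illustrative names only.

```python
from typing import Dict, List, Set


def plan_refresh(
    target: str,
    parents: Dict[str, List[str]],
    stale: Set[str],
    has_policy: Set[str],
) -> List[str]:
    """Return assets to materialize (upstream first) so `target` becomes fresh,
    or [] if a stale asset on the way cannot be launched automatically.

    Hypothetical model: only assets listed in `has_policy` may be launched.
    """
    plan: List[str] = []

    def visit(asset: str) -> bool:
        # Make sure every parent is either up to date or can be refreshed first.
        for parent in parents.get(asset, []):
            if not visit(parent):
                return False
        if asset in stale or asset == target:
            if asset not in has_policy:
                return False  # stale, but nothing will launch it
            if asset not in plan:
                plan.append(asset)
        return True

    return plan if visit(target) else []


parents = {
    "asset2": ["asset1", "asset3"],
    "asset3": ["my_observable_source"],
}
stale = {"asset3"}  # the source was re-observed with a new version

print(plan_refresh("asset2", parents, stale, has_policy={"asset2"}))
# []
print(plan_refresh("asset2", parents, stale, has_policy={"asset2", "asset3"}))
# ['asset3', 'asset2']
```

In this model, giving asset3 a lazy policy is what lets the plan refresh asset3 and then asset2, matching the behaviour described above.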
I’m attaching the final code here for others & posterity
import datetime

from dagster import (
    AssetSelection,
    AutoMaterializePolicy,
    DefaultScheduleStatus,
    Definitions,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
    observable_source_asset,
)
from dagster._core.definitions.data_version import DataVersion

@asset(description="Refreshes every minute")
def asset1():
    ...

@asset(
    description="Should be no more than 2 minutes stale. Refreshes lazily",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset2(asset1, asset3):
    ...

@observable_source_asset
def my_observable_source():
    # this version will always be different
    return DataVersion(str(datetime.datetime.now()))

@asset(
    description="Materialized ad hoc or by external processes",
    non_argument_deps={my_observable_source.key},
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def asset3():
    ...

refresh_job = define_asset_job("refresh_asset1", AssetSelection.keys("asset1"))
refresh_schedule = ScheduleDefinition(
    job=refresh_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(
    assets=[asset1, asset2, asset3, my_observable_source], schedules=[refresh_schedule]
)

owen
07/11/2023, 9:40 PM

Christopher Tee
07/11/2023, 9:46 PM