https://dagster.io/ logo
#ask-community
Title
# ask-community
m

Malo PARIS

06/01/2023, 9:01 AM
Hi community, Try to implement declarative scheduling with
build_asset_reconciliation_sensor
Attached my experimental assets :
asset1
to
5
10 min freshness policy ('test' group)
asset6
and
asset7
has 2 min freshness policy ('sensor' group) My sensor code :
Copy code
freshness_sla_sensor = build_asset_reconciliation_sensor(
    name="freshness_sla_sensor",
    asset_selection=AssetSelection.groups("sensor"),  
    default_status=DefaultSensorStatus.STOPPED,
    description="My SLA Sensor",
)
My expectation : the
freshness_sla_sensor
trigger
all needed assets
run when
asset6
or
asset7
are not respecting their freshness policy My ultimate goal : avoid unnecessary materialization of assets
asset1
to
5
It works when I use
AssetSelection.all()
but don't want
asset1
to
5
to materialize on its own With
AssetSelection.groups("sensor")
the sensor
skipped
all the tick.. Thanks for your help !
s

Sean Lopp

06/01/2023, 5:01 PM
The asset selection in the sensor controls which assets the sensor is allowed to try and run. In the case where you use
AssetSelection.groups("sensor")
the sensor is only allowed to run asset6 and asset7, but it knows that those assets can not be run without their upstreams running, so I'd expect it to not launch runs and instead those two assets would just be flagged as overdue More broadly, I don't think I understand your current setup. How would asset6 and asset7 have a 2 minute policy when the upstreams have a 10 minute policy? Wouldn't that cause un-necessary runs for asset 6 and 7 since they'd be processing every 2 minutes inputs that only change every 10 minutes? cc @owen our freshness expert as well
👍 1
@owen
👀 1
m

Malo PARIS

06/01/2023, 5:56 PM
Yes ! It seems evident now ! My goal is to tell dagster : “do not compute asset1 … to 5 more than once a day, never mind sheduled job or sensor run”
s

sandy

06/01/2023, 10:50 PM
@Malo PARIS do you have an understanding of how to get the behavior you're looking for? If not, are you able to describe: • Under what conditions do you want assets 1 through 5 to update? • Under what conditions do you want assets 6 and 7 to update? One more thing: we generally recommend using `AutoMaterializePolicy`s instead of
build_asset_reconciliation_sensor
. The latter is deprecated and will go away eventually.
m

Malo PARIS

06/02/2023, 10:00 AM
Hey @sandy, yeah it's a bit confusing for me Under what conditions do you want assets 1 through 5 to update? -> they must be fresh to materialize asset6 and asset7 but must only be refreshed once a day Under what conditions do you want assets 6 and 7 to update? -> asset6 must be fresh every day at 6 am -> asset7 must be fresh every day at 10 pm I will check
AutoMaterializePolicy
👍
s

sandy

06/05/2023, 10:31 PM
if that's what you're trying to accomplish, I believe the following would work: • on assets 1-5, assign lazy auto-materialize policies and no freshness policy • on asset 6, assign lazy auto-materialize policy and
FreshnessPolicy(cron_schedule="0 6 * * *", maximum_lag_minutes=24 * 60)
• on asset 7, assign lazy auto-materialize policy and
FreshnessPolicy(cron_schedule="0 22 * * *", maximum_lag_minutes=24 * 60)
m

Malo PARIS

06/06/2023, 7:01 AM
Thanks a lot, I will try 🙂 I have a better understanding now ! This feature is so powerful ! One last question, when I used
jobs
and
schedules
I could limit the
Op
concurrency with this :
Copy code
config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 3,      # limits concurrent execution to 3
                },
            }
        }
    },
Is it possible to do this with the execution launched for
auto-materialize
? As we don't explicitly declare
jobs
in this case, we can't apply op/asset-level concurrency
s

sandy

06/09/2023, 6:07 PM
This won't work exactly the same, but you can still assign an executor at the
Definitions
level and apply that config to the executor
👍 1