I want to apply declarative scheduling to DBT asse...
# integration-dbt
g
I want to apply declarative scheduling to DBT assets https://dagster.io/blog/declarative-scheduling but where can I specify the
@asset(freshness=600seconds)
tag for an asset which is defined inside DBT?
b
There is a blog post with details about dbt. You will just have to add some config to your model with they key
dagster_freshness_policy
e.g.
Copy code
{{
  config(
    dagster_freshness_policy={"maximum_lag_minutes": 60}
  )
}}
g
It is shown there that:
Copy code
maximum_lag_minutes
can be configured. Is it also possible to pass the cron expression? I.e. tell dagster somehow that the dbt asset foo should be up-to-dte by 09:00 in the morning i.e. to trigger sufficiently up-front the reconciliation of all the data dependencies so it is on-time even if the various steps to materialize the dependencies are not instant?
b
There is some info here as well.
maximum_lag_minutes
will always be required but you can also define a
cron_schedule
i.e, for this table, I want data at 9am every day to be fresh by x minutes
g
@Sean Lopp I observe two bugs here A) KeyError: 'maximum_lag_minutes' B) The value must be specified literally and cannot reference a DBT variable
{{ config( dagster_freshness_policy={"cron_schedule": '0 3 * * SUN'} ) }} -- dagster_freshness_policy={"cron_schedule": {{ 'cron_schedule_weekly' }}}
Would it be possible to specify this constraint (ready at 9 in the morning using the maximum_lag_minutes? Would it be possible to ensure that upstream data assets are triggered well-before i.e. even if they take long to materialize that the SLA 9AM is not violated? And lastly: If not applicable via the maximum_lag_minutes - can you also support cron_schedule? And furthermore support the parsing of a DBT variable inside the configuration?
b
What did you try for the variable in the config? I can’t help too much on the Dagster side but I know my dbt 🙂
s
Copy code
{{
        config(
                dagster_freshness_policy={"cron_schedule": "0 9 * * *", "maximum_lag_minutes": 0}
        )
}}
I think this does what you want @geoHeil Can you open a GitHub issue for the request to support DBT variable references?
g
will do so
@Benoit Perigaud indeed - I was simply just feeding a normal DBT variable in the DBT template notation.
@Sean Lopp but this will trigger the leaf asset at 09:00 each day - right? I want the leaf asset to be up-to-date at 09:00 already. I.e. even if upstream data dependencies might take minutes our perhaps hours to accumulate/materialize the SLA of: The leaf is fresh at 09:00 should not be violated. Does this work with declarative scheduling (which includes CRON expressions)?
s
I believe the asset reconciliation sensor tracks when an asset is going to become stale, and then uses the historical run times of upstreams to launch a run and attempt to refresh the asset before the policy is violated, but @owen would be able to confirm/clarify
o
currently, it doesn't factor in historical run times (mostly because calculating how long a particular asset might take to execute is trickier than you might think!).However, it will kick off runs before the
9:00
limit. In cases like yours (a once a day asset, without other freshness policies in the mix), it will generally kick off the execution as soon as possible -- so as soon as data for the previous day is made available, your asset will start to execute.
👀 1
ah except for one caveat -- you'll want to set your maximum lag minutes to
9*60
. The cron schedule specifies
"when do you want the data by"
and the maximum lag minutes indicates
"at that time, how old can the data be"
So in this case, that would be saying "at 9AM, the data can be no more than 9 hours old", which is equivalent to saying that all of yesterday's data should be available by 9AM
j
It would be nice to have the "data should be available by 9AM" feature. As you said Owen, it would require historical data to calculate the median time it takes to run the asset to determine when the asset should be triggered. There should be a fallback as well to trigger the asset if the sensor doesn't kick off the asset run by a certain time. Is this something that is (A) possible, (B) requested for by others, and (C) on the roadmap?
🌈 1
👀 1
g
exactly
o
I'm interested in what problems you're anticipating with the current behavior -- in general, information about the expected runtime of an asset would only be used to allow execution to start later than the current logic would initiate it (as we could have confidence that the execution would complete by the deadline). As it stands currently, assuming the reconciliation sensor was targeting your entire graph, execution for a
FreshnessPolicy(maximum_lag_minutes=9*60, cron_schedule="0 9 * * *")
would start just after midnight (i.e. a run of that asset and all of its upstream assets would be kicked off at that time). It would not be possible to start that run earlier than midnight, as then the root data that was pulled in at the start of the run would not have all of the records for the previous day yet. So in essence, it's giving the asset the best possible chance of having
"all of yesterday's data"
by
9AM
. If you have some upstream source data that lands at different times (i.e. not at exactly midnight), you can model that by removing those upstream assets from being targeted by the reconciliation sensor (so the sensor won't automatically kick off runs for those assets), and kick them off with your own custom logic (such as a custom sensor). In this case, the reconciliation sensor will kick off of the asset with a freshness policy (and all of the targeted upstream assets) as soon as all the relevant upstream data is made available (which again is the earliest possible time to kick off this computation). Things can get more complicated when more freshness policies with different deadlines are in the mix, but for simple cases like this, it seems like this would be the desired behavior.
g
I think it would be good if we can harmonize both approaches i.e. the blocking (for some special assets) with the normal reconciliation pipeline. Plus - as mentioned before some early SLA warning once it is clear that given historic runtimes that the E2E materializations will no longer make it on time.
🌈 1
o
yep that makes total sense! the current plan (not all the pieces are in place for this to work yet) for the "blocking for some special assets" would be via observable source assets . Let's say you have some files that drop yesterday's data in an s3 bucket at some time after midnight each day. Then you have an asset that ingests those files and writes them out to tables (which are then consumed by dbt). The files in that s3 bucket could be modeled as an observable source asset, which the table that ingests those files downstream of it. The reconciliation sensor would then wait for the observable source to indicate that new data was available before kicking off the downstream assets.
g
How is observable source and sensor a duplication or incompatible concept?
But the reconciliation should only progress if all recent data is available.
o
the observable source is a way to let the reconciliation logic know when recent data is available (rather than either assuming that data for time T is available exactly at time T, or relying on a separate sensor to manage the state of the upstream data)
g
Do you have an idea (roughly) when such blocking for reconciliations might be available in dagster?
o
Just to be clear, the reconciliation sensor will already block for any upstream assets that are not part of its selection argument. So if you
build_asset_reconciliation_sensor(asset_selection=AssetSelection.all() - AssetSelection.keys("asset", "keys", "to", "block", "on"))
, then you'll have the behavior you're looking for (as long as you have some separate sensor(s) to manage those keys. But for the fully-integrated approach with observable source assets, I'd guess around a month or so?
g
Good to understand! Thanks
@owen is it possible with DBT tags (for some of the sources? To perform this set difference (subselection) based on these tags? I.e. select assets in dagster based on DBT tags? Or is this only possible during the loading from DBT?
j
@owen I'm not sure I understand how
FreshnessPolicy(maximum_lag_minutes=9*60, cron_schedule="0 9 * * *")
. is that the same as doing
@schedule(cron_schedule="0 0 * * *"
and having it complete at 9:00am because I know that the asset takes 9 hours to run?
s
@Jonathan Neo you could think of the max lag minutes as similar to an offset So in the case of a single asset
FreshnessPolicy(maximum_lag_minutes=9*60, cron_schedule="0 9 * * *")
means we want all of the data from before midnight processed by 9am. The reconciliation sensor does this as eagerly as possible, so it starts a run around midnight.
FreshnessPolicy(maximum_lag_minutes=30, cron_schedule="0 9 * * *")
means we want all of the data from before 8:30am ready by 9am. The reconciliation sensor still does this as eagerly as possible, so it starts a run around 8:30 In the case of multiple assets it starts to get more complicated. That is where the reconciliation sensor does a lot of its magic, and I think its well described here: https://dagster.io/blog/declarative-scheduling
@geoHeil there are a few ways to select dbt assets based on their tags in Dagster. One way is to use
node_info_to_group_fn
to translate dbt tags into asset groups and then use
AssetSelection.groups
. Another option would be to call
load_assets_from_dbt_*
using the
selection
argument and then pass the resulting object to
AssetSelection.assets
o
And just adding another option to Sean's list, there's now a
DbtManifestAssetSelection
, so if you have a pre-compiled manifest.json file, then you can use dbt selection syntax directly: https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.DbtManifestAssetSelection
j
@Sean Lopp Thanks for explaining it to me! It makes sense now. It took awhile for it to click in my head. I went through the docs several times initially to try to get it, but your explanation made it click. A follow up question then: Let's assume that we have
FreshnessPolicy(maximum_lag_minutes=9*60, cron_schedule="0 9 * * *")
as you say. If the asset takes: • 1 hour to run, then the freshness policy will be fulfilled at 9am. • 9 hours to run, then the freshness policy will be fulfilled at 9am, because the asset would have been refreshed by exactly 9am. • 10 hours to run, then the freshness policy will fail when checked at 9am, because the asset is still materializing. That's where my comment about "historical data to calculate the median time it takes to run the asset to determine when the asset should be triggered" would be useful. If I want my asset to be fresh by 9 AM, then I can define that declaratively, and let dagster calculate what time it needs to trigger the asset refresh. If the asset takes 10 hours to run, then it needs to trigger it at 11pm the day before. This feature would be useful for pipelines that are slowly increasing in execution time. Where it stands today, the developer has to manually update
maximum_lag_minutes
to a higher number when the asset's execution time increases.
s
@Jonathan Neo thanks, for the added context. I think the only tricky bit is that if you want all the data as of midnight by 9am, but your job takes 10 hours to run, then you'd never be able to meet the constraints the best you could do is: • say you want the data from before mid-night by 10am • or say you want the data from from before 11pm by 9am
👍 1
j
Ok thanks @Sean Lopp!