Trying to learn about “build_asset_reconciliation_...
# ask-community
r
Trying to learn about “build_asset_reconciliation_sensor” and possibly move away from cron-based job scheduling (as suggested here: https://docs.dagster.io/guides/dagster/scheduling-assets?_gl=1*1w38q5o*_ga*MTM4ODU0Mjk5OS4xNjcxNzI4MjUw*_ga_84VRQZG7TV*MTY3MTcyODI1MC4xLjEuMTY3MTcyOTA2Mi4wLjAuMA..) With cron-based job scheduling, I’m able to provide config to all assets that are materialized by the job. With build_asset_reconciliation_sensor this does not seem possible. So if this reconciliation sensor recognizes that some assets are in need of being refreshed, how does run config get provided to the assets? I’ve built my assets on the assumption that the best way to provide config to them is via the @asset config_schema param and would prefer to not have to move away from this (unless there is a much better way that is suggested and supported).
o
hi @Robert Wade! how are you currently deciding what config to provide for each execution? or are you always providing the same config when launching from the sensor?
r
hi @owen. TBH I am just trying to make sense of how to execute assets in a way other than having a job execute all of the assets. The concept of an asset reconciliation sensor seems to be what I am looking for: only materialize those assets that are "due" to be materialized based on their freshness policies. So currently I have a job that executes a bunch of assets. Each asset has a config_schema that defines what parameters it needs. That job is kicked off by a schedule, which itself provides a RunRequest, which contains a run_config. That run_config consists of all the configuration parameters that are loaded from yml files.
When going to the build_asset_reconciliation_sensor, it appears that I can't supply the run_config. If that is true, then when that sensor detects that one or more assets need to be materialized, how do I provide those assets their configuration parameters?
So if I have this:
Copy code
@asset(config_schema={"file path": path})
def upstream_asset(context):
	path = context.op_config["path"]
	write_garbage_to_file("garbage", path)


@asset(config_schema={"s3_bucket": bucket})
def downstream_asset(context, upstream_asset):
	bucket = context.op_config["s3_bucket"]
	# do stuff with bucket


update_sensor = build_asset_reconciliation_sensor(
	name="update_sensor", asset_selection=AssetSelection.all()
)

update_job = define_asset_job(name="update_job", selection=AssetSelection.keys("upsteam_asset"))
update_schedule = ScheduleDefinition(name="update_job_schedule", job=update_job, cron_schedule="* * * * *", run_config=load_config())


defs = Definitions(assets=[upstream_asset, downstream_asset], schedules=[update_job_schedule], sensors=[update_sensor])
The schedule will run based on the cron. It will cause the job to run and will provide the job with the run_config, which will update the upstream_asset. The sensor will then determine that downstream_asset is stale and will start a run to materialize downstream_asset. BUT What config is provided to downstream_asset? Where do you define this runtime configuration?
a
Maybe the s3 bucket config in downstream could be an s3 resource preconfigured for that bucket? That way you only have to pass the io manager key to the asset decorator and the asset could be reconciled any time. I’ve been thinking about this recently and my conclusion was that if an asset truly needs runtime config then it’s not fit for asset reconciliation because it’s not dependent solely on an other asset. I try to design my assets in a way that the first one is materialized by a job run (manual, sensor or schedule) with appropriate configs and downstream assets only rely on the asset’s output and some resources. I’m still not sure that I get this right though.
r
My use of the s3 bucket config was just to illustrate that the asset needs a config param. There could be other parameters that aren't necessarily a "resource." I hear what you are saying, but the docs clearly show that assets ARE supposed to take runtime configuration -- it is clearly possible and shown in the docs.
a
Yes, assets can take runtime config, I don’t debate that. My doubt is about reconciliation sensors and runtime asset configs playing nice together. Let’s hope someone from the Dagster team sheds some light on this for us.
r
Yes, I hope so too. It just seems odd that a job would allow run-time config to be provided to assets, but then a sensor would not. It implies that sensors can only materialize assets that don't need config, OR it forces assets to manually retrieve their config from an external source.
o
I tend to agree with @Andras Somi here -- the asset reconciliation logic has no way of mapping the current state of the world to some desired run config (it only can decide if an asset needs to be executed or not, it's hard to see how it could make decisions on what specific config might be needed). However, it sounds like your config is generally static (i.e. you wouldn't actually want the sensor to vary what config it sends to the asset, you just want it to be able to pull in some pre-defined config from yaml files). If this config really is static, I think currently the way to handle it would be to associate that config directly with the asset (i.e. load it in as default values when building the asset definition or something to that effect). I do see the value in what you're trying to do though (it's nice to have a centralized place to toss your config), I just don't think it maps very smoothly onto the asset reconciliation sensor because the runs that will be produced by the sensor are somewhat irregular (i.e. they'll all contain different subsets of the assets, so a big blob of config will generally contain a bunch of irrelevant stuff)
🙌 1
I think a potentially more elegant thing (I'd need to think more on this) that could be added would be the ability to pass in a job (or set of jobs) that the reconciliation logic would have the ability to kick off runs of. Then the jobs themselves could be configured, so whatever run requests the sensor produced would have the desired config
r
I understand what you are saying: "asset reconciliation logic has no way of mapping the current state of the world to some desired run config" However, it seems odd that we would have jobs that rely on run-config to configure assets, and then we have to have a different way for sensors to configure assets. Ability to kick off runs in dagit and kick off runs via jobs that rely on config yml is valuable to us, so if sensors suddenly can't configure assets then sensors are useless for our purposes.
👍 1
a
@Robert Wade You can kick off configured runs from sensors or asset sensors, it's only the reconciliation sensor (which is a very different beast, probably shouldn't even be called a sensor) that has this sort of natural limitation.
👍 1
o
just expanding on that, the asset reconciliation sensor is currently a sensor mostly out of convenience for fast iteration/development -- in the future, it will become its own daemon process (similar to the backfill daemon), which will allow it to work across code locations (rather than being linked to a particular repository)
👍 1
210 Views