Robert Wade
12/22/2022, 9:01 PM
owen
12/29/2022, 3:02 PM
Robert Wade
01/25/2023, 11:10 PM
from dagster import (
    AssetSelection, Definitions, ScheduleDefinition, asset,
    build_asset_reconciliation_sensor, define_asset_job,
)

# write_garbage_to_file and load_config are my own helpers, defined elsewhere.
@asset(config_schema={"path": str})
def upstream_asset(context):
    path = context.op_config["path"]
    write_garbage_to_file("garbage", path)

@asset(config_schema={"s3_bucket": str})
def downstream_asset(context, upstream_asset):
    bucket = context.op_config["s3_bucket"]
    # do stuff with bucket

update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)
# The job only selects upstream_asset; the sensor handles downstream_asset.
update_job = define_asset_job(name="update_job", selection=AssetSelection.keys("upstream_asset"))
update_schedule = ScheduleDefinition(name="update_job_schedule", job=update_job, cron_schedule="* * * * *", run_config=load_config())
defs = Definitions(assets=[upstream_asset, downstream_asset], schedules=[update_schedule], sensors=[update_sensor])
The schedule fires on the cron and launches the job with the run_config, which materializes upstream_asset. The sensor then determines that downstream_asset is stale and starts a run to materialize it.
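To make the gap concrete, here is a minimal sketch of what load_config() might return. The function body and the file path are hypothetical stand-ins (the real helper is not shown in this thread); the only thing assumed about Dagster is that asset config in run config sits under "ops", keyed by asset name.

```python
# Hypothetical stand-in for load_config(); the real helper is not shown here.
def load_config() -> dict:
    """Return run config for update_job, which selects only upstream_asset."""
    return {
        "ops": {
            # Config for upstream_asset, matching its config_schema key "path".
            # The path value is an illustrative placeholder.
            "upstream_asset": {"config": {"path": "/tmp/garbage.txt"}},
        }
    }


run_config = load_config()
# List which assets actually receive config from the schedule.
print(sorted(run_config["ops"]))
```

Nothing in this dict mentions downstream_asset, and the sensor is built without any run config of its own.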
BUT
What config is provided to downstream_asset? Where do you define this runtime configuration?
Andras Somi
01/26/2023, 9:36 AM
Robert Wade
01/26/2023, 4:45 PM
Andras Somi
01/26/2023, 5:36 PM
Robert Wade
01/26/2023, 5:38 PM
owen
01/26/2023, 11:11 PM
Robert Wade
01/27/2023, 12:03 AM
Andras Somi
01/27/2023, 7:21 AM
owen
01/27/2023, 5:26 PM