Christopher Tee
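03/14/2023, 8:45 AM
from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    build_asset_reconciliation_sensor,
    define_asset_job,
)

@asset(config_schema={"input_a": str})
def a():
    ...

@asset(config_schema={"input_b": bool})
def b(a):
    ...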

# Make `asset a` run on a schedule
update_job = define_asset_job(
    name="update_job", selection=AssetSelection.keys("a")
)
update_job_schedule = ScheduleDefinition(
    name="update_job_schedule", job=update_job, cron_schedule="* * * * *"
)

# Make `asset b` automatically materialize after `asset a`
update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

defs = Definitions(
    assets=[a, b],
    schedules=[update_job_schedule],
    sensors=[update_sensor],
)
Zach
03/14/2023, 5:20 PM
You can use the `config` parameter of `define_asset_job` to make configured versions of your asset job and then schedule those. Something like:
update_job = define_asset_job(
    name="update_job",
    selection=AssetSelection.keys("a"),
    config={"ops": {"b": {"config": {"input_b": True}}}},
)
update_job_schedule = ScheduleDefinition(
    name="update_job_schedule", job=update_job, cron_schedule="* * * * *"
)
Christopher Tee
03/15/2023, 3:43 AM
`define_asset_job`. So in the example above, I can only provide config values for asset `a`:
update_job = define_asset_job(
    name="update_job",
    selection=AssetSelection.keys("a"),
    config={
        "ops": {
            "a": {"config": {"input_a": "hello"}},
        }
    },
)
Providing config values for asset `b` leads to an error during `dagster dev` initialization, since `b` is not part of the selected assets:
dagster.daemon.SchedulerDaemon - WARNING - Could not load location scheduling.py to check for schedules due to the following error: dagster._core.errors.DagsterInvalidConfigError: Invalid default_value for Field.
Error 1: Received unexpected config entry "b" at the root. Expected: "{ a: { config: { input_a: String } outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
If I specify config values for asset `a` only, Dagster initializes, but it later fails once the schedule and sensor are activated:
ERROR - Sensor daemon caught an error for sensor update_sensor
Traceback (most recent call last):
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 489, in _process_tick_generator
    yield from _evaluate_sensor(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 653, in _evaluate_sensor
    run = _get_or_create_sensor_run(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 780, in _get_or_create_sensor_run
    return _create_sensor_run(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 815, in _create_sensor_run
    external_execution_plan = repo_location.get_external_execution_plan(
  File "../python3.10/site-packages/dagster/_core/host_representation/repository_location.py", line 750, in get_external_execution_plan
    execution_plan_snapshot_or_error = sync_get_external_execution_plan_grpc(
  File "../python3.10/site-packages/dagster/_api/snapshot_execution_plan.py", line 65, in sync_get_external_execution_plan_grpc
    raise DagsterUserCodeProcessError.from_error_info(result.error)
dagster._core.errors.DagsterUserCodeProcessError: dagster._core.errors.DagsterInvalidConfigError: Error in config for job
Error 1: Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'a': {'config': {'input_a': '...'}}, 'b': {'config': {'input_b': True}}}}
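Both errors appear to follow from one rule: the `ops` config a run accepts covers only the assets in that run, and every required field of those assets must be supplied. The sketch below is a simplified, Dagster-free model of that rule, assuming each asset's `config_schema` is a flat mapping of field name to Python type with every field required. `ASSET_SCHEMAS` and `check_run_config` are hypothetical names, not Dagster APIs, and the messages only approximate Dagster's real config validation.

```python
from typing import Any, Dict, List, Set

# Hypothetical model of the example assets' config schemas:
# field name -> expected Python type (every field treated as required).
ASSET_SCHEMAS: Dict[str, Dict[str, type]] = {
    "a": {"input_a": str},
    "b": {"input_b": bool},
}


def check_run_config(selected: Set[str], run_config: Dict[str, Any]) -> List[str]:
    """Return error messages; an empty list means the config fits the run.

    Simplified model: only selected assets may appear under "ops", and every
    field of every selected asset must be present with the expected type.
    """
    errors: List[str] = []
    ops = run_config.get("ops")
    if ops is None:
        # Analogous to: Missing required config entry "ops" at the root.
        if any(ASSET_SCHEMAS[name] for name in selected):
            errors.append('missing required entry "ops"')
        return errors
    for name in ops:
        if name not in selected:
            # Analogous to: Received unexpected config entry "b".
            errors.append(f'unexpected entry "{name}"')
    for name in sorted(selected):
        values = ops.get(name, {}).get("config", {})
        for field, expected in ASSET_SCHEMAS[name].items():
            if field not in values:
                errors.append(f'missing "{field}" for asset "{name}"')
            elif not isinstance(values[field], expected):
                errors.append(f'"{field}" for asset "{name}" must be {expected.__name__}')
    return errors


both = {"ops": {"a": {"config": {"input_a": "hello"}}, "b": {"config": {"input_b": True}}}}
only_a = {"ops": {"a": {"config": {"input_a": "hello"}}}}

print(check_run_config({"a"}, both))  # ['unexpected entry "b"']
print(check_run_config({"a"}, only_a))  # []
print(check_run_config({"a", "b"}, {}))  # ['missing required entry "ops"']
```

Under this model, config for `a` alone fits the `a`-only scheduled job, while a sensor-launched run that includes both assets but supplies no config fails, which matches the shape of the two errors above.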
So the remaining question is: how do I provide config values for asset `b`, which is materialized by automatic reconciliation?
I'm guessing it involves supplying config to a `RunRequest` in a `@sensor`. The issue is that this sensor is a reconciliation sensor, and I'm not aware of any way to provide config values to `build_asset_reconciliation_sensor`.
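If a hand-written `@sensor` were used for asset `b` instead of relying on `build_asset_reconciliation_sensor`, the config would travel through the `run_config` argument of `RunRequest`. The sketch below only assembles that dictionary, in the `{"ops": {asset: {"config": ...}}}` layout shown by the sample config in the traceback, and keeps just the assets materialized by a given run so no unexpected entries are included. `build_run_config` and `ALL_CONFIGS` are hypothetical names, and the snippet deliberately does not import Dagster.

```python
from typing import Any, Dict, Iterable

# Hypothetical config values for every asset in the code location.
ALL_CONFIGS: Dict[str, Dict[str, Any]] = {
    "a": {"input_a": "hello"},
    "b": {"input_b": True},
}


def build_run_config(
    asset_configs: Dict[str, Dict[str, Any]], selected: Iterable[str]
) -> Dict[str, Any]:
    """Keep only the assets in this run, nested as {"ops": {asset: {"config": values}}}."""
    return {
        "ops": {
            # Copy each asset's values so callers can't mutate the shared table.
            name: {"config": dict(asset_configs[name])}
            for name in selected
            if name in asset_configs
        }
    }


print(build_run_config(ALL_CONFIGS, ["b"]))
# {'ops': {'b': {'config': {'input_b': True}}}}
```

In a custom sensor, the result would be passed as `RunRequest(run_key=..., run_config=...)`; whether the reconciliation sensor can be given anything equivalent is the open question in this thread.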