#ask-community

Christopher Tee

03/14/2023, 8:45 AM
Hi everyone! I understand that it’s possible to parametrize `asset`s using `config_schema`s. However, I’m not quite sure how I can supply these config values to `asset`s in a scheduled or “reactive” setting. A minimal code example is provided in the thread. Thanks in advance! 🙌
Minimal code example:
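from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    build_asset_reconciliation_sensor,
    define_asset_job,
)

@asset(config_schema={"input_a": str})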
def a():
    ...

@asset(config_schema={"input_b": bool})
def b(a):
    ...

# Make `asset a` run on a schedule
update_job = define_asset_job(
    name="update_job", selection=AssetSelection.keys("a")
)
update_job_schedule = ScheduleDefinition(
    name="update_job_schedule", job=update_job, cron_schedule="* * * * *"
)

# Make `asset b` automatically materialize after `asset a`
update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

defs = Definitions(
    assets=[a, b],
    schedules=[update_job_schedule],
    sensors=[update_sensor],
)

Zach

03/14/2023, 5:20 PM
You can use the `config` parameter of `define_asset_job` to make configured versions of your asset job and then schedule those. Something like:
update_job = define_asset_job(
    name="update_job",
    selection=AssetSelection.keys("a"),
    # Run config is keyed by asset (op) name under "ops".
    config={"ops": {"a": {"config": {"input_a": "hello"}}}},
)
update_job_schedule = ScheduleDefinition(
    name="update_job_schedule", job=update_job, cron_schedule="* * * * *"
)

Christopher Tee

03/15/2023, 3:43 AM
Thanks @Zach for the response! I tried your solution, and it seems that you can only specify config for the assets selected in `define_asset_job`. So in the example above, I can only provide config values for asset `a`:
update_job = define_asset_job(
    name="update_job",
    selection=AssetSelection.keys("a"),
    config={
        "ops": {
            "a": {"config": {"input_a": "hello"}},
        }
    },
)
Providing config values for asset `b` leads to an error on `dagster dev` initialization, since it’s not part of the selected assets:
dagster.daemon.SchedulerDaemon - WARNING - Could not load location scheduling.py to check for schedules due to the following error: dagster._core.errors.DagsterInvalidConfigError: Invalid default_value for Field.
    Error 1: Received unexpected config entry "b" at the root. Expected: "{ a: { config: { input_a: String } outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
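One way to keep each job’s config limited to its selected assets is to hold the per-asset values in one place and derive the run config from the selection. The sketch below is plain Python, not a Dagster API: `ASSET_CONFIG` and `run_config_for` are hypothetical names, and the `{"ops": {<asset>: {"config": ...}}}` shape simply mirrors the `config` dict shown above.

```python
from typing import Any, Dict, Iterable

# Hypothetical per-asset config values, keyed by asset name.
ASSET_CONFIG: Dict[str, Dict[str, Any]] = {
    "a": {"input_a": "hello"},
    "b": {"input_b": True},
}


def run_config_for(selected: Iterable[str]) -> Dict[str, Any]:
    """Build a run config with entries only for the selected assets."""
    return {
        "ops": {
            name: {"config": ASSET_CONFIG[name]}
            for name in selected
            if name in ASSET_CONFIG
        }
    }
```

The result of `run_config_for(["a"])` could then be passed as `config=` to a `define_asset_job` call that selects only `a`, so no entry for `b` reaches that job.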
Specifying config values for asset `a` only lets Dagster initialize, but it later fails when the schedule and sensor are activated:
ERROR - Sensor daemon caught an error for sensor update_sensor
Traceback (most recent call last):
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 489, in _process_tick_generator
    yield from _evaluate_sensor(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 653, in _evaluate_sensor
    run = _get_or_create_sensor_run(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 780, in _get_or_create_sensor_run
    return _create_sensor_run(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 815, in _create_sensor_run
    external_execution_plan = repo_location.get_external_execution_plan(
  File "../python3.10/site-packages/dagster/_core/host_representation/repository_location.py", line 750, in get_external_execution_plan
    execution_plan_snapshot_or_error = sync_get_external_execution_plan_grpc(
  File "../python3.10/site-packages/dagster/_api/snapshot_execution_plan.py", line 65, in sync_get_external_execution_plan_grpc
    raise DagsterUserCodeProcessError.from_error_info(result.error)
dagster._core.errors.DagsterUserCodeProcessError: dagster._core.errors.DagsterInvalidConfigError: Error in config for job
    Error 1: Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'a': {'config': {'input_a': '...'}}, 'b': {'config': {'input_b': True}}}}
So the remaining question is: how do I supply config values for asset `b`, which is materialized by automatic reconciliation? I’m guessing it involves supplying config to a `RunRequest` in a `@sensor`. The issue is that this sensor is a reconciliation sensor, and there’s no way I’m aware of to provide config values to `build_asset_reconciliation_sensor`.