
Balázs Dukai

10/05/2022, 2:19 PM
I'm just going to put this here for others in case they run into unexpected resource configuration behaviour. I manage my resource configs by deployment type (as in the fully_featured_project). So there is a resource selector in my
repository.py
.
import os

from dagster import repository, with_resources

resource_defs_by_deployment_name = {
    "prod": RESOURCES_PROD,
    "local": RESOURCES_LOCAL,
}


@repository
def repo():
    deployment_name = os.environ.get("DAGSTER_DEPLOYMENT", "local")
    resource_defs = resource_defs_by_deployment_name[deployment_name]

    definitions = [
        with_resources(all_assets, resource_defs),
        *all_jobs
    ]

    return definitions
Then the
RESOURCES_LOCAL
contains the resource definitions for the local environment, and it is defined together with all the resources in
resources/__init__.py
(the following is wrong!)
from resources.files import file_store
from resources.database import container

file_store_temp = file_store.configured({})

RESOURCES_LOCAL = {
    "file_store": file_store_temp,
    "container": container,
}
Where
file_store
and
container
are resource definitions, while
file_store_temp
is a configured resource. Now, here is where I stumbled (for many...many...hours). I have a job,
job_sample_data_image_test
, which is run by a
run_status_sensor
. The important bit is the
run_config
that goes into the
job_sample_data_image_test
that is executed by the
RunRequest
.
from datetime import datetime

from dagster import DagsterRunStatus, RunRequest, SkipReason, run_status_sensor


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    name="asset_testing_success",
    monitored_jobs=[job_test],
    request_job=job_sample_data_image_test,
)
def sensor_asset_testing_success(context):
    # The container and the temp dir in the monitored job were created with the
    # Run Id of the monitored job. To find the container and the dir path, we
    # need the Run Id of the monitored job.
    run_id = bcore.get_run_id(context, short=True)
    container_id = make_container_id(run_id)
    temp_path = make_temp_path(run_id)

    image_tag = f"{datetime.today().date().isoformat()}-{run_id}"

    if context.dagster_run.job_name != job_sample_data_image_test.name:
        run_config = {
            "ops": {
                "sample_data_image_test": {
                    "config": {"image_repository": "test/sample-data",
                               "image_tag": image_tag,
                               "image_data_dir": "/tmp"}}
            },
            "resources": {
                "file_store": {"config": {"data_dir": temp_path}},
                "container": {"config": {"id": container_id}},

            }
        }
        return RunRequest(run_key=None, run_config=run_config)
    else:
        return SkipReason("Don't report status of status_reporting_job")
Notice that in the
run_config
I explicitly configure the two resources. When the
job_sample_data_image_test
runs, guess which resource configuration values it is going to take? My guess was that it would take whatever I pass in the
run_config
, thus
"file_store": {"config": {"data_dir": temp_path}}
and
"container": {"config": {"id": container_id}}
. Well, it turns out that this is only half true. The job will get
"file_store": {"config": {}}
, because that is what is configured for
file_store_temp
, which is passed to
RESOURCES_LOCAL
, which apparently overrides the
run_config
for the job.
Then the
container
will get
{"id": container_id}
from the
run_config
, because RESOURCES_LOCAL holds only the resource definition there, not a configured resource. That's it, I hope this saves others some time.
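To make the precedence concrete, here is a toy model in plain Python of the behaviour described above. It is only a sketch of what I observed, not how Dagster resolves config internally: ToyResource and effective_config are hypothetical names made up for the illustration, and the data_dir and id values are placeholders.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ToyResource:
    """Toy stand-in for a resource definition (hypothetical, not a Dagster class)."""

    name: str
    # None means "not pre-configured"; a dict means config was baked in.
    baked_config: Optional[Dict[str, Any]] = None

    def configured(self, config: Dict[str, Any]) -> "ToyResource":
        # Return a copy with the config baked in, loosely mimicking .configured().
        return ToyResource(self.name, dict(config))


def effective_config(
    resource_defs: Dict[str, ToyResource], run_config: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
    """Resolve the config each resource sees under the observed precedence."""
    from_run = run_config.get("resources", {})
    resolved = {}
    for key, res in resource_defs.items():
        if res.baked_config is not None:
            # Pre-configured resource: the run_config entry is ignored.
            resolved[key] = res.baked_config
        else:
            # Plain definition: the run_config entry is used.
            resolved[key] = from_run.get(key, {}).get("config", {})
    return resolved


file_store = ToyResource("file_store")
container = ToyResource("container")

run_config = {
    "resources": {
        "file_store": {"config": {"data_dir": "/tmp/abc123"}},
        "container": {"config": {"id": "container-abc123"}},
    }
}

# The setup from this post: file_store is pre-configured with {}.
resources_wrong = {"file_store": file_store.configured({}), "container": container}
resolved_wrong = effective_config(resources_wrong, run_config)

# Passing the plain definition instead lets run_config through.
resources_plain = {"file_store": file_store, "container": container}
resolved_plain = effective_config(resources_plain, run_config)
```

In this model resolved_wrong["file_store"] ends up empty while resolved_plain["file_store"] carries the data_dir from the run_config. That suggests that putting the plain file_store definition into RESOURCES_LOCAL instead of file_store_temp would let the run_config values through, the same way it already works for container.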

sandy

10/05/2022, 4:06 PM
Thanks for reporting this @Balázs Dukai. I was able to dig up a similar issue reported here: https://github.com/dagster-io/dagster/issues/9216. I definitely agree we should improve this experience.