https://dagster.io/ logo
#ask-community
Title
# ask-community
d

Danny Steffy

02/23/2023, 8:33 PM
I'm currently running a fanned out graph aseet job that splits into 60 different pieces using the QueuedRunCoordinator. We have the dagster daemon dagster.yaml as:
Copy code
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "database"
        value: "sql"
        limit: 8
      - key: "ml"
        value: "score"
        limit: 10
However, only 4 ops are generated at a time. We previously had the concurrency limit for sql set to 4. Is there a way to confirm that those updated limits are currently loaded in the daemon? also how do those keys and values become associated with a run? I'm using a sql server resource in my fanned out op, does it do some sort of name checking with the resource?
d

daniel

02/23/2023, 8:42 PM
Hi Danny - those tags will strictly refer to tags on the job - not on individual ops or assets. We don't currently have a way to supply global concurrency limits at the op level, but its come up a lot recently and it's something we're starting to take a closer look at If you want to apply op-level concurrency limits within a single run, you can configure that in code on the executor for your job, following the example here: https://docs.dagster.io/concepts/ops-jobs-graphs/job-execution#op-concurrency-limits
d

Danny Steffy

02/23/2023, 9:20 PM
I see, so my issue is unrelated to the concurrency limits
d

daniel

02/23/2023, 9:20 PM
if you want more than 4 at once, I think that might be changing the max_concurrent parameter of the executor
in that same doc link - I believe the default is based on the number of CPUs available
d

Danny Steffy

02/23/2023, 10:49 PM
hm I see. Is there any way to set the executor for a graph defined asset to be materialized via the UI?
d

daniel

02/23/2023, 10:49 PM
Yeah you can also specify that same config in the launchpad UI '
Copy code
execution:
  config:
    multiprocess:
      max_concurrent: 20
(If you Shift-click on Materialize it will open the launchpad UI(
d

Danny Steffy

02/23/2023, 10:52 PM
ah I see! Can I specify to use the docker executor instead of the multiprocess?
d

daniel

02/23/2023, 10:52 PM
you specify which executor to use in code, but you can configure that executor in the launchpad
d

Danny Steffy

02/23/2023, 10:56 PM
I didn't think I could define the executor for the assets or graphs in code, just for jobs?
d

daniel

02/23/2023, 10:56 PM
oh i see - the definitions or repository object can also specify a default executor which would be used there
d

Danny Steffy

02/23/2023, 10:57 PM
ah, thank you!
do I need the same dagster.yaml config that I have for the daemon to be set in the dagit dagster.yaml as well? Specifically the run launcher and all the configuration settings for the DockerRunLauncher? Getting an error about an environment variable not being set when I try to materialize the asset in the UI with the new docker_executor being set in the Definition
d

daniel

02/23/2023, 11:17 PM
Yeah, you want a single shared dagster.yaml
(or rather, your dagster.yaml should be the same everywhere)
d

Danny Steffy

02/23/2023, 11:58 PM
hmmm getting this error still:
Copy code
dagster._core.errors.DagsterInvalidConfigError: Errors whilst loading configuration for {'image': Field(<dagster._config.source.StringSourceType object at 0x7f7999fa8910>, default=@, is_required=False), 'network': Field(<dagster._config.source.StringSourceType object at 0x7f7999fa8910>, default=@, is_required=False), 'registry': Field(<dagster._config.field_utils.Shape object at 0x7f7993b66d70>, default=@, is_required=False), 'env_vars': Field(<dagster._config.config_type.Array object at 0x7f7993b9b130>, default=@, is_required=False), 'container_kwargs': Field(<dagster._config.field_utils.Permissive object at 0x7f7999fa8580>, default=@, is_required=False), 'networks': Field(<dagster._config.config_type.Array object at 0x7f7993b986d0>, default=@, is_required=False)}.
Error 1: Post processing at path root:registry:password of original value {'env': 'DOCKER_REG_PASSWORD'} failed:
dagster._config.errors.PostProcessingError: You have attempted to fetch the environment variable "DOCKER_REG_PASSWORD" which is not set. In order for this execution to succeed it must be set in this environment.
I've made sure the dagit, dagster_code, and dagster_daemon containers all have that env var, and it's also being set in the run_launcher config that all three containers are now using in the dagster.yaml file
d

daniel

02/24/2023, 12:00 AM
Where are you seeing that error exactly?
d

Danny Steffy

02/24/2023, 12:00 AM
image.png
d

daniel

02/24/2023, 12:03 AM
Looks like that’s in the container it tried to spin up to run the op/step - you’re using the DockerRunLauncher?
d

Danny Steffy

02/24/2023, 12:03 AM
yep!
oh wait, interesting
d

daniel

02/24/2023, 12:04 AM
Typically the env vars you put on the run launcher would also be passed through to the step containers
d

Danny Steffy

02/24/2023, 12:04 AM
when I go to the configuration tab, it doesn't have that env_var set in the run_launcher config
but it's in there on my dagster.yaml... maybe I need to restart my dagit container. going to give that a try
d

daniel

02/24/2023, 12:07 AM
Depending on the way you have it set up it may need to be rebuilt, not just restarted
d

Danny Steffy

02/24/2023, 12:13 AM
ah. So if we copy the dagster.yaml file over to the image and then run the dagit command in the Dockerfile, would mounting the file location via docker-compose actually do anything? or do we need to rebuild the image each time?
d

daniel

02/24/2023, 12:29 AM
If they’re both in the DAGSTER_HOME folder I’m not actually certain how docker compose handles that - my guess would be the docker-compose would break the tie but im not certain
d

Danny Steffy

02/24/2023, 8:05 PM
hm so I'm still getting this error. I connected via bash to the different containers and they all have that environment variable mentioned above. I'm not sure what to look at next for this
d

daniel

02/24/2023, 8:05 PM
Can you paste the full stack trace? Including the red part at the bottom there that got cut off?
This is a recent version of dagster I assume?
d

Danny Steffy

02/24/2023, 8:05 PM
Copy code
dagster._core.errors.DagsterInvalidConfigError: Errors whilst loading configuration for {'image': Field(<dagster._config.source.StringSourceType object at 0x7f9abe7148e0>, default=@, is_required=False), 'network': Field(<dagster._config.source.StringSourceType object at 0x7f9abe7148e0>, default=@, is_required=False), 'registry': Field(<dagster._config.field_utils.Shape object at 0x7f9ab82ed990>, default=@, is_required=False), 'env_vars': Field(<dagster._config.config_type.Array object at 0x7f9ab82ef5b0>, default=@, is_required=False), 'container_kwargs': Field(<dagster._config.field_utils.Permissive object at 0x7f9abe714550>, default=@, is_required=False), 'networks': Field(<dagster._config.config_type.Array object at 0x7f9ab82ee290>, default=@, is_required=False)}.
Error 1: Post processing at path root:registry:password of original value {'env': 'DOCKER_REG_PASSWORD'} failed:
dagster._config.errors.PostProcessingError: You have attempted to fetch the environment variable "DOCKER_REG_PASSWORD" which is not set. In order for this execution to succeed it must be set in this environment.

Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster/_config/post_process.py", line 79, in _post_process
new_value = context.config_type.post_process(config_value)
File "/usr/local/lib/python3.10/site-packages/dagster/_config/source.py", line 42, in post_process
return str(_ensure_env_variable(cfg))
File "/usr/local/lib/python3.10/site-packages/dagster/_config/source.py", line 16, in _ensure_env_variable
raise PostProcessingError(

  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/api.py", line 992, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/executor/step_delegating/step_delegating_executor.py", line 305, in execute
    list(
  File "/usr/local/lib/python3.10/site-packages/dagster_docker/docker_executor.py", line 217, in launch_step
    container_context = self._get_docker_container_context(step_handler_context)
  File "/usr/local/lib/python3.10/site-packages/dagster_docker/docker_executor.py", line 145, in _get_docker_container_context
    run_launcher = step_handler_context.instance.run_launcher
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 660, in run_launcher
    launcher = cast(InstanceRef, self._ref).run_launcher
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/instance/ref.py", line 491, in run_launcher
    return self.run_launcher_data.rehydrate() if self.run_launcher_data else None
  File "/usr/local/lib/python3.10/site-packages/dagster/_serdes/config_class.py", line 96, in rehydrate
    raise DagsterInvalidConfigError(
I think we have it pinned, but I'm not sure what version we're currently running
d

daniel

02/24/2023, 8:06 PM
Got it - so this error is implying pretty strongly that DOCKER_REG_PASSWORD is not set in the container that it spun up to run the job in. Is it possible to paste the contents of your dagster.yaml?
d

Danny Steffy

02/24/2023, 8:07 PM
Copy code
storage:
  postgres:
    postgres_db:
      username:
        env: POSTGRES_USER
      password:
        env: POSTGRES_PASSWORD
      hostname:
        env: POSTGRES_HOST
      db_name:
        env: POSTGRES_DB
      port: 5432

local_artifact_storage:
  module: dagster.core.storage.root
  class: LocalArtifactStorage
  config:
    base_dir: "/opt/dagster/local/"

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - POSTGRES_USER
      - POSTGRES_PASSWORD
      - POSTGRES_DB
      - POSTGRES_HOST
      - DAGSTER_CURRENT_IMAGE
      - POSTGRES_HOST
      - POSTGRES_USER
      - POSTGRES_PASSWORD
      - POSTGRES_DB
      - FLUSH_USER
      - FLUSH_PASS
      - FLUSH_DISPATCH_SERVER
      - FLUSH_DISPATCH_DB
      - AIRBYTE_PASSWORD
      - AIRBYTE_HOST
      - DAGSTER_DEPLOYMENT
      - DOCKER_REG_PASSWORD
    networks:
      - dagster
      - flush
    registry:
      url:xxx
      username: xxxx
      password:
        env: DOCKER_REG_PASSWORD
    container_kwargs:
      volumes:
        - /ssd-dagster-01/data:/opt/dagster/local/
        - /ssd-dagster-01/code:/opt/dagster/app/
        #- dagster_io_manager:/tmp/io_manager_storage
        - /var/run/docker.sock:/var/run/docker.sock

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "database"
        value: "sql"
        limit: 8
      - key: "ml"
        value: "score"
        limit: 10

telemetry:
  enabled: false
d

daniel

02/24/2023, 8:08 PM
Is it possible to 'docker inspect' and paste the command that it used for that container (the one that it spun up for the run)?
er wait, maybe that doesn't print the command, one sec
er yeah, it should be in there
d

Danny Steffy

02/24/2023, 8:09 PM
ah I just checked what I believe is the container that got spun up for the run
and it doesn't have that env var
d

daniel

02/24/2023, 8:10 PM
aha!
d

Danny Steffy

02/24/2023, 8:10 PM
but I'm not sure why it wouldn't?
isnt' that being set by the dagster.yaml?
d

daniel

02/24/2023, 8:10 PM
It should be yeah, when you inspect it can you paste the key under "Cmd"?
My suspicion is still that the dagster.yaml is out of date somewhere
specifically my guess is that it's out of date on the daemon container
(since that is where the run is launched from)
d

Danny Steffy

02/24/2023, 8:12 PM
Copy code
"Cmd": [
                "dagster",
                "api",
                "execute_run",
                "{\"__class__\": \"ExecuteRunArgs\", \"instance_ref\": {\"__class__\": \"InstanceRef\", \"compute_logs_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"LocalComputeLogManager\", \"config_yaml\": \"base_dir: /opt/dagster/dagster_home/storage\\n\", \"module_name\": \"dagster.core.storage.local_compute_log_manager\"}, \"custom_instance_class_data\": null, \"event_storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"PostgresEventLogStorage\", \"config_yaml\": \"postgres_db:\\n  db_name:\\n    env: POSTGRES_DB\\n  hostname:\\n    env: POSTGRES_HOST\\n  password:\\n    env: POSTGRES_PASSWORD\\n  port: 5432\\n  username:\\n    env: POSTGRES_USER\\n\", \"module_name\": \"dagster_postgres\"}, \"local_artifact_storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"LocalArtifactStorage\", \"config_yaml\": \"base_dir: /opt/dagster/local/\\n\", \"module_name\": \"dagster.core.storage.root\"}, \"run_coordinator_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"QueuedRunCoordinator\", \"config_yaml\": \"max_concurrent_runs: 25\\ntag_concurrency_limits:\\n- key: database\\n  limit: 8\\n  value: sql\\n- key: ml\\n  limit: 10\\n  value: score\\n\", \"module_name\": \"dagster.core.run_coordinator\"}, \"run_launcher_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"DockerRunLauncher\", \"config_yaml\": \"container_kwargs:\\n  volumes:\\n  - /ssd-dagster-01/data:/opt/dagster/local/\\n  - /ssd-dagster-01/code:/opt/dagster/app/\\n  - /var/run/docker.sock:/var/run/docker.sock\\nenv_vars:\\n- POSTGRES_USER\\n- POSTGRES_PASSWORD\\n- POSTGRES_DB\\n- POSTGRES_HOST\\n- DAGSTER_CURRENT_IMAGE\\n- POSTGRES_HOST\\n- POSTGRES_USER\\n- POSTGRES_PASSWORD\\n- POSTGRES_DB\\n- FLUSH_USER\\n- FLUSH_PASS\\n- FLUSH_DISPATCH_SERVER\\n- FLUSH_DISPATCH_DB\\n- AIRBYTE_PASSWORD\\n- AIRBYTE_HOST\\n- DAGSTER_DEPLOYMENT\\nnetworks:\\n- dagster\\n- flush\\nregistry:\\n  password:\\n    env: DOCKER_REG_PASSWORD\\n  url: <http://flcontainers.azurecr.io|flcontainers.azurecr.io>\\n  username: flcontainers\\n\", \"module_name\": \"dagster_docker\"}, \"run_storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"PostgresRunStorage\", \"config_yaml\": \"postgres_db:\\n  db_name:\\n    env: POSTGRES_DB\\n  hostname:\\n    env: POSTGRES_HOST\\n  password:\\n    env: POSTGRES_PASSWORD\\n  port: 5432\\n  username:\\n    env: POSTGRES_USER\\n\", \"module_name\": \"dagster_postgres\"}, \"schedule_storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"PostgresScheduleStorage\", \"config_yaml\": \"postgres_db:\\n  db_name:\\n    env: POSTGRES_DB\\n  hostname:\\n    env: POSTGRES_HOST\\n  password:\\n
  env: POSTGRES_PASSWORD\\n  port: 5432\\n  username:\\n    env: POSTGRES_USER\\n\", \"module_name\": \"dagster_postgres\"}, \"scheduler_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"DagsterDaemonScheduler\", \"config_yaml\": \"{}\\n\", \"module_name\": \"dagster.core.scheduler\"}, \"secrets_loader_data\": null, \"settings\": {\"telemetry\": {\"enabled\": false}}, \"storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"DagsterPostgresStorage\", \"config_yaml\": \"postgres_db:\\n  db_name:\\n    env: POSTGRES_DB\\n
  hostname:\\n    env: POSTGRES_HOST\\n  password:\\n    env: POSTGRES_PASSWORD\\n  port: 5432\\n  username:\\n
  env: POSTGRES_USER\\n\", \"module_name\": \"dagster_postgres\"}}, \"pipeline_origin\": {\"__class__\": \"PipelinePythonOrigin\", \"pipeline_name\": \"__ASSET_JOB\", \"repository_origin\": {\"__class__\": \"RepositoryPythonOrigin\", \"code_pointer\": {\"__class__\": \"FileCodePointer\", \"fn_name\": \"defs\", \"python_file\": \"repo.py\", \"working_directory\": \"/opt/dagster/app/\"}, \"container_context\": {}, \"container_image\": \"<http://flcontainers.azurecr.io/dagster_code:latest\|flcontainers.azurecr.io/dagster_code:latest\>", \"entry_point\": [\"dagster\"], \"executable_path\": \"/usr/local/bin/python\"}}, \"pipeline_run_id\": \"40a3c5f8-074f-46e6-b2e6-7d6461b15a91\", \"set_exit_code_on_failure\": null}"
this was working with the daemon before, it's when I changed the configuration on dagit that this error starting occurring
d

daniel

02/24/2023, 8:13 PM
That command is a big mess but the relevant bit is buried in there
env_vars:\\n- POSTGRES_USER\\n- POSTGRES_PASSWORD\\n- POSTGRES_DB\\n- POSTGRES_HOST\\n- DAGSTER_CURRENT_IMAGE\\n- POSTGRES_HOST\\n- POSTGRES_USER\\n- POSTGRES_PASSWORD\\n- POSTGRES_DB\\n- FLUSH_USER\\n- FLUSH_PASS\\n- FLUSH_DISPATCH_SERVER\\n- FLUSH_DISPATCH_DB\\n- AIRBYTE_PASSWORD\\n- AIRBYTE_HOST\\n- DAGSTER_DEPLOYMENT\\n
(no DOCKER_REG_PASSWORD) - that's the daemon serializing its dagster.yaml and passing it into the command
d

Danny Steffy

02/24/2023, 8:28 PM
is there a way to restart the daemon from inside the container?
d

daniel

02/24/2023, 8:29 PM
Hmm not that i'm aware of - possibly a way to do it in docker, but i'm not sure
d

Danny Steffy

02/24/2023, 8:29 PM
because I can see that the dagster.yaml has the DOCKER_REG_PASSWORD inside the daemon container
d

daniel

02/24/2023, 8:31 PM
ah is that because its mounted from an external file?
d

Danny Steffy

02/24/2023, 8:32 PM
it definitely is mounted from a file on the host vm
d

daniel

02/24/2023, 8:32 PM
Got it - that would require a restart when that file changes, yeah
which I would typically do by stopping and restarting the container
d

Danny Steffy

02/24/2023, 8:36 PM
hm I think I must have had a stuck container that was spun up? I killed the dagster_code code container that had been spun up for the run, restarted the daemon, and then reexecuted the run. It finished successfully now
🎉 1
thanks for the help!
9 Views