# ask-community
d
I'm trying to use an EcsRunLauncher and QueuedRunCoordinator but I keep getting this error:
```
dagster._check.ParameterCheckError: Param "image" is not a str. Got None which is type <class 'NoneType'>.

  File "/usr/local/lib/python3.11/site-packages/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py", line 333, in _dequeue_run
    instance.run_launcher.launch_run(LaunchRunContext(dagster_run=run, workspace=workspace))
  File "/usr/local/lib/python3.11/site-packages/dagster_aws/ecs/launcher.py", line 370, in launch_run
    run_task_kwargs = self._run_task_kwargs(run, image, container_context)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster_aws/ecs/launcher.py", line 570, in _run_task_kwargs
    task_definition_config = DagsterEcsTaskDefinitionConfig.from_task_definition_dict(
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster_aws/ecs/tasks.py", line 126, in from_task_definition_dict
    return DagsterEcsTaskDefinitionConfig(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster_aws/ecs/tasks.py", line 55, in __new__
    check.str_param(image, "image"),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_check/__init__.py", line 1347, in str_param
    raise _param_type_mismatch_exception(obj, str, param_name, additional_message)
```

Here's my `dagster.yaml`:

```
# ECS launcher
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  config:
    container_name: "syncdag_user_code"
    include_sidecars: true
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
      - PG_DB_CONN_STRING
      - SYNCRETIC_ENVIRONMENT

# Custom values via env
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "database"
        value: "redshift"
        limit: 4
      - key: "dagster/backfill"
        limit: 10
```

I've tried with and without the `container_name` param, but `syncdag_user_code` is the name of the container in the ECS task definition.
d
Hi drew - what image / task definition are you hoping it will use to launch the run? Usually we recommend running a separate ECS task with a gRPC server on it that has the `DAGSTER_CURRENT_IMAGE` env var set to the image to use - the daemon then uses that image to launch the run
d
Here's my current set of task definitions:
```
[{
        "image": "${local.dagster_image_name}",
        "name": "${local.dagit_container_name}",
        "command": [
            "dagit","-h", "0.0.0.0", "-p","8080"
        ],
        "environment": [
          ${join(",\n", formatlist("{\"name\":\"%s\",\"value\":\"%s\"}", keys(local.dagster_variables), values(local.dagster_variables)))}
        ],
        "logConfiguration": {
          "logDriver": "awslogs",
          "options": {
            "awslogs-group": "${aws_cloudwatch_log_group.dagster.name}",
            "awslogs-region": "${var.aws_region}",
            "awslogs-stream-prefix": "dagster"
          }
        },
        "essential": true,
        "portMappings": [
            {
                "containerPort": 8080,
                "hostPort": 8080
            }
        ]
      },
      {
        "image": "${local.dagster_image_name}",
        "name": "${local.dagster_daemon_container_name}",
        "command": [
            "dagster-daemon","run"
        ],
        "environment": [
          ${join(",\n", formatlist("{\"name\":\"%s\",\"value\":\"%s\"}", keys(local.dagster_daemon_variables), values(local.dagster_daemon_variables)))}
        ],
        "logConfiguration": {
          "logDriver": "awslogs",
          "options": {
            "awslogs-group": "${aws_cloudwatch_log_group.dagster.name}",
            "awslogs-region": "${var.aws_region}",
            "awslogs-stream-prefix": "dagster"
          }
        },
        "essential": true
      },
  {
        "image": "${local.user_container_image_name}",
        "name": "${local.user_container_name}",
        "environment": [
          ${join(",\n", formatlist("{\"name\":\"%s\",\"value\":\"%s\"}", keys(local.dagster_variables), values(local.dagster_variables)))}
        ],
        "logConfiguration": {
          "logDriver": "awslogs",
          "options": {
            "awslogs-group": "${aws_cloudwatch_log_group.dagster.name}",
            "awslogs-region": "${var.aws_region}",
            "awslogs-stream-prefix": "dagster"
          }
        },
        "essential": true,
        "mountPoints": [
          {
            "sourceVolume": "${local.dagster_mounted_volume_name}",
            "containerPath": "/opt/dagster"
          }
        ],
        "portMappings": [
            {
                "containerPort": 8008,
                "hostPort": 8008
            }
        ]
      }]
```

but the high level is that I've got 3 containers, `user_code`, `dagit`, and `dagster-daemon`, all on the same task definition
if I split `user_code` into its own task definition, how do you specify the gRPC IP in `dagster.yaml`?
d
If your user_code task definition sets the `DAGSTER_CURRENT_IMAGE` env var like in the deploy_ecs example here: https://github.com/dagster-io/dagster/blob/master/examples/deploy_ecs/docker-compose.yml#L110 - I would expect it to pick up the image correctly
er, sorry - your user_code container, not task definition
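for example, roughly this on the user code container (just a sketch, reusing the locals from your Terraform):

```
{
    "image": "${local.user_container_image_name}",
    "name": "${local.user_container_name}",
    "environment": [
        {"name": "DAGSTER_CURRENT_IMAGE", "value": "${local.user_container_image_name}"}
    ]
}
```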
d
hmm, so it seems like what is happening is that dagster is running a fresh copy of that image and not copying over any of the info in the task definition:
```
{
        "image": "${local.user_container_image_name}",
        "name": "${local.user_container_name}",
        "environment": [
          ${join(",\n", formatlist("{\"name\":\"%s\",\"value\":\"%s\"}", keys(local.dagster_variables), values(local.dagster_variables)))}
        ],
        "logConfiguration": {
          "logDriver": "awslogs",
          "options": {
            "awslogs-group": "${aws_cloudwatch_log_group.dagster.name}",
            "awslogs-region": "${var.aws_region}",
            "awslogs-stream-prefix": "dagster"
          }
        },
        "essential": true,
        "mountPoints": [
          {
            "sourceVolume": "${local.dagster_mounted_volume_name}",
            "containerPath": "/opt/dagster"
          }
        ],
        "portMappings": [
            {
                "containerPort": 8008,
                "hostPort": 8008
            }
        ]
      }
```
d
What information are you hoping that it will copy over?
d
i.e. the `"mountPoints"` and the `"environment"` keys
I have an EFS drive mounted to the `user_code` container that has the repo in it, and that doesn't seem to survive the ECS RunTask
d
those will be copied over from a container with the same name as the "container_name" field on the run launcher
d
hmm, so I'm sure that it's not doing that. how should I go about debugging this?
d
Can you pass along your latest dagster.yaml (with container_name set) and the task definition that's being used to launch the run?
oh, you know what, you're right, I gave you some bad information there. It's copying the config over from the container that was used to launch the run, not the one set in container_name
Here's the relevant code that constructs the task definition to use from the current task: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/ecs/tasks.py#L178-L212
Sorry for the confusion there
d
oh, it looks like you pushed a commit 2 hours ago that allows for mountPoints?
d
I did, but it should work before that if you’re using the codepath where it pulls from the calling container
The thing I added is for the path where you tell it to construct its own task definition from configuration that you provide
d
I see. how does it decide which path to follow?
d
If you supply a `task_definition` key to the run launcher, it will create one using that config - if not, it will use the task definition that launched the run
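e.g. (just a sketch - the task definition ARN here is a placeholder, not something from your setup):

```
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  config:
    # when task_definition is supplied, the launcher uses it
    # instead of copying the task definition that launched the run
    task_definition: "arn:aws:ecs:us-east-1:123456789:task-definition/dagster-run:1"
    container_name: "syncdag_user_code"
```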
d
so, here are the env vars I have set:
```
PG_DB_CONN_STRING : "postgresql://${var.rds_username}:${var.rds_password}@${aws_db_instance.dagster[0].address}:${aws_db_instance.dagster[0].port}/${aws_db_instance.dagster[0].name}",
    DAGSTER_POSTGRES_USER: var.rds_username,
    DAGSTER_POSTGRES_PASSWORD: var.rds_password,
    DAGSTER_POSTGRES_DB: aws_db_instance.dagster[0].name,
    DAGSTER_HOME : "${var.dagster-container-home}/",
    SYNCDAG_HOME : "/opt/dagster/syncdag/repository.py" #path to a not yet mounted drive
```

I think it may have something to do with the `CMD` arg in the `user_code` Dockerfile:

```
CMD env >> /etc/environment && exec poetry run dagster api grpc -h 0.0.0.0 -p 8008 -f $SYNCDAG_HOME
```

except `$SYNCDAG_HOME` does get set correctly
d
Just confirming my understanding - this is no longer about the run launcher, and is more of a general ECS question about how env vars can be accessed in the user code container?
d
I'm not sure. The problem is that env vars aren't making it to the run launcher. Neither the `mountPoints` nor the `DAGSTER_` vars. When I set the `DAGSTER_` vars in the Dockerfile, it correctly passes them to the run launcher and I get the `mountPoint` error. It feels like somehow the run launcher is launching the docker container without the ECS task definition
(because the env vars I posted above are defined in the task definition and not the Dockerfile itself)
d
Can you post the latest version of the dagster.yaml that you’re using? Are the mount points and env vars that you’re hoping to include present on the container that’s launching the run? (Likely the daemon container)
d
here's my task definition taken from the cluster on the aws console:
```
{
    "taskDefinitionArn": "",
    "containerDefinitions": [
        {
            "name": "dagit",
            "image": "<http://dkr.ecr.us-east-1.amazonaws.com/build_ecs/dagster:devel|dkr.ecr.us-east-1.amazonaws.com/build_ecs/dagster:devel>",
            "cpu": 0,
            "portMappings": [
                {
                    "containerPort": 8080,
                    "hostPort": 8080,
                    "protocol": "tcp"
                }
            ],
            "essential": true,
            "command": [
                "dagit",
                "-h",
                "0.0.0.0",
                "-p",
                "8080"
            ],
            "environment": [
                {
                    "name": "DAGSTER_POSTGRES_USER",
                    "value": "user"
                },
                {
                    "name": "SYNCDAG_HOME",
                    "value": "/opt/dagster/syncdag/repository.py"
                },
                {
                    "name": "SYNCRETIC_ENVIRONMENT",
                    "value": "DEVELOPMENT"
                },
                {
                    "name": "DAGSTER_POSTGRES_DB",
                    "value": "rds"
                },
                {
                    "name": "DAGSTER_HOME",
                    "value": "/opt/dagster/dagster_home//"
                },
                {
                    "name": "PG_DB_CONN_STRING",
                    "value": "user:pass@url/rds"
                },
                {
                    "name": "DAGSTER_POSTGRES_PASSWORD",
                    "value": "password"
                }
            ],
            "mountPoints": [],
            "volumesFrom": [],
            
        {
            "name": "dagster_daemon",
            "image": "<http://dkr.ecr.us-east-1.amazonaws.com/build_ecs/dagster:devel|dkr.ecr.us-east-1.amazonaws.com/build_ecs/dagster:devel>",
            "cpu": 0,
            "portMappings": [],
            "essential": true,
            "command": [
                "dagster-daemon",
                "run"
            ],
            "environment": [
                {
                    "name": "PG_DB_CONN_STRING",
                    "value": "<postgresql://user:pass@url>:port/rds"
                },
            ],
            "mountPoints": [],
            "volumesFrom": [],
           
        {
            "name": "syncdag_user_code",
            "image": "<http://dkr.ecr.us-east-1.amazonaws.com/build_ecs/syncdag_user_code:devel|dkr.ecr.us-east-1.amazonaws.com/build_ecs/syncdag_user_code:devel>",
            "cpu": 0,
            "portMappings": [
                {
                    "containerPort": 8008,
                    "hostPort": 8008,
                    "protocol": "tcp"
                }
            ],
            "essential": true,
            "environment": [
                {
                    "name": "DAGSTER_POSTGRES_USER",
                    "value": "user"
                },
                {
                    "name": "SYNCDAG_HOME",
                    "value": "/opt/dagster/syncdag/repository.py"
                },
            
                {
                    "name": "DAGSTER_POSTGRES_DB",
                    "value": "rds"
                },
                {
                    "name": "DAGSTER_HOME",
                    "value": "/opt/dagster/dagster_home//"
                },
                {
                    "name": "PG_DB_CONN_STRING",
                    "value": "user:pass@url/rds"
                },
                {
                    "name": "DAGSTER_POSTGRES_PASSWORD",
                    "value": "password"
                }
            ],
            "mountPoints": [
                {
                    "sourceVolume": "dagster",
                    "containerPath": "/opt/dagster"
                }
            ],
            "volumesFrom": [],
            
    ],
    "revision": 43,
    "volumes": [
        {
            "name": "dagster",
            "efsVolumeConfiguration": {
                "fileSystemId": "fs-ID",
                "rootDirectory": "/",
                "transitEncryption": "ENABLED",
                "authorizationConfig": {
                    "accessPointId": "fsap-ID",
                    "iam": "ENABLED"
                }
            }
        }
    ],
    "status": "ACTIVE",
    "
}
```

specifically, I have been assuming that it used the `user_code` container. should I be setting the mounts and vars on the `dagster_daemon` container?
d
It sounds like you're expecting the mountPoints from the user code container to be passed through - it's reasonable for you to expect that, but that's not currently the container that it uses
yeah, it'll use whichever container launches the run
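so concretely, something like this on the daemon container (a sketch stitched together from the values you posted above):

```
{
    "name": "dagster_daemon",
    "image": "dkr.ecr.us-east-1.amazonaws.com/build_ecs/dagster:devel",
    "command": ["dagster-daemon", "run"],
    "essential": true,
    "environment": [
        {"name": "DAGSTER_POSTGRES_USER", "value": "user"},
        {"name": "DAGSTER_POSTGRES_PASSWORD", "value": "password"},
        {"name": "DAGSTER_POSTGRES_DB", "value": "rds"},
        {"name": "PG_DB_CONN_STRING", "value": "postgresql://user:pass@url:port/rds"},
        {"name": "SYNCDAG_HOME", "value": "/opt/dagster/syncdag/repository.py"},
        {"name": "DAGSTER_HOME", "value": "/opt/dagster/dagster_home/"}
    ],
    "mountPoints": [
        {"sourceVolume": "dagster", "containerPath": "/opt/dagster"}
    ]
}
```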
d
ahh I see
d
(it's more common, from what I've seen, for the three containers you're using there to each have their own task)
d
I was thinking of doing that, but then `dagster.yaml` is static and I'd have to figure out a way to set it
when they're all in the same task, you can set `dagster.yaml` to `0.0.0.0`... err, the gRPC host
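for reference, something like this (a sketch - the gRPC server block typically lives in `workspace.yaml` rather than `dagster.yaml`; host/port are from this setup):

```
load_from:
  - grpc_server:
      # containers in the same awsvpc-mode task share a network namespace,
      # so dagit and the daemon can reach the user code server on localhost
      host: localhost
      port: 8008
      location_name: "syncdag_user_code"
```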
d
I see
d
dope, finally got it working. you were very helpful. another hint for anyone else doing this in the future -- you need to do `RUN poetry config virtualenvs.create false` before `RUN poetry install`, because the spawned run won't be able to interact w/ poetry (if you're using poetry)
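i.e. something like this in the user code Dockerfile (a sketch - the base image and paths are guesses, the `CMD` is from my setup):

```
FROM python:3.11-slim

RUN pip install poetry

WORKDIR /opt/dagster/app
COPY pyproject.toml poetry.lock ./

# install into the system environment instead of a poetry-managed
# virtualenv, so the process the run launcher spawns can import everything
RUN poetry config virtualenvs.create false
RUN poetry install --no-interaction --no-ansi

COPY . .

CMD env >> /etc/environment && exec poetry run dagster api grpc -h 0.0.0.0 -p 8008 -f $SYNCDAG_HOME
```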
I am still somewhat randomly getting:
```
FileNotFoundError: [Errno 2] No such file or directory: '/opt/dagster/syncdag/repository.py'

  File "/usr/local/lib/python3.11/site-packages/dagster/_grpc/impl.py", line 120, in core_execute_run
    recon_pipeline.get_definition()
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 238, in get_definition
    return self.repository.get_definition().get_maybe_subset_job_def(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 117, in get_definition
    return repository_def_from_pointer(self.pointer, self.repository_load_data)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 789, in repository_def_from_pointer
    target = def_from_pointer(pointer)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 679, in def_from_pointer
    target = pointer.load_target()
             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/code_pointer.py", line 175, in load_target
    module = load_python_file(self.python_file, self.working_directory)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/code_pointer.py", line 72, in load_python_file
    os.stat(python_file)
```

this file is on the EFS drive
so, I can materialize -- and 50% of the time it completes successfully, 50% of the time I get this error
hmm, so it appears it isn't 50% of the time. only my assets with `StaticPartition`s will materialize, and the ones w/ no partitions fail w/ that error.
d
Hm I’m having a lot of trouble understanding why that would matter - I would think all runs would need to be able to load that file
d
got it. those runs launch from the `dagit` container instead of the `daemon` or `user_code` containers
if they're triggered from the UI
d
Ohhh yes sorry that would do it! If you have the run queue enabled, then every run will launch from the daemon
❤️ 1