# deployment-ecs
a
Hello everyone! Has anyone found a way to use FARGATE_SPOT as a capacity provider for ECS? I have tried setting the default capacity provider of the ECS cluster to FARGATE_SPOT, and also setting the default capacity provider of the ECS services to FARGATE_SPOT. However, the Dagster jobs still use the on-demand FARGATE capacity provider (which is significantly more expensive!). It seems to me that Dagster calls AWS directly without using any of the configuration set on the ECS cluster or ECS services... Thank you in advance!
👍 1
So I managed to create a (quick) potential solution, however, this does require a small change on the Dagster side.
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  config:
    run_task_kwargs:
      capacityProviderStrategy:
        - base: 1
          weight: 1
          capacityProvider: "FARGATE_SPOT"
As you can see in the code above, I added the capacity provider strategy to run_task_kwargs. The only issue is that this does not work yet, because ECS cannot handle both capacityProviderStrategy and launchType in the same request. Therefore, my suggestion would be to add the following:
if not capacityProviderStrategy then launchType
else capacityProviderStrategy
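The rule above could be sketched in Python roughly as follows. This is only an illustration: `build_run_task_kwargs` and its two arguments are hypothetical names, not Dagster's actual launcher code. It shows the idea of dropping `launchType` whenever a `capacityProviderStrategy` is supplied, since ECS rejects a request that contains both.

```python
def build_run_task_kwargs(inferred_kwargs, user_kwargs):
    """Merge user-supplied run_task kwargs over inferred ones (hypothetical helper)."""
    merged = {**inferred_kwargs, **user_kwargs}
    # ECS does not accept both keys in one request, so the strategy wins
    # and launchType is removed when a strategy is present.
    if merged.get("capacityProviderStrategy"):
        merged.pop("launchType", None)
    return merged


# Values inferred from the current task (example data, not real output).
inferred = {"cluster": "my-cluster", "launchType": "FARGATE"}
# Values the user put under run_task_kwargs.
user = {
    "capacityProviderStrategy": [
        {"capacityProvider": "FARGATE_SPOT", "weight": 1, "base": 1}
    ]
}
kwargs = build_run_task_kwargs(inferred, user)
```

With no strategy in the user config, the inferred `launchType` is passed through unchanged.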
I assume Dagster wrote a wrapper around this function from AWS:
response = client.run_task(
    capacityProviderStrategy=[
        {
            'capacityProvider': 'string',
            'weight': 123,
            'base': 123
        },
    ],
    cluster='string',
    count=123,
    enableECSManagedTags=True|False,
    enableExecuteCommand=True|False,
    group='string',
    launchType='EC2'|'FARGATE'|'EXTERNAL',
    networkConfiguration={
        'awsvpcConfiguration': {
            'subnets': [
                'string',
            ],
            'securityGroups': [
                'string',
            ],
            'assignPublicIp': 'ENABLED'|'DISABLED'
        }
    },
    overrides={
        'containerOverrides': [
            {
                'name': 'string',
                'command': [
                    'string',
                ],
                'environment': [
                    {
                        'name': 'string',
                        'value': 'string'
                    },
                ],
                'environmentFiles': [
                    {
                        'value': 'string',
                        'type': 's3'
                    },
                ],
                'cpu': 123,
                'memory': 123,
                'memoryReservation': 123,
                'resourceRequirements': [
                    {
                        'value': 'string',
                        'type': 'GPU'|'InferenceAccelerator'
                    },
                ]
            },
        ],
        'cpu': 'string',
        'inferenceAcceleratorOverrides': [
            {
                'deviceName': 'string',
                'deviceType': 'string'
            },
        ],
        'executionRoleArn': 'string',
        'memory': 'string',
        'taskRoleArn': 'string',
        'ephemeralStorage': {
            'sizeInGiB': 123
        }
    },
    placementConstraints=[
        {
            'type': 'distinctInstance'|'memberOf',
            'expression': 'string'
        },
    ],
    placementStrategy=[
        {
            'type': 'random'|'spread'|'binpack',
            'field': 'string'
        },
    ],
    platformVersion='string',
    propagateTags='TASK_DEFINITION'|'SERVICE'|'NONE',
    referenceId='string',
    startedBy='string',
    tags=[
        {
            'key': 'string',
            'value': 'string'
        },
    ],
    taskDefinition='string'
)
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
Would this be something that can be fixed? It would add great value to us, considering FARGATE_SPOT instances are approximately 70% less expensive!
j
Seems reasonable - can you open an issue on the Dagster github repo? We probably need to do some additional input sanitization here to support this: https://github.com/dagster-io/dagster/blob/a1d5d14678a89f02b85b34fad78152b5bd97a164/python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py#L145-L162
a
Hi Jordan! Here is the URL of the issue: https://github.com/dagster-io/dagster/issues/12437
@jordan, any idea about the timeline of implementation? Thanks!
j
This will help save my company so much money!
🌈 1
a
Hi @jordan, As an addition to the feature request above, would it also be possible to add
capacityProviderStrategy
to the built-in Dagster tags at the job level? For instance, with Fargate you could alternate between SPOT and ON-DEMAND instances. Jobs that are not time-critical could then have the tag
{"ecs/capacityProvider": "FARGATE_SPOT"}
, whereas jobs that do have urgency could then have the tag
{"ecs/capacityProvider": "FARGATE"}
. This follows the same approach as
{"ecs/cpu": "4096"}
and
{"ecs/memory": "8192"}
! Very curious about your thoughts on this! Thank you in advance, Arnoud
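A rough sketch of how such a tag could be translated into `run_task` arguments is shown below. Note that `ecs/capacityProvider` is only the tag proposed in this thread, and `strategy_from_tags` is a hypothetical helper, not part of dagster-aws. The weight and base values simply mirror the YAML config earlier in the thread.

```python
def strategy_from_tags(tags):
    """Derive run_task kwargs from the proposed "ecs/capacityProvider" tag (sketch)."""
    provider = tags.get("ecs/capacityProvider")
    if provider is None:
        return {}  # no tag set: leave the launcher defaults untouched
    # This sketch only handles the two Fargate providers discussed here.
    if provider not in ("FARGATE", "FARGATE_SPOT"):
        raise ValueError(f"unsupported capacity provider: {provider}")
    return {
        "capacityProviderStrategy": [
            {"capacityProvider": provider, "weight": 1, "base": 1}
        ]
    }


# An urgent job pins on-demand capacity; a relaxed job opts into spot.
urgent = strategy_from_tags({"ecs/capacityProvider": "FARGATE"})
relaxed = strategy_from_tags({"ecs/capacityProvider": "FARGATE_SPOT", "ecs/cpu": "4096"})
untagged = strategy_from_tags({"ecs/memory": "8192"})
```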
d
@Arnoud van Dommelen what if you set this?
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  config:
    use_current_ecs_task_config: false
    run_task_kwargs:
      capacityProviderStrategy:
        - base: 1
          weight: 1
          capacityProvider: "FARGATE_SPOT"
I think the reason it's setting FARGATE is that it's inferring it from the ECS task that's launching the run. Setting
use_current_ecs_task_config
to false would make it not do that
if you do that it's possible you'll need to add the
cluster
and/or
networkConfiguration
keys to
run_task_kwargs
as well
a
Hi @daniel, does it still take the task definition of the corresponding gRPC if I set the
use_current_ecs_task_config
to false? We use 5 different shopfloors, each with a different gRPC server that has a unique conda environment activated (this cannot change!).
Also, the error I got from boto3 said that you cannot have both
capacityProvider
and
launchType
defined. And this won't be solved when setting the
use_current_ecs_task_config
to false I assume (as it still takes a base configuration from somewhere that contains launchType)?
d
From looking through the code, I believe it will be solved. The only place we set FARGATE explicitly is when it is pulled from the current task.
And yes, setting use_current_ecs_task_config to false shouldn't affect anything task definition related.
a
Okay, will check this as soon as possible! Thanks for the fast reply 🙂 What about the implementation of the Dagster reserved tag for setting the capacityProvider, as this enables job-level configuration rather than infrastructure-wide? Would this be a relatively quick feature to build, or does it have a lot of dependencies?
d
I think that would be a quick one - similar to this tag, but for run task kwargs instead of just overrides: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py#L427
a
Do I then need to change my current issue to the Dagster reserved tag for setting
capacityProvider
?
d
You could edit that one or file a new one - i think the ask in the original issue is still reasonable (to not set the launchType if it will lead to an invalid configuration)
❤️ 1
a
Okay, then I will open a new one and reference it in the existing issue!
@daniel Is there a way for me to speed up the process for integrating this reserved Dagster tag issue?
d
The fastest way for something relatively scoped and small like that tag would probably be to send out the PR that adds it: https://docs.dagster.io/community/contributing
a
Hi @daniel, We are currently working on a PR for the FARGATE_SPOT integration within Dagster and now have a version running where we can assign the capacityProvider tag to a job to tell ECS what strategy to use!
This brings us to the following: We are aware that FARGATE_SPOT has limited availability during business hours, so we added a signal handler that deals with a SIGTERM message from AWS, to make sure the Dagster jobs can shut down gracefully before receiving a SIGKILL message. However, the Dagster jobs are immediately set to failure ("failed to start") when there is insufficient capacity available (on AWS servers) at the time of dequeuing, and so never reach our signal handler!
Idea: In order to catch errors like this (see attached screenshot), it would be a great addition to the daemon to implement a try/catch, where it tries FARGATE_SPOT (the assigned capacity provider) and falls back to FARGATE (the default capacity provider) in case insufficient capacity is available. Would this be an interesting feature for Dagster as well, or is this too case-specific (as we would prefer not to invest in something that will not become part of the Dagster package at some point)? Thanks in advance! Arnoud :)
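The try/fallback idea could be sketched like this. Everything here is an assumption for illustration: `run_task_with_fallback` is a hypothetical helper, not Dagster code, and the sketch assumes a capacity shortage shows up as a `run_task` response with an empty `tasks` list and a non-empty `failures` list rather than as a raised exception. A small fake client stands in for the boto3 ECS client so the example runs without AWS access.

```python
def run_task_with_fallback(client, run_task_kwargs,
                           preferred="FARGATE_SPOT", fallback="FARGATE"):
    """Try the preferred capacity provider first, then the fallback (sketch)."""
    # Drop keys that would conflict with the strategy set on each attempt.
    base = {k: v for k, v in run_task_kwargs.items()
            if k not in ("launchType", "capacityProviderStrategy")}
    response = None
    for provider in (preferred, fallback):
        strategy = [{"capacityProvider": provider, "weight": 1}]
        response = client.run_task(**base, capacityProviderStrategy=strategy)
        if response.get("tasks"):
            return provider, response
    return None, response  # both attempts failed; caller inspects "failures"


class FakeEcsClient:
    """Stand-in for a boto3 ECS client that has no spot capacity."""

    def __init__(self):
        self.calls = []

    def run_task(self, **kwargs):
        provider = kwargs["capacityProviderStrategy"][0]["capacityProvider"]
        self.calls.append(provider)
        if provider == "FARGATE_SPOT":
            return {"tasks": [], "failures": [{"reason": "no capacity (example)"}]}
        return {"tasks": [{"taskArn": "arn:example"}], "failures": []}


client = FakeEcsClient()
used, response = run_task_with_fallback(
    client, {"cluster": "my-cluster", "taskDefinition": "my-task", "launchType": "FARGATE"}
)
```

In this run the spot attempt fails, so the helper retries once with on-demand FARGATE and reports which provider was actually used.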
j
Is there any way to have ECS handle this kind of fallback automatically (maybe with capacity provider strategies? although I think that might just determine how it initially tries to place a task)? Otherwise, sounds like a reasonable proposal. We already have other places where we have unique behavior written for certain scenarios (for example, our mem/cpu defaults are different if you run on Windows). If you do submit it, can you do so as a separate PR?
🌈 1
a
Hi @jordan, will of course first check whether this can be solved within ECS! Concerning the PRs, we will create separate issues on GitHub, each with a corresponding PR 🙂
👍 1
Hi @jordan @daniel, we have created a PR for the capacity provider strategy and simultaneously also added the feature to customize ephemeral storage on job level. https://github.com/dagster-io/dagster/pull/13217
d
@Arnoud van Dommelen - still working through the PR, thanks so much for sending that out. One thing I just realized while reviewing it is that there is actually already a (not currently documented)
ecs/task_overrides
tag that can be used to override the ephemeral storage per-job - you can do something like
from dagster import job

@job(tags={"ecs/task_overrides": {"ephemeralStorage": {"sizeInGiB": 128}}})
def my_job():
    ...
a
@daniel, thanks for the update, hope to see it live soon 🙂! Will you still include the ephemeral storage tag as we proposed it (more intuitive way)?
Hi @daniel, I found out that the ephemeral storage tag has changed in the launcher.py to "ecs/ephemeral_storage", whereas the documentation stated "ecs/ephermeralStorage"! Could this be changed in the documentation? 🙂 Cheers, Arnoud
Hi @jordan, We have created a procedure so that Dagster automatically uses on-demand instances when no spot instances are available (working towards a PR). However, we were wondering if we can further improve this by requeuing the Dagster runs when no spot instances are available (useful for Dagster runs that are not urgent, for instance). Hence the following question: Is there a (relatively) simple way to re-enqueue Dagster runs that were rejected due to a lack of available spot instances, so we can handle this inside the dagster-ecs launch_run definition? We are willing to develop this part; however, some pointers in the right direction would help us a lot in speeding up the development process! Thank you in advance, Arnoud
j
How much can you lean into Dagster’s existing run retries here? https://docs.dagster.io/deployment/run-retries#run-retries
(unrelated but our slack archives messages older than 90 days to https://discuss.dagster.io/ - so if run retries aren’t sufficient, it might be worth reposting this latest question to a new thread. otherwise it might disappear in the next week or so)
a
Thanks for the heads up! Will try to find some time today to implement the retry in a separate environment and try to force the retries (this needs a large scale because of spot instance availability...). Maybe you can answer the following questions as well: Does the retry of the job put the job back in the queue? Is the retry triggered when launch_run results in a "failed to start" (outside job scope)? Will resubmit the thread in case my test does not work! Kind regards, Arnoud
I created a new thread just in case already!