Arnoud van Dommelen
02/17/2023, 10:08 AM
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  config:
    run_task_kwargs:
      capacityProviderStrategy:
        - base: 1
          weight: 1
          capacityProvider: "FARGATE_SPOT"
As you can see in the code above, I included the capacity provider strategy in the run_task_kwargs. The only issue is that this does not yet work, because ECS cannot handle a request that contains both capacityProviderStrategy and launchType. Therefore, my suggestion would be to add the following logic:
if no capacityProviderStrategy is set, use launchType
else use capacityProviderStrategy (and omit launchType)
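A minimal sketch of that suggested precedence logic (hypothetical helper, not actual Dagster code):

```python
# Hypothetical sketch of the suggested precedence: if a
# capacityProviderStrategy is configured, drop launchType, since an ECS
# run_task request that specifies both is rejected.
def resolve_run_task_kwargs(task_kwargs):
    kwargs = dict(task_kwargs)
    if kwargs.get("capacityProviderStrategy"):
        # ECS infers placement from the capacity providers instead.
        kwargs.pop("launchType", None)
    return kwargs
```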
I assume Dagster wrote a wrapper around this function from AWS:
response = client.run_task(
    capacityProviderStrategy=[
        {
            'capacityProvider': 'string',
            'weight': 123,
            'base': 123
        },
    ],
    cluster='string',
    count=123,
    enableECSManagedTags=True|False,
    enableExecuteCommand=True|False,
    group='string',
    launchType='EC2'|'FARGATE'|'EXTERNAL',
    networkConfiguration={
        'awsvpcConfiguration': {
            'subnets': [
                'string',
            ],
            'securityGroups': [
                'string',
            ],
            'assignPublicIp': 'ENABLED'|'DISABLED'
        }
    },
    overrides={
        'containerOverrides': [
            {
                'name': 'string',
                'command': [
                    'string',
                ],
                'environment': [
                    {
                        'name': 'string',
                        'value': 'string'
                    },
                ],
                'environmentFiles': [
                    {
                        'value': 'string',
                        'type': 's3'
                    },
                ],
                'cpu': 123,
                'memory': 123,
                'memoryReservation': 123,
                'resourceRequirements': [
                    {
                        'value': 'string',
                        'type': 'GPU'|'InferenceAccelerator'
                    },
                ]
            },
        ],
        'cpu': 'string',
        'inferenceAcceleratorOverrides': [
            {
                'deviceName': 'string',
                'deviceType': 'string'
            },
        ],
        'executionRoleArn': 'string',
        'memory': 'string',
        'taskRoleArn': 'string',
        'ephemeralStorage': {
            'sizeInGiB': 123
        }
    },
    placementConstraints=[
        {
            'type': 'distinctInstance'|'memberOf',
            'expression': 'string'
        },
    ],
    placementStrategy=[
        {
            'type': 'random'|'spread'|'binpack',
            'field': 'string'
        },
    ],
    platformVersion='string',
    propagateTags='TASK_DEFINITION'|'SERVICE'|'NONE',
    referenceId='string',
    startedBy='string',
    tags=[
        {
            'key': 'string',
            'value': 'string'
        },
    ],
    taskDefinition='string'
)
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
jordan
02/17/2023, 3:00 PM
Arnoud van Dommelen
02/20/2023, 8:25 AM
Joris
02/20/2023, 9:14 AM
Arnoud van Dommelen
02/20/2023, 11:54 AM
Would it be possible to add capacityProviderStrategy to the integrated Dagster tags on Job level?
For instance, in combination with FARGATE you could alternate between SPOT and ON-DEMAND instances. Jobs that have no urgency to be processed promptly could then get the tag {"ecs/capacityProvider": "FARGATE_SPOT"}, whereas jobs that are urgent could get the tag {"ecs/capacityProvider": "FARGATE"}. This follows the same approach as {"ecs/cpu": "4096"} and {"ecs/memory": "8192"}!
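A sketch of how such a proposed ecs/capacityProvider tag might be translated into run_task keyword arguments, mirroring the existing ecs/cpu and ecs/memory tags (illustrative only; the tag and helper name are hypothetical, not an existing Dagster feature):

```python
# Illustrative only: map a hypothetical "ecs/capacityProvider" job tag
# onto ECS run_task kwargs. Falls back to a plain Fargate launch type
# when the tag is absent, since launchType and capacityProviderStrategy
# are mutually exclusive in a run_task request.
def run_task_kwargs_from_tags(tags):
    kwargs = {}
    provider = tags.get("ecs/capacityProvider")
    if provider:
        kwargs["capacityProviderStrategy"] = [
            {"capacityProvider": provider, "weight": 1, "base": 1}
        ]
    else:
        kwargs["launchType"] = "FARGATE"
    return kwargs
```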
Very curious about your thoughts on this!
Thank you in advance,
Arnoud
daniel
02/21/2023, 1:34 PM
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  config:
    use_current_ecs_task_config: false
    run_task_kwargs:
      capacityProviderStrategy:
        - base: 1
          weight: 1
          capacityProvider: "FARGATE_SPOT"
I think the reason it's setting FARGATE is because it's inferring it from the ECS task that's launching the run; setting use_current_ecs_task_config to false would make it not do that.
You may then need to add the cluster and/or networkConfiguration keys to run_task_kwargs as well.
Arnoud van Dommelen
02/21/2023, 3:16 PM
What would be the impact of setting use_current_ecs_task_config to false? We use 5 different shopfloors, each with a different gRPC server containing a unique activated conda environment (this cannot change!).
The error is that both capacityProvider and launchType are defined, and this won't be solved by setting use_current_ecs_task_config to false, I assume (as it still takes a base configuration from somewhere that contains launchType)?
daniel
02/21/2023, 3:22 PM
Arnoud van Dommelen
02/21/2023, 3:31 PM
daniel
02/21/2023, 3:31 PM
Arnoud van Dommelen
02/21/2023, 3:38 PM
capacityProvider?
daniel
02/21/2023, 3:39 PM
Arnoud van Dommelen
02/21/2023, 3:39 PM
daniel
02/22/2023, 11:53 AM
Arnoud van Dommelen
03/09/2023, 9:35 AM
jordan
03/09/2023, 3:58 PM
Arnoud van Dommelen
03/09/2023, 4:42 PM
daniel
04/03/2023, 10:14 PM
There is now an ecs/task_overrides tag that can be used to override the ephemeral storage per job - you can do something like:
@job(tags={"ecs/task_overrides": {"ephemeralStorage": {"sizeInGiB": 128}}})
def my_job():
    ...
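One way such a per-job override tag could be combined with a base overrides dict is a recursive merge. A sketch (helper name is hypothetical, not Dagster's actual implementation):

```python
# Hypothetical deep merge of a per-job "ecs/task_overrides" tag value
# into a base run_task overrides dict: nested dicts merge key by key,
# while scalar values from the tag win over the base.
def merge_task_overrides(base, tag_overrides):
    merged = dict(base)
    for key, value in tag_overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_task_overrides(merged[key], value)
        else:
            merged[key] = value
    return merged
```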
Arnoud van Dommelen
04/04/2023, 2:10 PM
jordan
05/15/2023, 8:02 PM
Arnoud van Dommelen
05/16/2023, 9:04 AM