Arnoud van Dommelen
05/16/2023, 1:01 PMArnoud van Dommelen
05/16/2023, 1:02 PMdaniel
05/16/2023, 8:58 PMdaniel
05/16/2023, 8:59 PMArnoud van Dommelen
05/17/2023, 7:34 AMArnoud van Dommelen
05/17/2023, 11:59 AMdaniel
05/18/2023, 3:06 PMdaniel
05/18/2023, 3:07 PMArnoud van Dommelen
05/22/2023, 10:42 AMworkspace_context
, as I need it as input for the retry_run definition (workspace_context: IWorkspaceProcessContext
inside the definition)? Unfortunately, I was not able to find a proper example of how to do this... Once this is done, the code should run!
Thank you in advance!daniel
05/22/2023, 10:58 AMdaniel
05/22/2023, 11:06 AMArnoud van Dommelen
05/22/2023, 11:37 AMdef retry_run(
failed_run: DagsterRun,
workspace_context: IWorkspaceProcessContext,
) -> None:
"""Submit a retry as a re-execute from failure."""
instance = workspace_context.instance
workspace = workspace_context.create_request_context()
if not failed_run.external_pipeline_origin:
instance.report_engine_event(
"Run does not have an external pipeline origin, unable to retry the run.",
failed_run,
)
return
origin = failed_run.external_pipeline_origin.external_repository_origin
code_location = workspace.get_code_location(origin.code_location_origin.location_name)
repo_name = origin.repository_name
if not code_location.has_repository(repo_name):
instance.report_engine_event(
(
f"Could not find repository {repo_name} in location {code_location.name}, unable to"
" retry the run. It was likely renamed or deleted."
),
failed_run,
)
return
external_repo = code_location.get_repository(repo_name)
if not external_repo.has_external_job(failed_run.pipeline_name):
instance.report_engine_event(
(
f"Could not find job {failed_run.pipeline_name} in repository {repo_name}, unable"
" to retry the run. It was likely renamed or deleted."
),
failed_run,
)
return
external_pipeline = code_location.get_external_pipeline(
PipelineSelector(
location_name=origin.code_location_origin.location_name,
repository_name=repo_name,
pipeline_name=failed_run.pipeline_name,
solid_selection=failed_run.solid_selection,
asset_selection=None
if failed_run.asset_selection is None
else list(failed_run.asset_selection),
)
)
strategy = ReexecutionStrategy.ALL_STEPS
new_run = instance.create_reexecuted_run(
parent_run=failed_run,
code_location=code_location,
external_pipeline=external_pipeline,
strategy=strategy,
use_parent_run_tags=True,
)
instance.report_engine_event(
"Retrying the run",
failed_run,
engine_event_data=EngineEventData({"new run": MetadataValue.dagster_run(new_run.run_id)}),
)
instance.report_engine_event(
"Launched as an automatic retry",
new_run,
engine_event_data=EngineEventData(
{"failed run": MetadataValue.dagster_run(failed_run.run_id)}
),
)
instance.submit_run(new_run.run_id, workspace)
def launch_run(self, context: LaunchRunContext) -> None:
    """Launch a run in an ECS task.

    When the task's capacityProviderStrategy is FARGATE_SPOT, handle spot
    capacity shortages: fall back to on-demand FARGATE, or — when the run is
    marked spot-only via tag or run_resources — re-enqueue the run as a retry
    instead of launching it on-demand.
    """

    def _tasks_or_raise(response: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Shared RunTask failure handling: return the started tasks, or raise
        # one Exception aggregating every failure ECS reported.
        tasks = response["tasks"]
        if not tasks:
            exceptions = [
                Exception(
                    f"Task {failure.get('arn')} failed because"
                    f" {failure.get('reason')}: {failure.get('detail')}"
                )
                for failure in response["failures"]
            ]
            raise Exception(exceptions)
        return tasks

    run = context.dagster_run
    container_context = EcsContainerContext.create_for_run(run, self)
    pipeline_origin = check.not_none(context.pipeline_code_origin)
    image = pipeline_origin.repository_origin.container_image

    # ECS limits overrides to 8192 characters including json formatting
    # <https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RunTask.html>
    # When container_context is serialized as part of the ExecuteRunArgs, we risk
    # going over this limit (for example, if many secrets have been set). This strips
    # the container context off of our pipeline origin because we don't actually need
    # it to launch the run; we only needed it to create the task definition.
    repository_origin = pipeline_origin.repository_origin
    stripped_repository_origin = repository_origin._replace(container_context={})
    stripped_pipeline_origin = pipeline_origin._replace(
        repository_origin=stripped_repository_origin
    )
    args = ExecuteRunArgs(
        pipeline_origin=stripped_pipeline_origin,
        pipeline_run_id=run.run_id,
        instance_ref=self._instance.get_ref(),
    )
    command = args.get_command_args()

    run_task_kwargs = self._run_task_kwargs(run, image, container_context)

    # Set cpu or memory overrides
    # <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html>
    cpu_and_memory_overrides = self.get_cpu_and_memory_overrides(container_context, run)
    task_overrides = self._get_task_overrides(container_context, run)
    container_overrides: List[Dict[str, Any]] = [
        {
            "name": self._get_container_name(container_context),
            "command": command,
            # containerOverrides expects cpu/memory as integers
            **{k: int(v) for k, v in cpu_and_memory_overrides.items()},
        }
    ]
    run_task_kwargs["overrides"] = {
        "containerOverrides": container_overrides,
        # taskOverrides expects cpu/memory as strings
        **cpu_and_memory_overrides,
        **task_overrides,
    }
    run_task_kwargs["tags"] = [
        *run_task_kwargs.get("tags", []),
        *self.build_ecs_tags_for_run_task(run),
    ]
    run_task_kwargs.update(self._get_run_task_kwargs_from_run(run))

    # launchType and capacityProviderStrategy are incompatible - prefer the latter if it is set
    if "launchType" in run_task_kwargs and run_task_kwargs.get("capacityProviderStrategy"):
        del run_task_kwargs["launchType"]

    # FARGATE_SPOT safety feature
    # -> Set capacityProviderStrategy to FARGATE if no spot instances available and spot_only is not set
    # -> Requeue job if no spot instances available and spot_only is set
    if run_task_kwargs.get("capacityProviderStrategy") == [
        {"capacityProvider": "FARGATE_SPOT"}
    ]:
        try:
            response = self.ecs.run_task(**run_task_kwargs)
            tasks = response["tasks"]
            if not tasks:
                exceptions = []
                for failure in response["failures"]:
                    reason = failure.get("reason")
                    exceptions.append(f"Failure reason: {reason}")
                    if (
                        reason
                        == "Capacity is unavailable at this time. Please try again later or in a different availability zone"
                    ):
                        raise SpotInstanceException()
                # Raise regular error if error not due to spot integration
                raise Exception(exceptions)
        except SpotInstanceException:
            spot_only = run.tags.get(
                "ecs/spot_only", container_context.run_resources.get("spot_only")
            )
            # Run tag values are strings, so accept "true"/"True" as well;
            # the previous `spot_only == True` check never matched tag values.
            if spot_only in (True, "true", "True"):
                # Re-enqueue the run as a retry rather than launching on-demand.
                # NOTE(review): `workspace_context` is not defined in this scope —
                # the original code passed `{workspace_context}` (a set literal
                # around an undefined name). Confirm how to obtain an
                # IWorkspaceProcessContext here before deploying.
                self.retry_run(run, workspace_context)
                # Nothing was launched, so skip the tagging/reporting below.
                return
            # No spot capacity and spot is not required: fall back to on-demand.
            run_task_kwargs["capacityProviderStrategy"] = [
                {"capacityProvider": "FARGATE"}
            ]
            tasks = _tasks_or_raise(self.ecs.run_task(**run_task_kwargs))
    else:
        # if capacityProviderStrategy is not FARGATE_SPOT, business as usual
        tasks = _tasks_or_raise(self.ecs.run_task(**run_task_kwargs))

    arn = tasks[0]["taskArn"]
    cluster_arn = tasks[0]["clusterArn"]
    self._set_run_tags(run.run_id, cluster=cluster_arn, task_arn=arn)
    self.report_launch_events(run, arn, cluster_arn)
Arnoud van Dommelen
05/22/2023, 11:40 AMArnoud van Dommelen
05/22/2023, 11:45 AMclass SpotInstanceException(Exception):
def __init__(self, message='No spot instance available'):
super(SpotInstanceException, self).__init__(message)
daniel
05/22/2023, 11:46 AMdaniel
05/22/2023, 11:48 AMArnoud van Dommelen
05/22/2023, 11:50 AMArnoud van Dommelen
05/22/2023, 11:51 AMArnoud van Dommelen
05/22/2023, 12:49 PM