# deployment-ecs
a
REPOST OF OLD THREAD: Hi @jordan, We have created a procedure so that Dagster automatically falls back to on-demand instances when no spot instances are available (working towards a PR). However, we were wondering if we can further improve this by requeuing the Dagster runs when no spot instances are available (useful for Dagster runs that are not urgent, for instance). Hence the following question: Is there a (relatively) simple way to re-enqueue Dagster runs that were rejected due to a lack of available spot instances, so we can handle this inside the dagster-ecs launch_run definition? We are willing to develop this part ourselves, but some pointers in the right direction would help us a lot and speed up the development process! Thank you in advance, Arnoud
Latest outstanding questions: Does the retry of the job put the job back in the queue? Is the retry triggered when the launch_run results in a "failed to start" (outside job scope)?
d
run retries put the run back in the queue, and are triggered if the run fails for any reason (including failing to start)
❤️ 1
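(For reference, a minimal sketch of what opting into those built-in run retries could look like on the job side — assuming run retries are enabled in the instance's dagster.yaml; the tag values here are illustrative:)

from dagster import job, op


@op
def deliver():
    ...


@job(
    tags={
        # Re-enqueue the run up to 3 times if it fails for any reason,
        # including failing to start (e.g. no spot capacity)
        "dagster/max_retries": "3",
        # Retry the whole run rather than only the failed steps
        "dagster/retry_strategy": "ALL_STEPS",
    }
)
def spot_tolerant_job():
    deliver()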
are they rejected right away synchronously when the run_task call is made? Or does it happen later? if you have a stack trace from a failed run that would help
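(A note on the synchronicity question: with capacity providers, run_task typically reports a capacity shortage synchronously in the response's failures list rather than raising or failing later — a rough boto3 sketch; the cluster and task definition names are placeholders, and the reason string is taken from the launcher snippet shared further down:)

import boto3

ecs = boto3.client("ecs")

# Placeholder cluster/task definition names
response = ecs.run_task(
    cluster="my-cluster",
    taskDefinition="my-task-def",
    capacityProviderStrategy=[{"capacityProvider": "FARGATE_SPOT"}],
)

# run_task returns immediately; a capacity shortage shows up as an entry in
# "failures" with an empty "tasks" list, not as a boto3 exception
if not response["tasks"]:
    for failure in response["failures"]:
        reason = failure.get("reason", "")
        if "Capacity is unavailable" in reason:
            print("No FARGATE_SPOT capacity right now:", reason)
        else:
            print("run_task failure:", failure)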
a
Hi @daniel, thanks for the response! This would already resolve the issue we are facing. I could not get the test out in time yesterday due to some other things that came up unfortunately. Will try to get it done today and will get back to you whether it works 🙂 P.s. I really appreciate the effort you guys put in the community in order to help everyone out and support further development of the Dagster platform! Keep being awesome 🌈 Cheers, Arnoud
Hi @daniel, can you link me to the main source code where the run retries are defined? I need to slightly modify this in order to avoid having to use persistent storage for the event logs. Also, I need to tell Dagster to retry the job only in specific cases and not in general (it could happen that jobs fail in ways where a retry is not desired!)
What you're describing doesn't sound like a trivial change to me (at least as something that we could upstream)
a
Hi @daniel, How can I initialize the workspace_context, as I need it as input for the retry_run definition (workspace_context: IWorkspaceProcessContext inside the definition)? I was not able to find a proper example of how to do so, unfortunately... Once this is done, the code should run! Thank you in advance!
d
I don’t think we have a great user facing way to do that - it’s not part of the public API
Maybe if you sent out what you have so far we could take a look - but that function is intended to be called by the daemon at a specific time as part of the system operation, not by users
a
    def retry_run(
        self,
        failed_run: DagsterRun,
        workspace_context: IWorkspaceProcessContext,
    ) -> None:
        """Submit a retry as a re-execute from failure."""
        instance = workspace_context.instance
        workspace = workspace_context.create_request_context()
        if not failed_run.external_pipeline_origin:
            instance.report_engine_event(
                "Run does not have an external pipeline origin, unable to retry the run.",
                failed_run,
            )
            return

        origin = failed_run.external_pipeline_origin.external_repository_origin
        code_location = workspace.get_code_location(origin.code_location_origin.location_name)
        repo_name = origin.repository_name

        if not code_location.has_repository(repo_name):
            instance.report_engine_event(
                (
                    f"Could not find repository {repo_name} in location {code_location.name}, unable to"
                    " retry the run. It was likely renamed or deleted."
                ),
                failed_run,
            )
            return

        external_repo = code_location.get_repository(repo_name)

        if not external_repo.has_external_job(failed_run.pipeline_name):
            instance.report_engine_event(
                (
                    f"Could not find job {failed_run.pipeline_name} in repository {repo_name}, unable"
                    " to retry the run. It was likely renamed or deleted."
                ),
                failed_run,
            )
            return

        external_pipeline = code_location.get_external_pipeline(
            PipelineSelector(
                location_name=origin.code_location_origin.location_name,
                repository_name=repo_name,
                pipeline_name=failed_run.pipeline_name,
                solid_selection=failed_run.solid_selection,
                asset_selection=None
                if failed_run.asset_selection is None
                else list(failed_run.asset_selection),
            )
        )

        strategy = ReexecutionStrategy.ALL_STEPS

        new_run = instance.create_reexecuted_run(
            parent_run=failed_run,
            code_location=code_location,
            external_pipeline=external_pipeline,
            strategy=strategy,
            use_parent_run_tags=True,
        )

        instance.report_engine_event(
            "Retrying the run",
            failed_run,
            engine_event_data=EngineEventData({"new run": MetadataValue.dagster_run(new_run.run_id)}),
        )
        instance.report_engine_event(
            "Launched as an automatic retry",
            new_run,
            engine_event_data=EngineEventData(
                {"failed run": MetadataValue.dagster_run(failed_run.run_id)}
            ),
        )

        instance.submit_run(new_run.run_id, workspace)

    def launch_run(self, context: LaunchRunContext) -> None:
        """Launch a run in an ECS task."""
        run = context.dagster_run
        container_context = EcsContainerContext.create_for_run(run, self)

        pipeline_origin = check.not_none(context.pipeline_code_origin)
        image = pipeline_origin.repository_origin.container_image

        # ECS limits overrides to 8192 characters including json formatting
        # <https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RunTask.html>
        # When container_context is serialized as part of the ExecuteRunArgs, we risk
        # going over this limit (for example, if many secrets have been set). This strips
        # the container context off of our pipeline origin because we don't actually need
        # it to launch the run; we only needed it to create the task definition.
        repository_origin = pipeline_origin.repository_origin

        stripped_repository_origin = repository_origin._replace(container_context={})
        stripped_pipeline_origin = pipeline_origin._replace(
            repository_origin=stripped_repository_origin
        )

        args = ExecuteRunArgs(
            pipeline_origin=stripped_pipeline_origin,
            pipeline_run_id=run.run_id,
            instance_ref=self._instance.get_ref(),
        )
        command = args.get_command_args()

        run_task_kwargs = self._run_task_kwargs(run, image, container_context)

        # Set cpu or memory overrides
        # <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html>
        cpu_and_memory_overrides = self.get_cpu_and_memory_overrides(container_context, run)

        task_overrides = self._get_task_overrides(container_context, run)

        container_overrides: List[Dict[str, Any]] = [
            {
                "name": self._get_container_name(container_context),
                "command": command,
                # containerOverrides expects cpu/memory as integers
                **{k: int(v) for k, v in cpu_and_memory_overrides.items()},
            }
        ]

        run_task_kwargs["overrides"] = {
            "containerOverrides": container_overrides,
            # taskOverrides expects cpu/memory as strings
            **cpu_and_memory_overrides,
            **task_overrides,
        }
        run_task_kwargs["tags"] = [
            *run_task_kwargs.get("tags", []),
            *self.build_ecs_tags_for_run_task(run),
        ]

        run_task_kwargs_from_run = self._get_run_task_kwargs_from_run(run)
        run_task_kwargs.update(run_task_kwargs_from_run)

        # launchType and capacityProviderStrategy are incompatible - prefer the latter if it is set
        if "launchType" in run_task_kwargs and run_task_kwargs.get("capacityProviderStrategy"):
            del run_task_kwargs["launchType"]

        # FARGATE_SPOT safety feature 
        # -> Set capacityProviderStrategy to FARGATE if no spot instances available and spot_only is not True
        # -> Requeue job if no spot instances available and spot_only is True
        if run_task_kwargs.get("capacityProviderStrategy") == [{'capacityProvider': 'FARGATE_SPOT'}]:
            try:
                response = self.ecs.run_task(**run_task_kwargs)
                tasks = response["tasks"]
                if not tasks:
                    failures = response["failures"]
                    exceptions = []
                    for failure in failures:
                        reason = failure.get("reason")
                        exceptions.append(f"Failure reason: {reason}")
                        if reason == "Capacity is unavailable at this time. Please try again later or in a different availability zone":
                            raise SpotInstanceException()
                    # Raise regular error if error not due to spot integration
                    raise Exception(exceptions)
            except SpotInstanceException:
                spot_only = run.tags.get(
                    "ecs/spot_only", container_context.run_resources.get("spot_only")
                )
                # Run tag values are strings, while run_resources may hold a bool
                if spot_only in (True, "true", "True"):
                    # Re-enqueue the job instead of falling back to on-demand
                    # TODO: verify this works! workspace_context is not yet
                    # available inside launch_run
                    self.retry_run(run, workspace_context)
                    # No ECS task was started, so skip the tagging/reporting below
                    return

                else:
                    # Set capacityProviderStrategy to FARGATE
                    run_task_kwargs["capacityProviderStrategy"] = [{'capacityProvider': 'FARGATE'}]
                    response = self.ecs.run_task(**run_task_kwargs)
                    tasks = response["tasks"]

                    if not tasks:
                        failures = response["failures"]
                        exceptions = []
                        for failure in failures:
                            arn = failure.get("arn")
                            reason = failure.get("reason")
                            detail = failure.get("detail")
                            exceptions.append(Exception(f"Task {arn} failed because {reason}: {detail}"))
                        raise Exception(exceptions)                    
        else:
            # if capacityProviderStrategy is not FARGATE_SPOT, business as usual
            response = self.ecs.run_task(**run_task_kwargs)
            tasks = response["tasks"]

            if not tasks:
                failures = response["failures"]
                exceptions = []
                for failure in failures:
                    arn = failure.get("arn")
                    reason = failure.get("reason")
                    detail = failure.get("detail")
                    exceptions.append(Exception(f"Task {arn} failed because {reason}: {detail}"))
                raise Exception(exceptions)

        arn = tasks[0]["taskArn"]
        cluster_arn = tasks[0]["clusterArn"]
        self._set_run_tags(run.run_id, cluster=cluster_arn, task_arn=arn)
        self.report_launch_events(run, arn, cluster_arn)
I have not made many adjustments to the retry_run definition; the only change there is the retry strategy (the rest should be the same as the retry_run definition in the daemon). The changes to the launch_run definition inside dagster_aws/ecs/launcher.py are relatively straightforward, with 3 situations defined:
1. No spot instances (near-real-time deliveries)
2. Spot instance as capacity provider, but spot is not required (deliveries in the near future)
3. Spot instance as capacity provider and spot instances required (deliveries in the future, no time pressure at all)
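(Illustrative only: "ecs/spot_only" is the custom tag introduced in the launcher snippet above, not a built-in dagster-aws tag. A job with no time pressure — situation 3 — might opt in like this:)

from dagster import job, op


@op
def export_report():
    ...


# "ecs/spot_only" is the custom tag read by the modified launcher above;
# a launch attempt that finds no spot capacity would be re-enqueued
# instead of falling back to on-demand
@job(tags={"ecs/spot_only": "true"})
def low_priority_export():
    export_report()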
Also added the following class for exception handling (not super relevant here):
class SpotInstanceException(Exception):
    def __init__(self, message="No spot instance available"):
        super().__init__(message)
d
Would it be possible to send out your changes as a PR?
I don’t think you will have access to a workspace process context so you may need to fork that method
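(One possible way to fork it without a workspace context — purely a sketch, untested: instead of calling retry_run, tag the run for the built-in run retries mentioned earlier and let launch_run raise, so the run fails to start and the run-retries daemon re-enqueues it. DagsterInstance.add_run_tags and the dagster/max_retries tag are existing APIs; the helper below is otherwise an assumption:)

from dagster import DagsterInstance

MAX_RETRIES_TAG = "dagster/max_retries"  # read by the run-retries daemon


def mark_run_for_builtin_retry(instance: DagsterInstance, run_id: str, max_retries: int = 1) -> None:
    """Tag the run so that, when launch_run raises on a spot capacity failure,
    the built-in run-retries daemon re-enqueues it (no workspace context needed)."""
    instance.add_run_tags(run_id, {MAX_RETRIES_TAG: str(max_retries)})

In the SpotInstanceException branch, this could replace the self.retry_run call: mark_run_for_builtin_retry(self._instance, run.run_id) followed by re-raising, rather than constructing a workspace context.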
a
I can; however, there is no documentation and there are no unit tests written for it whatsoever!
Also, did not do a lot of code optimizations yet 🙂