https://dagster.io/ logo
#ask-community
Title
# ask-community
g

Grigoriy Sterin

04/19/2022, 3:38 PM
And a related question: I have several thousand queued runs that I know will hang in the same way. Is there a way to cancel/terminate them all without clicking through pages in Dagit and terminating them in batches of 25?
j

johann

04/19/2022, 3:47 PM
The best option currently is to use GraphQL to remove them programmatically.
It's on our radar to improve this.
g

Grigoriy Sterin

04/19/2022, 5:54 PM
Would you mind giving an example of how to achieve this with GraphQL? I'm not very familiar with it.
Never mind - I figured it out
l

Liezl Puzon

04/21/2022, 7:08 PM
@Grigoriy Sterin do you mind sharing a code snippet? I’m having the same issue
g

Grigoriy Sterin

04/22/2022, 1:45 AM
Sure. FWIW, I took most of the code in my snippet from this GitHub issue: https://github.com/dagster-io/dagster/issues/3441
Copy code
import asyncio
import os
import time

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

# Base URL of the Dagit instance (for example "http://localhost:3000").
# It can be overridden with the DAGIT_URL environment variable instead of
# editing this file. A trailing slash is stripped so that the
# f"{DAGIT_URL}/graphql" endpoint below never contains a double slash.
DAGIT_URL = os.environ.get("DAGIT_URL", "<YOUR DAGIT URL>").rstrip("/")

async def cancel_all_queued_runs():
    transport = AIOHTTPTransport(url=f"{DAGIT_URL}/graphql")
    client = Client(transport=transport, fetch_schema_from_transport=True)
    runs_query = gql(
        """
        query RunsQuery {
            runsOrError {
                __typename
                ... on Runs {
                    results {
                        runId
                        status
                    }
                }
            }
        }
        """
    )
    terminate_mutation = gql(
        """
        mutation terminateRun ($runId: String!) {
            terminatePipelineExecution (runId: $runId) {
                __typename
                ...on TerminatePipelineExecutionSuccess {
                    run {
                        runId
                    }
                }
                ...on TerminatePipelineExecutionFailure {
                    run {
                        runId
                    }
                    message
                }
                ...on PipelineRunNotFoundError {
                    runId
                    message
                }
                ...on PythonError {
                    message
                    stack
                }
            }
        }
        """
    )
    
    start = time.time()
    result = await client.execute_async(runs_query)
    runs = [run for run in result["runsOrError"]["results"] if run["status"] == "QUEUED"]
    count = len(runs)
    print(f"{count} runs remaining")
    while count > 0:
        for run in runs:
            run_id = run["runId"]
            mutation = await client.execute_async(terminate_mutation, variable_values={"runId": run_id})
            if (
                mutation["terminatePipelineExecution"]["__typename"]
                == "TerminatePipelineExecutionSuccess"
            ):
                if mutation["terminatePipelineExecution"]["run"]["runId"] != run_id:
                    print(
                        f"run_id didn't match on success: expected {run_id} "
                        f"but got {mutation['terminatePipelineExecution']['run']['runId']}"
                    )
            elif (
                mutation["terminatePipelineExecution"]["__typename"]
                == "TerminatePipelineExecutionFailure"
            ):
                print(
                    f"Failed to delete run {run_id}: {mutation['terminatePipelineExecution']['message']}"
                )
            elif mutation["terminatePipelineExecution"]["__typename"] == "PipelineRunNotFoundError":
                print(
                    f"Run not found for {run_id}: {mutation['terminatePipelineExecution']['message']}"
                )
            elif mutation["terminatePipelineExecution"]["__typename"] == "PythonError":
                formatted_stack = "\n".join(mutation["terminatePipelineExecution"]["stack"])
                print(
                    f"Python error while deleting run {run_id}: {mutation['terminatePipelineExecution']['message']}\n{formatted_stack}"
                )
        result = await client.execute_async(runs_query)
        runs = [run for run in result["runsOrError"]["results"] if run["status"] == "QUEUED"]
        count = len(runs)
        print(f"{count} runs remaining: {int(time.time() - start)} elapsed")

if __name__ == "__main__":
    # asyncio.run creates, runs and closes a fresh event loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated in recent
    # Python versions. The guard keeps importing this module free of network
    # side effects.
    asyncio.run(cancel_all_queued_runs())
❤️ 1