Grigoriy Sterin
04/19/2022, 3:38 PMjohann
04/19/2022, 3:47 PMGrigoriy Sterin
04/19/2022, 5:54 PMLiezl Puzon
04/21/2022, 7:08 PMGrigoriy Sterin
04/22/2022, 1:45 AMfrom gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import asyncio
import time
DAGIT_URL = "<YOUR DAGIT URL>"
async def cancel_all_queued_runs():
transport = AIOHTTPTransport(url=f"{DAGIT_URL}/graphql")
client = Client(transport=transport, fetch_schema_from_transport=True)
runs_query = gql(
"""
query RunsQuery {
runsOrError {
__typename
... on Runs {
results {
runId
status
}
}
}
}
"""
)
terminate_mutation = gql(
"""
mutation terminateRun ($runId: String!) {
terminatePipelineExecution (runId: $runId) {
__typename
...on TerminatePipelineExecutionSuccess {
run {
runId
}
}
...on TerminatePipelineExecutionFailure {
run {
runId
}
message
}
...on PipelineRunNotFoundError {
runId
message
}
...on PythonError {
message
stack
}
}
}
"""
)
start = time.time()
result = await client.execute_async(runs_query)
runs = [run for run in result["runsOrError"]["results"] if run["status"] == "QUEUED"]
count = len(runs)
print(f"{count} runs remaining")
while count > 0:
for run in runs:
run_id = run["runId"]
mutation = await client.execute_async(terminate_mutation, variable_values={"runId": run_id})
if (
mutation["terminatePipelineExecution"]["__typename"]
== "TerminatePipelineExecutionSuccess"
):
if mutation["terminatePipelineExecution"]["run"]["runId"] != run_id:
print(
f"run_id didn't match on success: expected {run_id} "
f"but got {mutation['terminatePipelineExecution']['run']['runId']}"
)
elif (
mutation["terminatePipelineExecution"]["__typename"]
== "TerminatePipelineExecutionFailure"
):
print(
f"Failed to delete run {run_id}: {mutation['terminatePipelineExecution']['message']}"
)
elif mutation["terminatePipelineExecution"]["__typename"] == "PipelineRunNotFoundError":
print(
f"Run not found for {run_id}: {mutation['terminatePipelineExecution']['message']}"
)
elif mutation["terminatePipelineExecution"]["__typename"] == "PythonError":
formatted_stack = "\n".join(mutation["terminatePipelineExecution"]["stack"])
print(
f"Python error while deleting run {run_id}: {mutation['terminatePipelineExecution']['message']}\n{formatted_stack}"
)
result = await client.execute_async(runs_query)
runs = [run for run in result["runsOrError"]["results"] if run["status"] == "QUEUED"]
count = len(runs)
print(f"{count} runs remaining: {int(time.time() - start)} elapsed")
loop = asyncio.get_event_loop()
loop.run_until_complete(cancel_all_queued_runs())