Heiner Hippke
08/31/2023, 2:15 PM
Detected run worker status UNKNOWN: 'dagster_k8s.client.DagsterK8sUnrecoverableAPIError: Unexpected error encountered in Kubernetes API Client.
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/launcher.py", line 410, in check_run_worker_health
status = self._api_client.get_job_status(
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 427, in get_job_status
return k8s_api_retry(_get_job_status, max_retries=3, timeout=wait_time_between_attempts)
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 128, in k8s_api_retry
raise DagsterK8sUnrecoverableAPIError(
The above exception was caused by the following exception:
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'f39347e9-d683-4509-a87a-6077207cf6a8', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'a214b1d3-7600-4e95-8010-e2533075c735', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'd0a2a6a0-214f-46d0-9b4c-226458af0cf2', 'Date': 'Thu, 31 Aug 2023 11:14:08 GMT', 'Content-Length': '286'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"dagster-run-020ca228-b897-4f32-ad84-1f4e37e141f7\" not found","reason":"NotFound","details":{"name":"dagster-run-020ca228-b897-4f32-ad84-1f4e37e141f7","group":"batch","kind":"jobs"},"code":404}
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 109, in k8s_api_retry
return fn()
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 424, in _get_job_status
job = self.batch_api.read_namespaced_job_status(job_name, namespace=namespace)
File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api/batch_v1_api.py", line 2815, in read_namespaced_job_status
return self.read_namespaced_job_status_with_http_info(name, namespace, **kwargs) # noqa: E501
File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api/batch_v1_api.py", line 2902, in read_namespaced_job_status_with_http_info
return self.api_client.call_api(
File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 348, in call_api
return self.__call_api(resource_path, method,
File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
response_data = self.request(
File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 373, in request
return self.rest_client.GET(url,
File "/usr/local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 241, in GET
return self.request("GET", url,
File "/usr/local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 235, in request
raise ApiException(http_resp=r)
'. Marking run 020ca228-b897-4f32-ad84-1f4e37e141f7 as failed, because it has surpassed the configured maximum attempts to resume the run: 0.
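The 404 body shows what the health check was looking for: a Kubernetes Job named `dagster-run-<run_id>`. One way to confirm that no such Job was ever created for a run is to query for it directly. A minimal sketch using the official `kubernetes` Python client; the helper names are hypothetical, and the naming pattern is taken only from the 404 message above.

```python
from typing import Optional


def run_job_name(run_id: str) -> str:
    # Naming pattern copied from the 404 body above:
    # jobs.batch "dagster-run-<run_id>" not found
    return f"dagster-run-{run_id}"


def k8s_job_exists(run_id: str, namespace: str, kubeconfig: Optional[str] = None) -> bool:
    """Return True if a Job with the run launcher's naming pattern exists."""
    # Imported lazily so run_job_name() works without the client installed.
    from kubernetes import client, config
    from kubernetes.client.exceptions import ApiException

    config.load_kube_config(config_file=kubeconfig)
    batch_api = client.BatchV1Api()
    try:
        batch_api.read_namespaced_job_status(run_job_name(run_id), namespace=namespace)
        return True
    except ApiException as exc:
        if exc.status == 404:
            # Same "NotFound" answer the run monitor received.
            return False
        raise
```

For example, `k8s_job_exists("020ca228-b897-4f32-ad84-1f4e37e141f7", namespace="<your namespace>")` should return `False` for the run in the log.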
I am wondering why the K8sRunLauncher thinks it must check the status of the job started by execute_job. As I understand it, execute_job is not even launched by the RunLauncher (https://docs.dagster.io/deployment/run-launcher). I don't want to use execute_in_process, since I want to use the multiprocess_executor for execution (https://docs.dagster.io/_apidocs/execution#dagster.execute_job).
In my local development environment this works like a charm. In the k8s deployment, however, it seems to me that the RunLauncher assumes there must be a Kubernetes Job (and pod) for this run whose status it has to check. But no such Job is ever created, since the run is executed by the multiprocess_executor.
Any help would be appreciated!
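The last line of the log explains the outcome. The run monitoring daemon appears to visit every run stored in the instance with status STARTED, including one created in-process inside an op, and asks the configured run launcher for the worker's health. The K8sRunLauncher answers by looking up the run's Kubernetes Job; none exists, so the status is UNKNOWN, and with a maximum of 0 resume attempts the run is marked as failed. The following is a simplified, hypothetical paraphrase of that decision in plain Python, meant only to make the control flow explicit; it is not Dagster's source code.

```python
from enum import Enum


class WorkerStatus(Enum):
    # Names chosen to match Dagster's worker statuses; this enum is a stand-in.
    RUNNING = "RUNNING"
    NOT_FOUND = "NOT_FOUND"
    FAILED = "FAILED"
    SUCCESS = "SUCCESS"
    UNKNOWN = "UNKNOWN"


def monitor_started_run(
    worker_status: WorkerStatus,
    resume_attempts_so_far: int,
    max_resume_run_attempts: int,
    supports_resume: bool = True,
) -> str:
    """Simplified model of what the monitor does with one STARTED run."""
    if worker_status in (WorkerStatus.RUNNING, WorkerStatus.SUCCESS):
        return "leave running"
    if supports_resume and resume_attempts_so_far < max_resume_run_attempts:
        return "resume with a new worker"
    # The branch hit in the log: UNKNOWN status and a maximum of 0 attempts.
    return "mark run as failed"
```

With the values from the log, `monitor_started_run(WorkerStatus.UNKNOWN, 0, 0)` returns `"mark run as failed"`.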
johann
09/01/2023, 2:16 PM
johann
09/01/2023, 2:18 PM
check_run_worker_health
to ignore runs with some custom tag that you specify for these special runs.
Here's a guide to the latter: https://github.com/dagster-io/dagster/discussions/15856
Heiner Hippke
09/04/2023, 5:16 AM
```python
from dagster import DynamicOut, DynamicOutput, job, materialize, op
from dagster_graphql import DagsterGraphQLClient

# local_mode, get_full_load_partition_ranges, get_fico_full_load_start_date,
# SCD2_REMAINDER_PARTITION_KEY, EXASOL_TAG and the ASSET_PARTITION_RANGE_*_TAG
# constants are defined elsewhere in the project and are not shown here.


def _get_full_load_job_from_exasol_source(
    materialize_assets,
    asset_selection,
    trg_table_name,
    max_partition_chunk_size,
    partition_start_date,
    scd2,
    full_load_mode,
):
    if local_mode:
        job_config = {}
        job_tags = {}
    else:
        # Run the job's ops in parallel subprocesses inside the run worker pod.
        job_config = {
            "execution": {
                "config": {
                    "multiprocess": {
                        "max_concurrent": 5
                    }
                }
            }
        }
        # Resource requests/limits for the run worker pod created by the K8sRunLauncher.
        job_tags = {
            "dagster-k8s/config": {
                "container_config": {
                    "resources": {
                        "requests": {"cpu": "1", "memory": "2Gi"},
                        "limits": {"cpu": "2", "memory": "4Gi"},
                    },
                },
            },
        }

    @op(name=f"{trg_table_name}_get_full_load_chunks", out=DynamicOut())
    def get_chunks():
        pr = get_full_load_partition_ranges(
            start_date=partition_start_date
            if full_load_mode.lower() != "coin"
            else get_fico_full_load_start_date(),
            max_partition_chunk_size=max_partition_chunk_size,
        )
        if scd2:
            # add a single partition part for the time before partition_start_date
            pr = [(SCD2_REMAINDER_PARTITION_KEY, SCD2_REMAINDER_PARTITION_KEY)] + pr
        # One dynamic output per (start, end) partition range.
        return [
            DynamicOutput((str(part[0]), str(part[1])), mapping_key=str(i))
            for i, part in enumerate(pr)
        ]

    @op(
        name=f"{trg_table_name}_materialize_asset_full_load",
        required_resource_keys={
            "exasol_client",
            "bq_client",
        },
    )
    def materialize_asset(context, range):
        tags = {
            ASSET_PARTITION_RANGE_START_TAG: range[0],
            ASSET_PARTITION_RANGE_END_TAG: range[1],
        }
        if local_mode:
            tags = {**tags, **EXASOL_TAG}
            # Locally: submit a separate run through the webserver's GraphQL API.
            client = DagsterGraphQLClient("localhost", port_number=3000)
            client.submit_job_execution(f"materialize_{trg_table_name}", tags=tags)
        else:
            # On k8s: execute the assets in-process inside this op's worker. This still
            # creates a run record in the shared instance, but no Kubernetes Job for it.
            materialize(
                materialize_assets,
                instance=context.instance,
                selection=asset_selection,
                resources=context.resources._original_resource_dict,
                tags=tags,
            )

    @job(
        name=f"{trg_table_name}_full_load",
        description=f"This is a job that full loads the BQ table {trg_table_name} by chunking the partitions in chunks of {max_partition_chunk_size} partitions max.",
        config=job_config,
        tags=job_tags,
    )
    def full_load_job():
        chunks = get_chunks()
        chunks.map(materialize_asset).collect()

    return full_load_job
```
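The job's description promises chunks of at most `max_partition_chunk_size` partitions, but `get_full_load_partition_ranges` itself is not shown. For readers reproducing the pattern, here is a hypothetical stand-in, assuming daily partitions keyed by ISO date strings and a `datetime.date` start date; each returned (start, end) pair would become one `DynamicOutput` in `get_chunks`.

```python
from datetime import date, timedelta
from typing import List, Optional, Tuple


def get_full_load_partition_ranges(
    start_date: date,
    max_partition_chunk_size: int,
    end_date: Optional[date] = None,
) -> List[Tuple[str, str]]:
    """Hypothetical stand-in: split daily partitions into inclusive (start, end) key ranges."""
    if max_partition_chunk_size < 1:
        raise ValueError("max_partition_chunk_size must be at least 1")
    end_date = end_date or date.today()
    ranges = []
    chunk_start = start_date
    while chunk_start <= end_date:
        # Each chunk covers at most max_partition_chunk_size daily partitions.
        chunk_end = min(chunk_start + timedelta(days=max_partition_chunk_size - 1), end_date)
        ranges.append((chunk_start.isoformat(), chunk_end.isoformat()))
        chunk_start = chunk_end + timedelta(days=1)
    return ranges
```

For example, ten days starting 2023-01-01 with a chunk size of 4 yield three ranges: two full chunks and a final two-day remainder.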
In essence, I'm looking for a way to trigger multiple asset materializations, each with different partition range tags, with a single click in the GUI. Perhaps there's a more straightforward approach I haven't considered yet? One idea I had was to create a job that publishes a Pub/Sub message to a topic, which a sensor could observe and yield RunRequests. However, this approach feels a bit hacky, and I haven't attempted it yet.
Ultimately, it seems like a custom RunLauncher might be the best option for my needs. Your insights and suggestions would be greatly appreciated.
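If the custom-launcher route is taken, the runs created inside `materialize_asset` must carry whatever tag the launcher checks. A minimal sketch of building those tags; the skip-tag key is hypothetical and must match the launcher's, and the range tag keys are passed in so the existing ASSET_PARTITION_RANGE_*_TAG constants can be reused.

```python
from typing import Dict, Mapping, Optional, Tuple

# Hypothetical key; must match the key the custom run launcher checks.
SKIP_HEALTH_CHECK_TAG = "my_org/skip_run_worker_health_check"


def nested_run_tags(
    partition_range: Tuple[str, str],
    start_tag_key: str,
    end_tag_key: str,
    extra_tags: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Tags for a run executed in-process inside another run's worker pod."""
    start, end = partition_range
    tags = {start_tag_key: start, end_tag_key: end}
    tags.update(extra_tags or {})
    # Set last so extra_tags cannot accidentally re-enable the health check.
    tags[SKIP_HEALTH_CHECK_TAG] = "true"
    return tags
```

Inside `materialize_asset`, this would replace the hand-built dict, e.g. `tags = nested_run_tags(range, ASSET_PARTITION_RANGE_START_TAG, ASSET_PARTITION_RANGE_END_TAG)` before calling `materialize(..., tags=tags)`.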
johann
09/05/2023, 4:34 PM