Hi <#C01U954MEER|dagster-support>! I cant manage ...
# ask-community
h
Hi #dagster-support! I cant manage to make a job working that consists of an op in which execute_job is called in a k8s deployment. Is that even possible?! I keep getting error messages like:
Copy code
Detected run worker status UNKNOWN: 'dagster_k8s.client.DagsterK8sUnrecoverableAPIError: Unexpected error encountered in Kubernetes API Client.

Stack Trace:
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/launcher.py", line 410, in check_run_worker_health
    status = self._api_client.get_job_status(
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 427, in get_job_status
    return k8s_api_retry(_get_job_status, max_retries=3, timeout=wait_time_between_attempts)
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 128, in k8s_api_retry
    raise DagsterK8sUnrecoverableAPIError(

The above exception was caused by the following exception:
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'f39347e9-d683-4509-a87a-6077207cf6a8', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'a214b1d3-7600-4e95-8010-e2533075c735', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'd0a2a6a0-214f-46d0-9b4c-226458af0cf2', 'Date': 'Thu, 31 Aug 2023 11:14:08 GMT', 'Content-Length': '286'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"dagster-run-020ca228-b897-4f32-ad84-1f4e37e141f7\" not found","reason":"NotFound","details":{"name":"dagster-run-020ca228-b897-4f32-ad84-1f4e37e141f7","group":"batch","kind":"jobs"},"code":404}



Stack Trace:
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 109, in k8s_api_retry
    return fn()
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 424, in _get_job_status
    job = self.batch_api.read_namespaced_job_status(job_name, namespace=namespace)
  File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api/batch_v1_api.py", line 2815, in read_namespaced_job_status
    return self.read_namespaced_job_status_with_http_info(name, namespace, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api/batch_v1_api.py", line 2902, in read_namespaced_job_status_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
  File "/usr/local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 241, in GET
    return self.request("GET", url,
  File "/usr/local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 235, in request
    raise ApiException(http_resp=r)
'. Marking run 020ca228-b897-4f32-ad84-1f4e37e141f7 as failed, because it has surpassed the configured maximum attempts to resume the run: 0.
I am wondering why the k8sRunLauncher thinks, it must check the job status of the job started by execute_job. As I understand execute_job is not even launched by the RunLauncher (https://docs.dagster.io/deployment/run-launcher). I dont want to use execute_in_process since I want to use multiprocess_executor for execution (https://docs.dagster.io/_apidocs/execution#dagster.execute_job). In my local development environment this works like a charm. In th k8s deployment, it seems to me, that the RunLauncher thinks, ther must be a k8s pod, that it must check the status for. But this pod is never launched, since the job is executed by multiprocess_executor. Any help would be appreciated! Heiner
j
You’re manually invoking execute_job? That’s an interesting pattern that we don’t really support currently. The run monitor daemon sees the new run in the database, and since the K8sRunLauncher is configured on the instance, it tries to check the status of a k8s job (which doesn’t exist)
You could either: • turn off run monitoring • write a custom run launcher that overrides
check_run_health
to ignore runs with some custom tag that you specify for these special runs. Heres a guide to the latter: https://github.com/dagster-io/dagster/discussions/15856
h
Hi @johann, That's exactly how it appears to me. Thank you very much for the clarification. I also appreciate your suggestion to write a custom launcher. However, I would prefer to stick with the standard Launcher. Let me explain why I am attempting to use `execute_job`: I've implemented a data pipeline that involves several data transfers from an Exasol database to Google BigQuery. The data is partitioned by date, and what I'm trying to achieve is a job that initially loads all partitions of one of these assets in multiple backfill runs. To manage this efficiently, I divide the total partitions into smaller chunks since a single load operation can be very large. My goal is to use parallel execution with three concurrent Exasol sessions to handle this workload. Here's what the job currently looks like:
Copy code
def _get_full_load_job_from_exasol_source(
    materialize_assets,
    asset_selection,
    trg_table_name,
    max_partition_chunk_size,
    partition_start_date,
    scd2,
    full_load_mode,
):
    if local_mode:
        job_config = {}
        job_tags = {}
    else:
        job_config = {
            "execution": {
                "config": {
                    "multiprocess": {
                        "max_concurrent": 5
                    }
                }
            }
        }
        job_tags = {
            "dagster-k8s/config": {
                "container_config": {
                    "resources": {
                        "requests": {"cpu": "1", "memory": "2Gi"},
                        "limits": {"cpu": "2", "memory": "4Gi"},
                    },
                },
            },
        }

    @op(name=f"{trg_table_name}_get_full_load_chunks", out=DynamicOut())
    def get_chunks():
        pr = get_full_load_partition_ranges(
            start_date=partition_start_date
            if full_load_mode.lower() != "coin"
            else get_fico_full_load_start_date(),
            max_partition_chunk_size=max_partition_chunk_size,
        )
        if scd2:
            # add a single partition part for the time before partition_start_date
            pr = [(SCD2_REMAINDER_PARTITION_KEY, SCD2_REMAINDER_PARTITION_KEY)] + pr

        return [
            DynamicOutput((str(part[0]), str(part[1])), mapping_key=str(i))
            for i, part in enumerate(pr)
        ]

    @op(
        name=f"{trg_table_name}_materialize_asset_full_load",
        required_resource_keys={
            "exasol_client",
            "bq_client",
        },
    )
    def materialize_asset(context, range):
        tags = {
            ASSET_PARTITION_RANGE_START_TAG: range[0],
            ASSET_PARTITION_RANGE_END_TAG: range[1],
        }
        if local_mode:
            tags = {**tags, **EXASOL_TAG}
            client = DagsterGraphQLClient("localhost", port_number=3000)
            client.submit_job_execution(f"materialize_{trg_table_name}", tags=tags)
        else:
            materialize(
                materialize_assets,
                instance=context.instance,
                selection=asset_selection,
                resources=context.resources._original_resource_dict,
                tags=tags,
            )

    @job(
        name=f"{trg_table_name}_full_load",
        description=f"This is a job that full loads the BQ table {trg_table_name} by chunking the partitions in chunks of {max_partition_chunk_size} partitions max.",
        config=job_config,
        tags=job_tags,
    )
    def full_load_job():
        chunks = get_chunks()
        chunks.map(materialize_asset).collect()

    return full_load_job
In essence, I'm looking for a way to trigger multiple asset materializations, each with different partition range tags, with a single click in the GUI. Perhaps there's a more straightforward approach I haven't considered yet? One idea I had was to create a job that publishes a Pub/Sub message to a topic, which a sensor could observe and yield RunRequests. However, this approach feels a bit hacky, and I haven't attempted it yet. Ultimately, it seems like a Custom RunLauncher might be the best option for my needs. Your insights and suggestions would be greatly appreciated.. Heiner
j
interesting. It’s possible with a couple clicks in the backfill dialog I suppose, but yeah nothing that allows multiple ranges with one click