
Arun Kumar

12/15/2021, 11:52 AM
Hi team, our Dagster jobs are occasionally failing with the following error from the multi-process executor. However, the job succeeds when I re-execute it. Our op just runs a Snowflake query using the dagster-snowflake resource. Any thoughts on why this could happen?
An exception was thrown during execution that is likely a framework error, rather than an error in user code.
dagster.check.CheckError: Invariant failed. Description: Pipeline run analyses_exposures_loader (b15f4f49-05ae-45ad-ab00-60bf07c9797f) in state PipelineRunStatus.FAILURE, expected NOT_STARTED or STARTING

Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/grpc/impl.py", line 86, in core_execute_run
    yield from execute_run_iterator(recon_pipeline, pipeline_run, instance)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/api.py", line 77, in execute_run_iterator
    pipeline_run.pipeline_name, pipeline_run.run_id, pipeline_run.status
  File "/usr/local/lib/python3.7/site-packages/dagster/check/__init__.py", line 170, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
More of the stack trace from Dagit:
KeyboardInterrupt
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_plan.py", line 195, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 324, in core_dagster_event_sequence_for_step
    _step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 66, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 140, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 122, in _yield_compute_results
    user_event_generator,
  File "/usr/local/lib/python3.7/site-packages/dagster/utils/__init__.py", line 383, in iterate_with_context
    next_output = next(iterator)
  File "/home/app/etl/ops/load_analysis_exposures.py", line 99, in load_analysis_exposures
    results = snowflake.execute_query(insert_sql, fetch_results=True)
  File "/usr/local/lib/python3.7/site-packages/dagster_snowflake/resources.py", line 105, in execute_query
    cursor.execute(sql, parameters)  # pylint: disable=E1101
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/cursor.py", line 697, in execute
    ret = self._execute_helper(query, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/cursor.py", line 512, in _execute_helper
    _no_results=_no_results,
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/connection.py", line 960, in cmd_query
    _include_retry_params=True,
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/network.py", line 469, in request
    _include_retry_params=_include_retry_params,
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/network.py", line 748, in _post_request
    result_url, headers, token=self.token, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/network.py", line 662, in _get_request
    socket_timeout=socket_timeout,
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/network.py", line 791, in fetch
    session, method, full_url, headers, data, retry_ctx, **kwargs
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/network.py", line 840, in _request_exec_wrapper
    **kwargs,
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/network.py", line 996, in _request_exec
    auth=SnowflakeAuth(token),
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/contrib/pyopenssl.py", line 331, in recv_into
    if not util.wait_for_read(self.socket, self.socket.gettimeout()):
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/util/wait.py", line 146, in wait_for_read
    return wait_for_socket(sock, read=True, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/util/wait.py", line 107, in poll_wait_for_socket
    return bool(_retry_on_intr(do_poll, timeout))
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/util/wait.py", line 43, in _retry_on_intr
    return fn(timeout)
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/util/wait.py", line 105, in do_poll
    return poll_obj.poll(t)
  File "/usr/local/lib/python3.7/site-packages/snowflake/connector/cursor.py", line 491, in interrupt_handler
    raise KeyboardInterrupt

Josh Taylor

12/15/2021, 12:14 PM
Sometimes Snowflake requests just don't work, either due to HTTP flakiness or other issues (sometimes we get 503s). We've had this happen and just needed to retry it multiple times.
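If it helps, a rough sketch of retrying just that step from inside the op (assuming the dagster-snowflake resource from the trace above; the SQL, retry count, and wait are placeholders):

from dagster import RetryRequested, op

@op(required_resource_keys={"snowflake"})
def load_analysis_exposures(context):
    insert_sql = "INSERT INTO ..."  # placeholder for the real statement
    try:
        return context.resources.snowflake.execute_query(insert_sql, fetch_results=True)
    except Exception as e:
        # Ask Dagster to re-run just this step instead of failing the whole run.
        raise RetryRequested(max_retries=3, seconds_to_wait=30) from e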

johann

12/15/2021, 3:22 PM
How do you have Dagster deployed? What run launcher are you using?

Arun Kumar

12/15/2021, 7:35 PM
Hi @johann, Dagster is deployed using Helm, and we use the K8sRunLauncher to launch every job in a separate pod.

johann

12/15/2021, 7:53 PM
Hi Arun, I'm curious whether you can find the pod logs for this job. They will have the form
dagster-run-<run_id><random suffix>
I'm particularly curious whether you can find two for the same run ID.
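For example, something like this should surface them (the dagster namespace is an assumption based on the job output later in this thread):
kubectl get pods -n dagster | grep dagster-run-<run_id>
kubectl logs -n dagster <pod name>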

Arun Kumar

12/15/2021, 7:55 PM
Yes, I looked into the pod logs and could not find any extra information.
.
.
time="2021-12-15T11:20:06Z" level=info msg="spawning process: [dagster api execute_run {\"__class__\": \"ExecuteRunArgs\", \"instance_ref\": null, \"pipeline_origin\": {\"__class__\": \"PipelinePythonOrigin\", \"pipeline_name\": \"analyses_exposures_loader\", \"repository_origin\": {\"__class__\": \"RepositoryPythonOrigin\", \"code_pointer\": {\"__class__\": \"FileCodePointer\", \"fn_name\": \"metrics_repo\", \"python_file\": \"/home/app/etl/repo.py\", \"working_directory\": \"/home/app\"}, \"container_image\": \"611706558220.dkr.ecr.us-west-2.amazonaws.com/metrics-repo:latest\", \"executable_path\": \"/usr/local/bin/python\"}}, \"pipeline_run_id\": \"b15f4f49-05ae-45ad-ab00-60bf07c9797f\"}]"
.
.
.
{"__class__": "ExecuteRunArgsLoadComplete"}
{"__class__": "DagsterEvent", "event_specific_data": {"__class__": "EngineEventData", "error": {"__class__": "SerializableErrorInfo", "cause": null, "cls_name": "CheckError", "message": "dagster.check.CheckError: Invariant failed. Description: Pipeline run analyses_exposures_loader (b15f4f49-05ae-45ad-ab00-60bf07c9797f) in state PipelineRunStatus.FAILURE, expected NOT_STARTED or STARTING\n", "stack": ["  File \"/usr/local/lib/python3.7/site-packages/dagster/grpc/impl.py\", line 86, in core_execute_run\n    yield from execute_run_iterator(recon_pipeline, pipeline_run, instance)\n", "  File \"/usr/local/lib/python3.7/site-packages/dagster/core/execution/api.py\", line 77, in execute_run_iterator\n    pipeline_run.pipeline_name, pipeline_run.run_id, pipeline_run.status\n", "  File \"/usr/local/lib/python3.7/site-packages/dagster/check/__init__.py\", line 170, in invariant\n    raise CheckError(f\"Invariant failed. Description: {desc}\")\n"]}, "marker_end": null, "marker_start": null, "metadata_entries": []}, "event_type_value": "ENGINE_EVENT", "logging_tags": {}, "message": "An exception was thrown during execution that is likely a framework error, rather than an error in user code.", "pid": null, "pipeline_name": "analyses_exposures_loader", "solid_handle": null, "step_handle": null, "step_key": null, "step_kind_value": null}
You mean two pods for the same run_id?

johann

12/15/2021, 7:56 PM
yes
Also curious about Kubernetes events for the pod object, e.g. a container restart
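For example (namespace assumed again; the pod name is whichever one you found above):
kubectl describe pod <pod name> -n dagster
kubectl get events -n dagster --field-selector involvedObject.name=<pod name>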

Arun Kumar

12/15/2021, 8:09 PM
I did not find multiple pods for this run ID, and I also don't see any K8s events when I describe the pod.

johann

12/15/2021, 8:10 PM
One more spot to check: could you run
kubectl describe job dagster-run-<run-id>
Do you see a containers-failed count on the pod?

Arun Kumar

12/15/2021, 8:15 PM
The job status looks fine to me:
Name:           dagster-run-b15f4f49-05ae-45ad-ab00-60bf07c9797f
Namespace:      dagster
Selector:       controller-uid=f70ced44-ed5f-40f8-8cd4-cfd9aa664806
Labels:         app.kubernetes.io/component=run_coordinator
                app.kubernetes.io/instance=dagster
                app.kubernetes.io/name=dagster
                app.kubernetes.io/part-of=dagster
                app.kubernetes.io/version=0.12.12
Annotations:    <none>
Parallelism:    1
Completions:    1
Start Time:     Wed, 15 Dec 2021 03:15:13 -0800
Completed At:   Wed, 15 Dec 2021 03:20:05 -0800
Duration:       4m52s
Pods Statuses:  1 Running / 1 Succeeded / 0 Failed
Pod Template:
  Labels:           app.kubernetes.io/component=run_coordinator
                    app.kubernetes.io/instance=dagster
                    app.kubernetes.io/name=dagster
                    app.kubernetes.io/part-of=dagster
                    app.kubernetes.io/version=0.12.12
                    controller-uid=f70ced44-ed5f-40f8-8cd4-cfd9aa664806
                    job-name=dagster-run-b15f4f49-05ae-45ad-ab00-60bf07c9797f
  Annotations:      ...
  Service Account:  dagster
  Containers:
   dagster-run-b15f4f49-05ae-45ad-ab00-60bf07c9797f:
    Image:      ...
    Port:       <none>
    Host Port:  <none>
    Args:
      dagster
      api
      execute_run
      {"__class__": "ExecuteRunArgs", "instance_ref": null, "pipeline_origin": {"__class__": "PipelinePythonOrigin", "pipeline_name": "analyses_exposures_loader", "repository_origin": {"__class__": "RepositoryPythonOrigin", "code_pointer": {"__class__": "FileCodePointer", "fn_name": "metrics_repo", "python_file": "/home/app/etl/repo.py", "working_directory": "/home/app"}, "container_image": ..., "executable_path": "/usr/local/bin/python"}}, "pipeline_run_id": "b15f4f49-05ae-45ad-ab00-60bf07c9797f"}
.
.
.
Also, I don't see any container failure count.
Containers:
  dagster-run-b15f4f49-05ae-45ad-ab00-60bf07c9797f:
    Container ID:  ...
    Image:         ...
    Image ID:      ...
    Port:          <none>
    Host Port:     <none>
    Command: ...
    Args:
      dagster
      api
      execute_run
      {"__class__": "ExecuteRunArgs", "instance_ref": null, "pipeline_origin": {"__class__": "PipelinePythonOrigin", "pipeline_name": "analyses_exposures_loader", "repository_origin": {"__class__": "RepositoryPythonOrigin", "code_pointer": {"__class__": "FileCodePointer", "fn_name": "metrics_repo", "python_file": "/home/app/etl/repo.py", "working_directory": "/home/app"}, "container_image": ..., "executable_path": "/usr/local/bin/python"}}, "pipeline_run_id": "b15f4f49-05ae-45ad-ab00-60bf07c9797f"}
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Wed, 15 Dec 2021 03:20:05 -0800
      Finished:     Wed, 15 Dec 2021 03:20:10 -0800
    Ready:          False
    Restart Count:  0

johann

12/16/2021, 4:42 PM
Hi @Arun Kumar -
Pods Statuses:  1 Running / 1 Succeeded / 0 Failed
This looks like the job ended up having 2 pods for it? (Though I’m surprised that one is still running after you’ve seen these errors)
Dagster expects that a launched K8s job will only run once. Unfortunately, the K8s API doesn't offer exactly-once job launching, so we dedupe using the run-status check you see the error for here (in state PipelineRunStatus.FAILURE, expected NOT_STARTED or STARTING). So the timeline is likely:
• you launch the K8s job
• the Dagster run hits a KeyboardInterrupt and reports that to Dagit, causing the run to be marked FAILURE. After reporting the failure to Dagit, it exits with code 0, hence being marked as Succeeded in K8s
• the K8s container restarts, and the second container hits the expected NOT_STARTED or STARTING error
You could validate this by trying something like
kubectl logs --previous <pod>
which should return the interrupt error. The mystery is why the initial container is getting interrupted (and why it is being restarted). Assuming that you didn't manually send an interrupt to the container, it's possible that some K8s operation (a node going down, the pod getting rescheduled, etc.) is behind it. But it's strange that this is happening frequently (is it possible that your cluster is under-provisioned?).
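For reference, a minimal sketch of the dedup check described above, reconstructed from the error message in this thread rather than the actual Dagster source:

from enum import Enum

class PipelineRunStatus(Enum):
    # simplified stand-in for Dagster's run-status enum
    NOT_STARTED = "NOT_STARTED"
    STARTING = "STARTING"
    FAILURE = "FAILURE"

class CheckError(Exception):
    # stand-in for dagster.check.CheckError
    pass

def check_run_can_start(pipeline_name, run_id, status):
    # execute_run_iterator only proceeds if the run has not started yet; a second
    # container picking up an already-failed run trips this invariant.
    if status not in (PipelineRunStatus.NOT_STARTED, PipelineRunStatus.STARTING):
        raise CheckError(
            f"Invariant failed. Description: Pipeline run {pipeline_name} "
            f"({run_id}) in state {status}, expected NOT_STARTED or STARTING"
        )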

Arun Kumar

12/17/2021, 10:33 AM
Thanks, Johann. Looks like this error did not happen today; I will try to validate and get more information the next time I see it.

johann

12/17/2021, 2:11 PM
Sounds good. My colleague also noticed that the Snowflake connector gets involved in interrupt handling (https://github.com/snowflakedb/snowflake-connector-python/blob/master/src/snowflake/connector/cursor.py#L474), so it's a possible source of the unexpected KeyboardInterrupt.
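Roughly the pattern involved, as an illustrative sketch rather than the connector's actual code: while a query is executing, the cursor temporarily installs a SIGINT handler that cancels the query and raises KeyboardInterrupt, then restores the previous handler afterwards. Any SIGINT delivered to the process during a query therefore surfaces as the KeyboardInterrupt in the trace above.

import signal
import time

def run_query():
    # placeholder for the long-running Snowflake call in the stack trace above
    time.sleep(60)

def interrupt_handler(signum, frame):
    # the real connector also cancels the in-flight query before raising
    raise KeyboardInterrupt

original_handler = signal.signal(signal.SIGINT, interrupt_handler)
try:
    run_query()
finally:
    # restore whatever handler was installed before the query started
    signal.signal(signal.SIGINT, original_handler)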

Arun Kumar

12/29/2021, 9:47 AM
This happened again today, and I was able to get more info. Looks like the first pod was actually interrupted and deleted by the K8s autoscaler (events below). I have disabled autoscaling for all the job pods for now (one per-run alternative is sketched after the events).
25m         Normal    Scheduled           pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf   Successfully assigned dagster/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf to ip-172-30-40-71.us-west-2.compute.internal
25m         Normal    Pulling             pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf   Pulling image "..."
24m         Normal    Pulled              pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf   Successfully pulled image "...."
24m         Normal    Created             pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf   Created container dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d
24m         Normal    Started             pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf   Started container dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d
2m51s       Normal    ScaleDown           pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf   deleting pod for node scale down
2m51s       Normal    Killing             pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf   Stopping container dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d
2m51s       Normal    Scheduled           pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-b2nqh   Successfully assigned dagster/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-b2nqh to ip-172-30-17-8.us-west-2.compute.internal
2m50s       Normal    Pulling             pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-b2nqh   Pulling image "..."
2m50s       Normal    Pulled              pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-b2nqh   Successfully pulled image "..."
2m50s       Normal    Created             pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-b2nqh   Created container dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d
2m49s       Normal    Started             pod/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-b2nqh   Started container dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d
25m         Normal    SuccessfulCreate    job/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d         Created pod: dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-4wztf
2m51s       Normal    SuccessfulCreate    job/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d         Created pod: dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d-b2nqh
2m49s       Warning   TooManyActivePods   job/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d         Too many active pods running after completion count reached
2m49s       Normal    Completed           job/dagster-run-4db043ef-4da0-407c-8c6c-e961c18f2a1d         Job completed
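For anyone hitting the same thing: one way to keep the cluster autoscaler from evicting run pods mid-run is to mark them as not safe to evict. A hedged sketch follows; the dagster-k8s/config tag structure is an assumption based on dagster-k8s conventions for this version, so verify it against your deployment. The cluster-autoscaler.kubernetes.io/safe-to-evict annotation is the standard Kubernetes autoscaler setting.

# Tags to attach to the pipeline/job so K8sRunLauncher adds the annotation to the run pod.
# The nesting under "dagster-k8s/config" is an assumption; check the dagster-k8s docs.
AUTOSCALER_SAFE_TAGS = {
    "dagster-k8s/config": {
        "pod_template_spec_metadata": {
            "annotations": {
                # tells the cluster autoscaler not to scale down a node while this pod runs
                "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
            }
        }
    }
}

Passing something like these as tags on the pipeline/job should keep a ScaleDown event like the one above from killing the first run pod.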