https://dagster.io/ logo
#ask-community
Title
# ask-community
w

Wensi DING

04/07/2022, 2:17 PM
Hello all, we've deployed dagster in our k8s cluster, and we get this error once in a while during the pipeline runs:
Copy code
TimeoutError: [Errno 110] Connection timed out
  File "/home/appuser/.local/lib/python3.7/site-packages/dagster/core/execution/api.py", line 748, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/home/appuser/.local/lib/python3.7/site-packages/dagster_celery/core_execution_loop.py", line 77, in core_celery_execution_loop
    if result.ready():
  File "/home/appuser/.local/lib/python3.7/site-packages/celery/result.py", line 317, in ready
    return self.state in self.backend.READY_STATES
  File "/home/appuser/.local/lib/python3.7/site-packages/celery/result.py", line 477, in state
    return self._get_task_meta()['status']
  File "/home/appuser/.local/lib/python3.7/site-packages/celery/result.py", line 416, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/home/appuser/.local/lib/python3.7/site-packages/celery/backends/rpc.py", line 240, in get_task_meta
    for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit):
  File "/home/appuser/.local/lib/python3.7/site-packages/celery/backends/rpc.py", line 274, in _slurp_from_queue
    binding.declare()
  File "/home/appuser/.local/lib/python3.7/site-packages/kombu/entity.py", line 606, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/home/appuser/.local/lib/python3.7/site-packages/kombu/entity.py", line 615, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/home/appuser/.local/lib/python3.7/site-packages/kombu/entity.py", line 650, in queue_declare
    nowait=nowait,
  File "/home/appuser/.local/lib/python3.7/site-packages/amqp/channel.py", line 1163, in queue_declare
    spec.Queue.DeclareOk, returns_tuple=True,
  File "/home/appuser/.local/lib/python3.7/site-packages/amqp/abstract_channel.py", line 99, in wait
    self.connection.drain_events(timeout=timeout)
  File "/home/appuser/.local/lib/python3.7/site-packages/amqp/connection.py", line 525, in drain_events
    while not self.blocking_read(timeout):
  File "/home/appuser/.local/lib/python3.7/site-packages/amqp/connection.py", line 530, in blocking_read
    frame = self.transport.read_frame()
  File "/home/appuser/.local/lib/python3.7/site-packages/amqp/transport.py", line 326, in read_frame
    frame_header = read(7, True)
  File "/home/appuser/.local/lib/python3.7/site-packages/amqp/transport.py", line 604, in _read
    s = recv(n - len(rbuf))  # see note above
  File "/usr/local/lib/python3.7/ssl.py", line 931, in read
    return self._sslobj.read(len)
do you guys have an idea which parameter I could use to increase the related timeout value?
j

johann

04/07/2022, 2:29 PM
I’ll note that unless you need specific Celery features, the
dagster-k8s
library is generally simpler to use https://docs.dagster.io/deployment/guides/kubernetes/deploying-with-helm
I see this open issue… but it’s not clear why you would be hitting but not other users (at least that I’m aware of) https://github.com/celery/celery/issues/4039
w

Wensi DING

04/07/2022, 3:27 PM
Thanks @johann for looking at this. We are using dagster v0.12.15. And we use celery because we'd like to be able to do parallel execution for certain solids. Maybe you have some better solution for this need in the later versions?
j

johann

04/07/2022, 5:02 PM
Yes- we have a
k8s_job_executor
. I don’t remember off the top of my head which version that was added in
w

Wensi DING

04/08/2022, 7:17 AM
ok then, I guess that we'll try to upgrade our version to solve this and benefit from other new features. Thanks!
👍 1
2 Views