Hi, I’m trying to run a pipeline and after some t...
# ask-community
a
Hi, I’m trying to run a pipeline, and after some time (not always the same: sometimes 1.30 minutes, other times 10 mins) I get a Postgres connection error and the pipeline fails. The pipeline consists of seven solids, each of which executes a bunch of queries against BigQuery. In production the whole pipeline should take a couple of hours to finish. We don’t have this problem with our other Dagster pipelines, which are a lot smaller and shorter-running. Any help is greatly appreciated, thanks!!
here’s the error stack trace:
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 190, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 312, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 71, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 599, in _user_event_sequence_for_step_compute_fn
    for event in iterate_with_context(
  File "/usr/local/lib/python3.8/site-packages/dagster/utils/__init__.py", line 362, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 126, in execute_core_compute
    for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 92, in _yield_compute_results
    user_event_generator = compute_fn(SolidExecutionContext(compute_context), inputs)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/decorators/solid.py", line 304, in compute
    result = fn(context, **kwargs)
  File "/customer_platform_dagster/pipelines/pipeline_scv_mvp_oneoff.py", line 169, in hygiene_01
    execute(context,
  File "/customer_platform_dagster/pipelines/scv_mvp_oneoff_poc/utils.py", line 21, in decorator
    return func(*args, **kwargs)
  File "/customer_platform_dagster/pipelines/scv_mvp_oneoff_poc/run_poc.py", line 111, in execute
    utils.assert_equal(context,
  File "/customer_platform_dagster/pipelines/scv_mvp_oneoff_poc/utils.py", line 117, in assert_equal
    result, query_details = execute_sql(context, **kwargs)
  File "/customer_platform_dagster/pipelines/scv_mvp_oneoff_poc/utils.py", line 83, in execute_sql
    context.log.info(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/log_manager.py", line 243, in info
    return self._log(logging.INFO, msg, kwargs)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/log_manager.py", line 204, in _log
    logger_.log(level, message, extra=extra)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 1500, in log
    self._log(level, msg, args, **kwargs)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 1577, in _log
    self.handle(record)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 1587, in handle
    self.callHandlers(record)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 1649, in callHandlers
    hdlr.handle(record)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 950, in handle
    self.emit(record)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 152, in emit
    self._instance.handle_new_event(event)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 1087, in handle_new_event
    self._event_storage.store_event(event)
  File "/usr/local/lib/python3.8/site-packages/dagster_postgres/event_log/event_log.py", line 139, in store_event
    with self._connect() as conn:
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/dagster_postgres/utils.py", line 157, in create_pg_connection
    conn = retry_pg_connection_fn(engine.connect)
  File "/usr/local/lib/python3.8/site-packages/dagster_postgres/utils.py", line 113, in retry_pg_connection_fn
    return fn()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3095, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 91, in __init__
    else engine.raw_connection()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3174, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3141, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 301, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 755, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 259, in _do_get
    return self._create_connection()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
    return _ConnectionRecord(self)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
    self.__connect(first_connect_check=True)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
    connection = pool._invoke_creator(self)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 548, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/usr/local/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
  File "/usr/local/lib/python3.8/site-packages/dagster/utils/interrupts.py", line 78, in _new_signal_handler
    raise error_cls()
cc @Sri Kadiyala
d
Hi Andreas - thanks for the stack trace, do you have the text of the error message as well just above the trace? Might help us narrow it down
a
yes, sorry forgot to copy that too. Here:
dagster.core.errors.DagsterExecutionInterruptedError
d
Interesting - so that error should only be triggered if your pipeline process receives an interrupt signal somehow (which shouldn't be initiated from Dagster unless you attempted to manually terminate the pipeline). Is there anything else you can think of that would be trying to kill or terminate the pipeline process?
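For context, here is a minimal standard-library sketch (not Dagster's actual code) of why the trace above ends inside `psycopg2`: a signal handler that raises will surface its exception in whatever code the process happens to be running when the signal arrives, so the Postgres connection is likely just where the interrupt landed rather than its cause. `ExecutionInterrupted` and `run_step` are hypothetical names standing in for `DagsterExecutionInterruptedError` and a solid's compute function.

```python
import signal
import time


class ExecutionInterrupted(Exception):
    """Hypothetical stand-in for DagsterExecutionInterruptedError."""


def _raise_on_signal(signum, frame):
    # The exception is raised in whichever frame was executing
    # when the signal was delivered.
    raise ExecutionInterrupted()


def run_step():
    # Install the raising handler for SIGTERM (must run in the main thread).
    previous = signal.signal(signal.SIGTERM, _raise_on_signal)
    try:
        # Simulate an external terminate request, e.g. a pod being evicted.
        signal.raise_signal(signal.SIGTERM)
        time.sleep(0.01)  # stands in for unrelated work such as a DB connect
        return "finished"
    except ExecutionInterrupted:
        return "interrupted"
    finally:
        # Restore whatever handler was installed before.
        signal.signal(signal.SIGTERM, previous)


result = run_step()
print(result)
```

The step never reaches its normal return, even though nothing in the step itself failed.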
a
Hmm, interesting. The only other thing I can think of is an error with BigQuery SQL execution or connection, but whenever I had such errors in the past they were clearly reported in Dagit.
a
If there is nothing too sensitive in the event log, you could send over a debug file. There is a download debug file option in the "..." menu on the runs page.
s
Hello Alex, here is the debug file:
a
Thanks, that's useful. In this case Kubernetes seems to be attempting to move the pod, which we currently do not handle gracefully. You may need to adjust any k8s autoscaling systems you are using, or include tags on the Dagster runs to prevent them from participating.
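As a rough sketch of the tagging option, assuming the runs are launched as Kubernetes pods via `dagster-k8s` and that the Kubernetes Cluster Autoscaler is the component moving the pod (neither is confirmed in this thread), the run tags could carry a `dagster-k8s/config` entry that annotates the pod as not safe to evict. The helper name `build_no_evict_tags` is hypothetical.

```python
import json

# Assumption: the Cluster Autoscaler is what evicts the pod. This is its
# annotation for excluding a pod from scale-down eviction.
SAFE_TO_EVICT_ANNOTATION = "cluster-autoscaler.kubernetes.io/safe-to-evict"


def build_no_evict_tags():
    """Build run tags asking dagster-k8s to annotate the run's pod."""
    return {
        "dagster-k8s/config": {
            "pod_template_spec_metadata": {
                "annotations": {SAFE_TO_EVICT_ANNOTATION: "false"}
            }
        }
    }


run_tags = build_no_evict_tags()
print(json.dumps(run_tags, indent=2))
```

The resulting dict would be passed as `tags=` on the pipeline definition (for example `@pipeline(tags=run_tags)`). Other autoscaling or node-draining systems use different mechanisms, so check what is actually evicting the pod first.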