# ask-community
j
Question - we've been having a bunch of issues when running sensors with MySQL as our run storage mechanism on 0.15.5. We are running a Docker deployment in ECS with dagit, the dagster daemon, and a user code repo all deployed. We use the ECS run launcher to dispatch runs. Dagit has been able to query runs and we've had no DB issues overall. Our DB is called `workflows`, and it's where we house Dagster jobs. This has been blocking us from using sensors 😞. Any troubleshooting advice? Follow-up Q: does the Dagster daemon run the sensor directly, or does it call the user code repo on every sensor invocation? I'm trying to understand if, perhaps, the user code repo Docker task isn't able to connect to our MySQL DB. Stack trace:
Copy code
dagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor dagster_task_canceled
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/errors.py", line 191, in user_code_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
sqlalchemy.exc.StatementError: (sqlalchemy.exc.ResourceClosedError) This Connection is closed
[SQL: SHOW FULL TABLES FROM `workflows`]
  File "/usr/local/lib/python3.8/site-packages/dagster/core/errors.py", line 184, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 354, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 509, in _wrapped_fn
    for item in result:
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/run_status_sensor_definition.py", line 437, in _wrapped_fn
    context.instance.get_event_records(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 106, in instance
    DagsterInstance.from_ref(self._instance_ref)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 451, in from_ref
    unified_storage = instance_ref.storage
  File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 413, in storage
    return self.storage_data.rehydrate() if self.storage_data else None
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
    return klass.from_config_value(self, result.value)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/storage/legacy_storage.py", line 98, in from_config_value
    run_storage = ConfigurableClassData(
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
    return klass.from_config_value(self, result.value)
  File "/usr/local/lib/python3.8/site-packages/dagster_mysql/run_storage/run_storage.py", line 115, in from_config_value
    return MySQLRunStorage(inst_data=inst_data, mysql_url=mysql_url_from_config(config_value))
  File "/usr/local/lib/python3.8/site-packages/dagster_mysql/run_storage/run_storage.py", line 66, in __init__
    table_names = retry_mysql_connection_fn(db.inspect(self._engine).get_table_names)
  File "/usr/local/lib/python3.8/site-packages/dagster_mysql/utils.py", line 96, in retry_mysql_connection_fn
    return fn()
  File "<string>", line 2, in get_table_names
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/deprecations.py", line 128, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 198, in get_table_names
    tnames = self.dialect.get_table_names(
  File "<string>", line 2, in get_table_names
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 52, in cache
    ret = fn(self, con, *args, **kw)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2530, in get_table_names
    rp = connection.execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2191, in execute
    return connection.execute(statement, *multiparams, **params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 976, in execute
    return self._execute_text(object_, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1145, in _execute_text
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1177, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1481, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1173, in _execute_context
    conn = self._revalidate_connection()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 463, in _revalidate_connection
    raise exc.ResourceClosedError("This Connection is closed")
s
I am not 100% on this, but I am pretty sure the sensor code is running on the user code Docker task itself, and not via the daemon.
a
yep, user code is only ever run inside the user code servers.
j
@alex have you seen this type of error before during Dagster development? We have our user code repo fully configured to have access to our MySQL DB
So it's unclear why the connection is failing
a
based on the error it seems to be some sort of implicit connection re-use going awry
in that stack trace we're creating a new `instance` object, so i don't know why it's hitting `_revalidate_connection` and trying to use some previously established connection
one fix could be to add `ResourceClosedError` to the allow list of exceptions in `retry_mysql_connection_fn`
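roughly, the change would look like this (a paraphrased sketch of the retry helper, not the verbatim Dagster source):
Copy code
# Paraphrased sketch of dagster_mysql/utils.py -- not the actual source.
# The idea: include ResourceClosedError among the errors that trigger a retry.
import time

import sqlalchemy.exc

RETRYABLE_ERRORS = (
    # ...the mysql-connector errors already retried today, plus:
    sqlalchemy.exc.ResourceClosedError,  # proposed addition
)


def retry_mysql_connection_fn(fn, retry_limit=5, retry_wait=0.2):
    """Call fn(), retrying on transient connection errors."""
    while True:
        try:
            return fn()
        except RETRYABLE_ERRORS:
            if retry_limit <= 0:
                raise
            retry_limit -= 1
            time.sleep(retry_wait)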
j
Well, this sensor works locally with MySQL in a docker configuration
So I'm slightly confused why it's failing in prod.
`MYSQL_CONNECTION_URL` is set on every container we run, and is configured in our dagster.yaml in prod like so:
Copy code
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  config:
    include_sidecars: true

run_storage:
  module: dagster_mysql.run_storage
  class: MySQLRunStorage
  config:
    mysql_url:
      env: MYSQL_CONNECTION_URL

event_log_storage:
  module: dagster_mysql.event_log
  class: MySQLEventLogStorage
  config:
    mysql_url:
      env: MYSQL_CONNECTION_URL

schedule_storage:
  module: dagster_mysql.schedule_storage
  class: MySQLScheduleStorage
  config:
    mysql_url:
      env: MYSQL_CONNECTION_URL
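(For reference, `MYSQL_CONNECTION_URL` is a standard SQLAlchemy URL; the credentials/host below are placeholders, not our real values:)
Copy code
# Placeholder credentials/host -- illustrative only.
MYSQL_CONNECTION_URL = "mysql+mysqlconnector://user:password@our-rds-host:3306/workflows"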
a
based on the error / stack trace - I suspect the long-lived server is holding on to a connection that goes stale and is throwing `ResourceClosedError`, which isn't handled gracefully
it's unclear what is causing this connection to be held, since these are fresh objects - must be something in the `mysql` dbapi used by `sqlalchemy`; we don't see this issue with `postgres`
j
Mind you, we also have no issues running jobs in prod or querying their data via dagit; we are only having issues with sensors accessing the prod DB.
Interesting, but wouldn't the same thing happen with dagit?
Or rather not, given that it's just a GraphQL server
a
theoretically ya, but in dagit we explicitly enable pooling, so maybe it's handled correctly there
j
one thing to note - sensors have never worked for us in prod
Even if the user code repo is freshly deployed / restarted
But we can try the above on the next release.
For https://github.com/dagster-io/dagster/pull/9343, do you have a hypothesis as to why this isn't a problem locally for us? When I deploy fresh Docker containers locally (dagit, daemon, user code), sensors run successfully.
a
oh interesting, so even on initial invocation they didn't work - hm
can you attach to the running user code server where the sensor is being evaluated and ensure `MYSQL_CONNECTION_URL` is what is expected?
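e.g. something along these lines from inside the container (illustrative - how you attach depends on your ECS setup):
Copy code
# Run inside the user code container, e.g. via docker exec / ECS exec.
import os

# Should print the same URL that the daemon's dagster.yaml resolves.
print(os.environ.get("MYSQL_CONNECTION_URL"))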
j
Will report back here
Ok, just confirmed that all our tasks have the env variable set at the docker level.
We use the `mysql+mysqlconnector://` prefix btw @alex
a
hmmmm
"ECS with dagit, dagster daemon, and a user code repo all deployed"
How are you managing these tasks? Are there any differences between them? For context on what's happening based on the stack trace - we are materializing a `DagsterInstance` that was passed from the daemon to the user code server, so it's the `dagster.yaml` in the daemon that's our source, and we're then failing to connect to the DB we got pointed at when we processed that config. Are there any networking settings on the `dagit` task that are absent on the user code server task? Certain kinds of sensors are pretty much the only time the user code server reaches out to the database.
j
Biggest thing is on the user code repo: we use SQLAlchemy in our user code to connect to our MySQL DB, which is the same RDS instance where we store the `workflows` data. Our prod data is in one DB, `workflows` in an adjacent DB. What is sketchy is that this is the same config we have locally.
a
hmm do you connect to that DB at all at definition time? like as part of defining your jobs/repository
j
We don't; the only thing we do is set up our own SQLAlchemy session.
This behavior is the same locally, where sensors work.
To be explicit, here is an example of a sensor we're running:
Copy code
from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor


@run_status_sensor(pipeline_run_status=DagsterRunStatus.SUCCESS, minimum_interval_seconds=1)
def dagster_task_success(context: RunStatusSensorContext) -> None:
    # Look up our own task record for this run and mark it complete.
    task = DagsterTaskDAO().get_by_job_id(context.dagster_run.run_id)
    DagsterTaskDAO().update_complete(task)
`DagsterTaskDAO` is just a utility that issues a SQLAlchemy query.
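It looks roughly like this - a simplified sketch; the model/table names are illustrative and the real code differs:
Copy code
import os

from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class DagsterTask(Base):
    __tablename__ = "dagster_tasks"  # illustrative name
    run_id = Column(String(64), primary_key=True)
    status = Column(String(32))


# Engine/session factory created at module import time, pointing at the
# same RDS instance that houses the `workflows` database.
engine = create_engine(os.environ["MYSQL_CONNECTION_URL"])
Session = sessionmaker(bind=engine)


class DagsterTaskDAO:
    def get_by_job_id(self, run_id):
        with Session() as session:
            return session.query(DagsterTask).filter_by(run_id=run_id).one()

    def update_complete(self, task):
        with Session() as session:
            task.status = "COMPLETE"
            session.merge(task)
            session.commit()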
a
right, but the hypothesis here is that sqlalchemy session creation incurs a DB connection which then goes idle and closes; when the sensor later tries to grab the connection it gets the closed one, resulting in the observed error
j
Gotcha; so we believe this is due to an idle connection
Let me hard restart our user code repo and immediately enable sensors, to see if they work post-restart + reload
a
oh interesting i sort of overlooked this part of the stack trace:
Copy code
The above exception was caused by the following exception:
sqlalchemy.exc.StatementError: (sqlalchemy.exc.ResourceClosedError) This Connection is closed
[SQL: SHOW FULL TABLES FROM `workflows`]
j
Wait, I actually think I have something disproving the hypothesis. I've been running local dagster all day, and the sensors have never failed on DB connectivity, despite being long-running.
a
`workflows` is the name of the dagster database?
Yep
Same RDS instance as the DB we connect to for our user-defined code (our prod DB).
a
if you give it a bad url, does it manifest as a `ResourceClosedError`?
j
Nope; tried this locally
Also tried unsetting the env variable - that produces a different error
Something else I just tried - I hard reset the user code repo job and sensors kept failing right after the restart.
Update from my side - I monkey-patched Dagster with your fix from your PR @alex, and it unfortunately didn't fix the issue 😞
a
😞 well, thanks for testing it out for me. sorry that I have no idea what's going wrong here
j
I'm wondering if we could monkey-patch dagster to add flags to how you folks stand up the engine
Looking at https://github.com/dagster-io/dagster/issues/8877, I wonder if we can use `pool_pre_ping` to nullify this
a
in `dagit`, where we enable pooling, that makes sense - but in this case it should be using the default engine, which is explicitly set to NullPool: https://github.com/dagster-io/dagster/blame/master/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py#L59-L63
j
Gotcha; if I wanna subclass + tweak the run storage, does that mean I have to redeploy all the services, or can I redeploy just the user code repo?
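Something like this is what I have in mind - an untested sketch; whether `__init__` is the right override point is an assumption about the dagster_mysql internals, and the ConfigurableClass plumbing needed for dagster.yaml to load it is omitted:
Copy code
import sqlalchemy as db

from dagster_mysql.run_storage import MySQLRunStorage


class PrePingMySQLRunStorage(MySQLRunStorage):
    """MySQL run storage whose engine pings connections before use."""

    def __init__(self, mysql_url, inst_data=None):
        super().__init__(mysql_url=mysql_url, inst_data=inst_data)
        # Swap the default (NullPool) engine for a pooled one that
        # pre-pings, so stale connections get detected and replaced.
        self._engine = db.create_engine(mysql_url, pool_pre_ping=True)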
a
the daemon sends over its `dagster.yaml` config when evaluating sensors, so daemon and user code at least - might be best just to keep them all in sync