Folks, been trying to run some sensors on ECS with...
# ask-community
b
Folks, been trying to run some sensors on ECS with dagster. Everything works locally, but on our staging/prod env we are getting an exception:
dagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor dagster_task_success
Did some research here and couldn’t find much.
Stacktrace
dagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor dagster_task_success
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/errors.py", line 191, in user_code_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
sqlalchemy.exc.StatementError: (sqlalchemy.exc.ResourceClosedError) This Connection is closed
[SQL: SHOW FULL TABLES FROM `workflows`]
  File "/usr/local/lib/python3.8/site-packages/dagster/core/errors.py", line 184, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 354, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 509, in _wrapped_fn
    for item in result:
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/run_status_sensor_definition.py", line 437, in _wrapped_fn
    context.instance.get_event_records(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 106, in instance
    DagsterInstance.from_ref(self._instance_ref)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 451, in from_ref
    unified_storage = instance_ref.storage
  File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 413, in storage
    return self.storage_data.rehydrate() if self.storage_data else None
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
    return klass.from_config_value(self, result.value)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/storage/legacy_storage.py", line 98, in from_config_value
    run_storage = ConfigurableClassData(
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
    return klass.from_config_value(self, result.value)
  File "/usr/local/lib/python3.8/site-packages/dagster_mysql/run_storage/run_storage.py", line 115, in from_config_value
    return MySQLRunStorage(inst_data=inst_data, mysql_url=mysql_url_from_config(config_value))
  File "/usr/local/lib/python3.8/site-packages/dagster_mysql/run_storage/run_storage.py", line 66, in __init__
    table_names = retry_mysql_connection_fn(db.inspect(self._engine).get_table_names)
  File "/usr/local/lib/python3.8/site-packages/dagster_mysql/utils.py", line 96, in retry_mysql_connection_fn
    return fn()
  File "<string>", line 2, in get_table_names
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/deprecations.py", line 128, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 198, in get_table_names
    tnames = self.dialect.get_table_names(
  File "<string>", line 2, in get_table_names
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 52, in cache
    ret = fn(self, con, *args, **kw)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2530, in get_table_names
    rp = connection.execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2191, in execute
    return connection.execute(statement, *multiparams, **params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 976, in execute
    return self._execute_text(object_, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1145, in _execute_text
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1177, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1481, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1173, in _execute_context
    conn = self._revalidate_connection()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 463, in _revalidate_connection
    raise exc.ResourceClosedError("This Connection is closed")
The sensor itself is pretty simple:
@run_status_sensor(pipeline_run_status=DagsterRunStatus.SUCCESS, minimum_interval_seconds=1)
def dagster_task_success(context: RunStatusSensorContext) -> None:
    try:
        ...  # our code
    except Exception:
        logger.exception("Error saving dagster job status", exc_info=True)
And our except block never even catches the exception, which makes me think something went wrong inside of the decorator.
Other things like schedules work fine, so I don’t see a reason why dagster-daemon is unable to hold that db conn.
d
Hi Bianca - what tasks do you have running in ECS? Is there a separate one running a gRPC server?
b
Yes - there is! Been following the examples in the deploy-ecs folder
d
Are any env vars needed by MySQL available in that gRPC server task? That's where the sensor executes
It seems like that task is having trouble accessing the mysql db - anything else that would be different about it compared to the other tasks?
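One quick way to act on that question is to run a small check inside the gRPC server task itself. The sketch below is only an illustration: the variable names in `REQUIRED_VARS` are hypothetical placeholders, so substitute whatever names your own `dagster.yaml` actually reads.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Hypothetical names, for illustration only. Replace them with the
# environment variables your dagster.yaml references for MySQL.
REQUIRED_VARS = (
    "DAGSTER_MYSQL_HOST",
    "DAGSTER_MYSQL_USER",
    "DAGSTER_MYSQL_PASSWORD",
    "DAGSTER_MYSQL_DB",
)


def missing_env_vars(
    required: Iterable[str],
    environ: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names in `required` that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    # Run this in each ECS task (dagit, daemon, gRPC server) and compare.
    missing = missing_env_vars(REQUIRED_VARS)
    print("missing:", missing if missing else "none")
```

Running the same script in the dagit task and in the gRPC server task makes it easy to spot a variable that is present in one and absent or empty in the other.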
b
All env vars are the same on all tasks 🤔
Are any env vars needed by MySQL available in that gRPC server task? That’s where the sensor executes
That’s helpful, I’m gonna isolate the user used to connect to the db to make sure it’s grabbing a connection when it starts.
So - the user’s access is not getting denied afaik. If I change the password to a bad password I get a proper error, and if the user doesn’t have permissions to log into the db I also get a proper error.
d
What version of dagster is this?
b
Running 0.15.5
MySQL is on an older version tho, running 5.7.mysql_aurora.2.07.2
d
this happens 100% of the time on every sensor tick for that sensor?
b
yep!
it never worked on that env
d
The reason it's firing just on that sensor is that run status sensors have to hit the DB to learn what runs succeeded or failed - but I don't yet understand why it can't connect to the DB, especially if it's fine connecting to it in e.g. dagit
b
We’ve had a similar error when we tried to create our own queue coordinator, maybe for the same reason
It looks like here, when retry_mysql_connection_fn is running, it’s always returning a connection that’s already closed for some reason 🤔
d
Yeah, i don't immediately see why that would fire in the user code task but not the dagit task, they both go through that same codepath
b
Hey @daniel! Morning! I got some updates: I enabled the MySQL general logs and there is some quite weird behavior in the logs (I am no database specialist tho)
2022-07-25T15:30:07.427308Z11132431 Query	SELECT jobs.job_body 
FROM jobs 
WHERE jobs.job_type = 'SENSOR'
2022-07-25T15:30:07.428497Z11132431 Query	rollback
2022-07-25T15:30:07.430312Z11132431 Quit
There’s a bunch of these logs - it connects successfully, runs the query, tries to commit, and then just rolls back
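To see the pattern across many connections rather than one excerpt, the general log can be grouped by connection ID. This is a rough sketch that assumes lines shaped like the ones pasted above (timestamp, connection ID, command, then the statement); `commands_by_connection` is a made-up helper name, and the regex may need adjusting for other log formats.

```python
import re
from typing import Dict, List

# Timestamp ending in "Z", then the connection ID, the command word
# (Query, Quit, Connect, ...), and an optional statement.
LINE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2}T[\d:.]+Z)\s*(\d+)\s+(\w+)\s*(.*)$")


def commands_by_connection(log_text: str) -> Dict[str, List[str]]:
    """Map each connection ID to the ordered list of commands it issued."""
    sessions: Dict[str, List[str]] = {}
    for line in log_text.splitlines():
        match = LINE_RE.match(line)
        if match is None:
            # Continuation of a multi-line statement (e.g. "FROM jobs").
            continue
        _timestamp, conn_id, command, argument = match.groups()
        # Keep only the first keyword of the statement, upper-cased.
        first_word = argument.split()[0].upper() if argument.strip() else ""
        label = f"{command} {first_word}".strip()
        sessions.setdefault(conn_id, []).append(label)
    return sessions


if __name__ == "__main__":
    sample = (
        "2022-07-25T15:30:07.427308Z11132431 Query\tSELECT jobs.job_body \n"
        "FROM jobs \n"
        "WHERE jobs.job_type = 'SENSOR'\n"
        "2022-07-25T15:30:07.428497Z11132431 Query\trollback\n"
        "2022-07-25T15:30:07.430312Z11132431 Quit\n"
    )
    for conn_id, commands in commands_by_connection(sample).items():
        print(conn_id, commands)
```

Connections whose command list ends differently from the rest (for example, no `Quit`, or a statement with no following rollback) would be the ones worth a closer look.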
d
could your queries be hitting some kind of a timeout? Is there a way to configure the server to increase timeout limits?
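If an idle-connection timeout is the suspect, one way to test the theory outside Dagster is to open a plain SQLAlchemy engine against the same database with `pool_pre_ping` and `pool_recycle` set. Both are real `create_engine` parameters, but the helper below (`engine_kwargs_for_idle_timeout`) and its safety margin are just an illustrative assumption, and this is not how Dagster configures its own storage engine.

```python
from typing import Any, Dict


def engine_kwargs_for_idle_timeout(
    wait_timeout_seconds: int,
    safety_margin_seconds: int = 30,
) -> Dict[str, Any]:
    """Build keyword arguments for sqlalchemy.create_engine.

    pool_pre_ping tests each pooled connection before handing it out;
    pool_recycle replaces connections older than the given age, which
    is kept below the server's idle timeout here.
    """
    if wait_timeout_seconds <= safety_margin_seconds:
        raise ValueError("wait_timeout must exceed the safety margin")
    return {
        "pool_pre_ping": True,
        "pool_recycle": wait_timeout_seconds - safety_margin_seconds,
    }


# Usage sketch (requires SQLAlchemy and a reachable database; `url` is
# your own connection string):
#
#   import sqlalchemy
#   engine = sqlalchemy.create_engine(
#       url, **engine_kwargs_for_idle_timeout(wait_timeout_seconds=300)
#   )
#   with engine.connect() as conn:
#       conn.execute(sqlalchemy.text("SELECT 1"))
```

The server's current value can be read with `SHOW VARIABLES LIKE 'wait_timeout'`. If the standalone engine works with these settings but fails without them, a timeout becomes a more plausible explanation.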
b
🤔 maybe.
it does look like the rollback is expected tho, so if there is a commit there shouldn’t be anything wrong. (very trustworthy source)
n
If the timeouts involve database queries, have you looked at adding some indexes?