Bianca Rosa
07/20/2022, 9:59 PMdagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor dagster_task_success
. Did some research here and couldn’t find much.dagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor dagster_task_success
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.8/site-packages/dagster/core/errors.py", line 191, in user_code_error_boundary
raise error_cls(
The above exception was caused by the following exception:
sqlalchemy.exc.StatementError: (sqlalchemy.exc.ResourceClosedError) This Connection is closed
[SQL: SHOW FULL TABLES FROM `workflows`]
File "/usr/local/lib/python3.8/site-packages/dagster/core/errors.py", line 184, in user_code_error_boundary
yield
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 354, in evaluate_tick
result = list(self._evaluation_fn(context))
File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 509, in _wrapped_fn
for item in result:
File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/run_status_sensor_definition.py", line 437, in _wrapped_fn
context.instance.get_event_records(
File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 106, in instance
DagsterInstance.from_ref(self._instance_ref)
File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 451, in from_ref
unified_storage = instance_ref.storage
File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 413, in storage
return self.storage_data.rehydrate() if self.storage_data else None
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
return klass.from_config_value(self, result.value)
File "/usr/local/lib/python3.8/site-packages/dagster/core/storage/legacy_storage.py", line 98, in from_config_value
run_storage = ConfigurableClassData(
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
return klass.from_config_value(self, result.value)
File "/usr/local/lib/python3.8/site-packages/dagster_mysql/run_storage/run_storage.py", line 115, in from_config_value
return MySQLRunStorage(inst_data=inst_data, mysql_url=mysql_url_from_config(config_value))
File "/usr/local/lib/python3.8/site-packages/dagster_mysql/run_storage/run_storage.py", line 66, in __init__
table_names = retry_mysql_connection_fn(db.inspect(self._engine).get_table_names)
File "/usr/local/lib/python3.8/site-packages/dagster_mysql/utils.py", line 96, in retry_mysql_connection_fn
return fn()
File "<string>", line 2, in get_table_names
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/deprecations.py", line 128, in warned
return fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 198, in get_table_names
tnames = self.dialect.get_table_names(
File "<string>", line 2, in get_table_names
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 52, in cache
ret = fn(self, con, *args, **kw)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2530, in get_table_names
rp = connection.execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2191, in execute
return connection.execute(statement, *multiparams, **params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 976, in execute
return self._execute_text(object_, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1145, in _execute_text
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1177, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1481, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1173, in _execute_context
conn = self._revalidate_connection()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 463, in _revalidate_connection
raise exc.ResourceClosedError("This Connection is closed")
@run_status_sensor(pipeline_run_status=DagsterRunStatus.SUCCESS, minimum_interval_seconds=1)
def dagster_task_success(context: RunStatusSensorContext) -> None:
try:
# our code
except Exception:
logger.exception("Error saving dagster job status", exc_info=True)
daniel
07/21/2022, 1:22 AMBianca Rosa
07/21/2022, 12:00 PMdeploy-ecs
folderdaniel
07/21/2022, 12:50 PMBianca Rosa
07/21/2022, 2:30 PMAre any env vars needed by MySQL available in that gRPC server task? That’s where the sensor executesThat’s helpful, I’m gonna isolate the user used to connect to the db to make sure its grabbing a connection when it starts.
daniel
07/21/2022, 3:44 PMBianca Rosa
07/21/2022, 3:44 PM5.7.mysql_aurora.2.07.2
daniel
07/21/2022, 3:45 PMBianca Rosa
07/21/2022, 3:45 PMdaniel
07/21/2022, 3:45 PMBianca Rosa
07/21/2022, 3:46 PMretry_mysql_connection_fn
is running, its always returning a connection that’s already closed by some reason 🤔daniel
07/21/2022, 3:54 PMBianca Rosa
07/25/2022, 3:48 PM2022-07-25T15:30:07.427308Z11132431 Query SELECT jobs.job_body
FROM jobs
WHERE jobs.job_type = 'SENSOR'
2022-07-25T15:30:07.428497Z11132431 Query rollback
2022-07-25T15:30:07.430312Z11132431 Quit
daniel
07/25/2022, 3:51 PMBianca Rosa
07/25/2022, 3:52 PMNick Thorne
07/27/2022, 10:41 AM