Joseph Antonakakis
08/10/2022, 10:19 PM
workflows where we house Dagster jobs.
This has been blocking us from using sensors 😞. Any troubleshooting advice? Follow-up Q: does the Dagster daemon run the sensor directly, or does it call the user code repo on every sensor invocation? I'm trying to understand whether the user code repo's Docker task perhaps isn't able to connect to our MySQL DB.
Stack trace:
dagster.core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor dagster_task_canceled
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.8/site-packages/dagster/core/errors.py", line 191, in user_code_error_boundary
raise error_cls(
The above exception was caused by the following exception:
sqlalchemy.exc.StatementError: (sqlalchemy.exc.ResourceClosedError) This Connection is closed
[SQL: SHOW FULL TABLES FROM `workflows`]
File "/usr/local/lib/python3.8/site-packages/dagster/core/errors.py", line 184, in user_code_error_boundary
yield
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/impl.py", line 289, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 354, in evaluate_tick
result = list(self._evaluation_fn(context))
File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 509, in _wrapped_fn
for item in result:
File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/run_status_sensor_definition.py", line 437, in _wrapped_fn
context.instance.get_event_records(
File "/usr/local/lib/python3.8/site-packages/dagster/core/definitions/sensor_definition.py", line 106, in instance
DagsterInstance.from_ref(self._instance_ref)
File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 451, in from_ref
unified_storage = instance_ref.storage
File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 413, in storage
return self.storage_data.rehydrate() if self.storage_data else None
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
return klass.from_config_value(self, result.value)
File "/usr/local/lib/python3.8/site-packages/dagster/core/storage/legacy_storage.py", line 98, in from_config_value
run_storage = ConfigurableClassData(
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
return klass.from_config_value(self, result.value)
File "/usr/local/lib/python3.8/site-packages/dagster_mysql/run_storage/run_storage.py", line 115, in from_config_value
return MySQLRunStorage(inst_data=inst_data, mysql_url=mysql_url_from_config(config_value))
File "/usr/local/lib/python3.8/site-packages/dagster_mysql/run_storage/run_storage.py", line 66, in __init__
table_names = retry_mysql_connection_fn(db.inspect(self._engine).get_table_names)
File "/usr/local/lib/python3.8/site-packages/dagster_mysql/utils.py", line 96, in retry_mysql_connection_fn
return fn()
File "<string>", line 2, in get_table_names
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/deprecations.py", line 128, in warned
return fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 198, in get_table_names
tnames = self.dialect.get_table_names(
File "<string>", line 2, in get_table_names
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 52, in cache
ret = fn(self, con, *args, **kw)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2530, in get_table_names
rp = connection.execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2191, in execute
return connection.execute(statement, *multiparams, **params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 976, in execute
return self._execute_text(object_, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1145, in _execute_text
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1177, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1481, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1173, in _execute_context
conn = self._revalidate_connection()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 463, in _revalidate_connection
raise exc.ResourceClosedError("This Connection is closed")
Stephen Bailey
08/11/2022, 1:33 AM
alex
08/11/2022, 3:02 PM
Joseph Antonakakis
08/11/2022, 4:09 PM
Joseph Antonakakis
08/11/2022, 4:09 PM
alex
08/11/2022, 4:11 PM
alex
08/11/2022, 4:13 PM
instance object, so I don't know why it's hitting _revalidate_connection and trying to use some previously established connection.
alex
08/11/2022, 4:13 PM
ResourceClosedError to the allow list of exceptions in retry_mysql_connection_fn.
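alex's suggestion is to treat ResourceClosedError as a retryable error. Below is a minimal, generic sketch of that "allow list" retry pattern. The helper name retry_connection_fn, its parameters, and its defaults are hypothetical; this is not the actual dagster_mysql.utils.retry_mysql_connection_fn code. The exception tuple is passed in so the sketch runs with the standard library alone; the SQLAlchemy usage is shown only as a comment.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_connection_fn(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    retry_limit: int = 5,
    retry_wait: float = 0.2,
) -> T:
    """Call fn(), retrying when it raises one of the exception types in retry_on.

    Hypothetical helper illustrating the allow-list idea; it is not the
    actual dagster_mysql retry_mysql_connection_fn implementation.
    """
    failures = 0
    while True:
        try:
            return fn()
        except retry_on:
            failures += 1
            if failures > retry_limit:
                raise  # retries exhausted: re-raise the last error unchanged
            time.sleep(retry_wait)


# Assumed usage with SQLAlchemy (requires sqlalchemy; shown as a comment only):
#   from sqlalchemy import exc, inspect
#   retry_connection_fn(
#       inspect(engine).get_table_names,
#       retry_on=(exc.DatabaseError, exc.OperationalError, exc.ResourceClosedError),
#   )
```

Exceptions outside the tuple propagate on the first failure, so widening the allow list only changes behavior for the error types you name.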
Joseph Antonakakis
08/11/2022, 4:17 PM
Joseph Antonakakis
08/11/2022, 4:17 PM
Joseph Antonakakis
08/11/2022, 4:20 PM
MYSQL_CONNECTION_URL is set on every container we run, and it's configured in our dagster.yaml in prod like so:
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  config:
    include_sidecars: true
run_storage:
  module: dagster_mysql.run_storage
  class: MySQLRunStorage
  config:
    mysql_url:
      env: MYSQL_CONNECTION_URL
event_log_storage:
  module: dagster_mysql.event_log
  class: MySQLEventLogStorage
  config:
    mysql_url:
      env: MYSQL_CONNECTION_URL
schedule_storage:
  module: dagster_mysql.schedule_storage
  class: MySQLScheduleStorage
  config:
    mysql_url:
      env: MYSQL_CONNECTION_URL
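Each storage block above reads its URL through env: MYSQL_CONNECTION_URL rather than a literal string. A toy model of that indirection follows, assuming (as the stack trace above suggests, where the gRPC server rehydrates the storage config inside DagsterInstance.from_ref) that the env: lookup happens in whichever process rehydrates the config. Under that assumption the variable must be set in the daemon, dagit, and user code containers alike. resolve_env_sources is a hypothetical stand-in, not Dagster's config-processing API.

```python
import json
import os
from typing import Any


def resolve_env_sources(config: Any) -> Any:
    """Replace every {"env": NAME} leaf with the value of NAME in *this* process.

    Hypothetical stand-in for how an env: source in dagster.yaml gets resolved;
    it is not Dagster's config-processing code.
    """
    if isinstance(config, dict):
        if set(config) == {"env"}:
            name = config["env"]
            if name not in os.environ:
                raise KeyError(f"{name} is not set in this process")
            return os.environ[name]
        return {key: resolve_env_sources(value) for key, value in config.items()}
    if isinstance(config, list):
        return [resolve_env_sources(item) for item in config]
    return config


# The unresolved storage config travels between processes as plain data...
shipped = json.dumps({"mysql_url": {"env": "MYSQL_CONNECTION_URL"}})
# ...and the receiving process resolves it against its own environment:
#   resolve_env_sources(json.loads(shipped))
```

The design point is that the shipped data names the variable, not its value, so two containers with different environments resolve the same config to different URLs (or fail outright).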
alex
08/11/2022, 4:26 PM
ResourceClosedError, which isn't handled gracefully.
It's unclear what is causing this connection to be held, since these are fresh objects; it must be something in the MySQL DBAPI used by SQLAlchemy. We don't see this issue with Postgres.
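Whether a DBAPI connection outlives a single use is governed by the SQLAlchemy engine's pool. A minimal sketch of the two options that come up later in this thread: NullPool, which opens a fresh DBAPI connection on every checkout and really closes it on release (what alex says dagster_mysql's default engine uses), and a pooled engine with pool_pre_ping=True, which tests each pooled connection on checkout and replaces dead ones. make_engine is a hypothetical helper; in the usage note a SQLite URL stands in for a real mysql+mysqlconnector:// URL.

```python
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.pool import NullPool


def make_engine(url: str, pooled: bool) -> Engine:
    """Hypothetical helper contrasting two connection-lifetime strategies."""
    if pooled:
        # Keep connections in the dialect's default pool, but ping each one on
        # checkout so a connection the server already dropped is replaced first.
        return create_engine(url, pool_pre_ping=True)
    # NullPool: no pooling at all; each connect() opens a new DBAPI connection
    # and closing it really closes it, so nothing stale is carried over.
    return create_engine(url, poolclass=NullPool)
```

For example, make_engine("sqlite://", pooled=False).pool is a NullPool instance. With NullPool there is no pooled connection for pool_pre_ping to check, which is why a "connection is closed" error on a NullPool engine points away from ordinary pool staleness.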
Joseph Antonakakis
08/11/2022, 4:26 PM
Joseph Antonakakis
08/11/2022, 4:27 PM
Joseph Antonakakis
08/11/2022, 4:27 PM
alex
08/11/2022, 4:29 PM
alex
08/11/2022, 4:29 PM
Joseph Antonakakis
08/11/2022, 4:31 PM
Joseph Antonakakis
08/11/2022, 4:31 PM
Joseph Antonakakis
08/11/2022, 4:32 PM
Joseph Antonakakis
08/11/2022, 4:33 PM
alex
08/11/2022, 4:40 PM
alex
08/11/2022, 4:41 PM
MYSQL_CONNECTION_URL is what is expected?
Joseph Antonakakis
08/11/2022, 5:32 PM
Joseph Antonakakis
08/11/2022, 7:29 PM
Joseph Antonakakis
08/11/2022, 7:29 PM
mysql+mysqlconnector:// prefix btw @alex
alex
08/11/2022, 7:39 PM
"ECS with dagit, dagster daemon, and a user code repo all deployed"
How are you managing these tasks? Are there any differences between them? For context on what's happening based on the stack trace: we are materializing a DagsterInstance that was passed from the daemon to the user code server, so it's the dagster.yaml in the daemon that's our source, and we're then failing to connect to the DB we got pointed at when we processed that config.
Are there any networking settings on the dagit task that are absent on the user code server task? Evaluating certain kinds of sensors is pretty much the only time the user code server reaches out to the database.
Joseph Antonakakis
08/11/2022, 8:23 PM
alex
08/11/2022, 8:34 PM
Joseph Antonakakis
08/11/2022, 8:52 PM
Joseph Antonakakis
08/11/2022, 8:53 PM
Joseph Antonakakis
08/11/2022, 8:59 PM
from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor

@run_status_sensor(pipeline_run_status=DagsterRunStatus.SUCCESS, minimum_interval_seconds=1)
def dagster_task_success(context: RunStatusSensorContext) -> None:
    task = DagsterTaskDAO().get_by_job_id(context.dagster_run.run_id)
    DagsterTaskDAO().update_complete(task)
DagsterTaskDAO is just a utility that issues a SQLAlchemy query.
alex
08/11/2022, 9:00 PM
Joseph Antonakakis
08/11/2022, 9:00 PM
Joseph Antonakakis
08/11/2022, 9:01 PM
alex
08/11/2022, 9:03 PM
The above exception was caused by the following exception:
sqlalchemy.exc.StatementError: (sqlalchemy.exc.ResourceClosedError) This Connection is closed
[SQL: SHOW FULL TABLES FROM `workflows`]
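The failing statement comes from the table-name reflection that MySQLRunStorage.__init__ runs (db.inspect(self._engine).get_table_names in the trace). One way to test connectivity from inside the user code container, independent of Dagster, is to run the same kind of reflection directly. check_storage_db is a hypothetical helper, and the sketch assumes SQLAlchemy plus the driver named in the URL (mysqlconnector) are installed in that image.

```python
import os
from typing import List

from sqlalchemy import create_engine, inspect
from sqlalchemy.pool import NullPool


def check_storage_db(url: str) -> List[str]:
    """Open a throwaway NullPool engine and list tables, mirroring the storage init step."""
    engine = create_engine(url, poolclass=NullPool)
    try:
        # For the MySQL dialect this reflection is what issued SHOW FULL TABLES in the error.
        return inspect(engine).get_table_names()
    finally:
        engine.dispose()


# Assumed usage, run inside the user code container where the sensor is evaluated:
#   print(check_storage_db(os.environ["MYSQL_CONNECTION_URL"]))
```

If this succeeds in the daemon container but fails in the user code container, the difference is in that task's environment or networking rather than in the sensor code.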
Joseph Antonakakis
08/11/2022, 9:05 PM
alex
08/11/2022, 9:16 PM
workflows is the name of the dagster database?
Joseph Antonakakis
08/11/2022, 9:34 PM
Joseph Antonakakis
08/11/2022, 9:34 PM
alex
08/11/2022, 9:53 PM
ResourceClosedError?
Joseph Antonakakis
08/12/2022, 12:07 AM
Joseph Antonakakis
08/12/2022, 12:07 AM
Joseph Antonakakis
08/12/2022, 12:56 AM
Joseph Antonakakis
08/12/2022, 12:34 PM
alex
08/12/2022, 2:32 PM
Joseph Antonakakis
08/12/2022, 2:36 PM
Joseph Antonakakis
08/12/2022, 2:36 PM
pool_pre_ping to nullify this
alex
08/12/2022, 2:50 PM
dagit, where we enable pooling, that makes sense, but in this case it should be using the default engine, which is explicitly set to NullPool:
https://github.com/dagster-io/dagster/blame/master/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py#L59-L63
Joseph Antonakakis
08/12/2022, 3:25 PM
alex
08/12/2022, 3:31 PM
dagster.yaml config when evaluating sensors, so the daemon and user code at least. It might be best just to keep them all in sync.
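Since the user code server also relies on the dagster.yaml config when evaluating sensors, one quick way to confirm that the daemon, dagit, and user code tasks really are in sync is to print a password-masked form of MYSQL_CONNECTION_URL in each container and compare them. describe_mysql_url is a hypothetical helper; the expected mysql+mysqlconnector scheme is the prefix Joseph mentions earlier in the thread.

```python
import os
from urllib.parse import urlsplit, urlunsplit

EXPECTED_SCHEME = "mysql+mysqlconnector"  # driver prefix mentioned in the thread


def describe_mysql_url(env_var: str = "MYSQL_CONNECTION_URL") -> str:
    """Return the URL from env_var with any password masked, for comparing containers."""
    raw = os.environ.get(env_var)
    if not raw:
        raise RuntimeError(f"{env_var} is not set in this container")
    parts = urlsplit(raw)
    if parts.scheme != EXPECTED_SCHEME:
        raise RuntimeError(f"unexpected scheme {parts.scheme!r}; expected {EXPECTED_SCHEME!r}")
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    if parts.username and parts.password:
        host = f"{parts.username}:***@{host}"  # never print the real password
    elif parts.username:
        host = f"{parts.username}@{host}"
    return urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))
```

Running it in each task and diffing the output catches a missing variable, a different host, or a different driver prefix without exposing credentials in logs.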