I've got SQLAlchemy autoflush enabled in the db model code that my solids run, and sometimes a pipeline run fails due to an integrity-constraint violation that actually happened in a totally different pipeline run. It seems the same Celery worker executed both pipelines and reused the db session between the runs without first cleaning up after the first one raised an exception. This problem with Celery + SQLAlchemy is discussed here:
http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html
Before adopting Dagster we were extending celery.Task and calling
session.remove()
in its after_return method, just as that article recommends. What's the cleanest way to do this in Dagster?