Nickolas Zourmpakis
06/07/2023, 2:18 PMpsycopg2.InterfaceError: connection already closed
. Any idea how to reestablish the connection using the existing resource (through dagster.OpExecutionContext) ? Thanks everyone!! 🙂Justin Taylor
06/07/2023, 2:31 PMdagster.ConfigurableResource
with a method to return an instance of a different class that represents the actual Postgres connection. This class contains the credentials, as well as methods to connect and disconnect from the database. In my software-defined assets, I can do something like:
postgres = context.resources.postgres.get_client()
postgres.connect()
Would this approach work for your use case?Nickolas Zourmpakis
06/07/2023, 2:58 PM@dagster.resource(
config_schema={"connection_string": dagster.Field(dagster.StringSource)}
)
def pg_replica(context):
conn_string = context.resource_config["connection_string"]
conn_db = psycopg2.connect(conn_string)
return PGRepository(conn_db)
The PGRepository is a class that initiates the connection and then I can just use any function in there to query.Nickolas Zourmpakis
06/07/2023, 3:23 PMJustin Taylor
06/07/2023, 3:33 PMfrom dagster import ConfigurableResource
class PostgresDatabase(object):
def __init__(self, host: str, user: str, password: str):
self.host = host
self.user = user
self.password = password
self.connection = None
def connect(self):
# do something to define a connection with your lib of choice
# self.connection = connection
def disconnect(self) -> None:
self.connection.dispose()
self.connection = None
return None
def is_connected(self) -> bool:
if self.connection is None:
return False
else:
return True
class PostgresResource(ConfigurableResource):
host: str
user: str
password: str
def get_client(self):
return PostgresDatabase(self.host, self.user, self.password)
Nickolas Zourmpakis
06/07/2023, 3:50 PM