# ask-community
n
Hello everyone. I have a quick question. I have an op that uses a resource which is a connection to a Postgres DB. The whole op takes quite some time, and at some point the connection is dropped, leading to a `psycopg2.InterfaceError: connection already closed`. Any idea how to re-establish the connection using the existing resource (through `dagster.OpExecutionContext`)? Thanks everyone!! 🙂
j
How is your resource configured? For our Postgres use case, I created a subclass of `dagster.ConfigurableResource` with a method to return an instance of a different class that represents the actual Postgres connection. This class contains the credentials, as well as methods to connect and disconnect from the database. In my software-defined assets, I can do something like:
```python
postgres = context.resources.postgres.get_client()
postgres.connect()
```
Would this approach work for your use case?
n
Hey Justin. My resources are configured like this:
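```python
import dagster
import psycopg2


@dagster.resource(
    config_schema={"connection_string": dagster.Field(dagster.StringSource)}
)
def pg_replica(context):
    conn_string = context.resource_config["connection_string"]
    # The connection is opened once, when the resource is initialized.
    conn_db = psycopg2.connect(conn_string)
    # PGRepository is my own class, defined elsewhere.
    return PGRepository(conn_db)
```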
PGRepository is a class that wraps the connection, and I can then call any of its methods to run queries.
I think I do understand what you mean. I can probably achieve something similar. I will try. Thank you so much!!! 🙏
j
No problem. I pasted a crude example below.
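```python
import psycopg2
from dagster import ConfigurableResource


class PostgresDatabase:
    def __init__(self, host: str, user: str, password: str):
        self.host = host
        self.user = user
        self.password = password
        self.connection = None

    def connect(self) -> None:
        # Example using psycopg2 (the driver from the question);
        # swap in your library of choice.
        self.connection = psycopg2.connect(
            host=self.host, user=self.user, password=self.password
        )

    def disconnect(self) -> None:
        # Close the connection if one is open, then forget it.
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    def is_connected(self) -> bool:
        # psycopg2 sets `closed` to a non-zero value once the connection is closed.
        return self.connection is not None and not self.connection.closed


class PostgresResource(ConfigurableResource):
    host: str
    user: str
    password: str

    def get_client(self) -> PostgresDatabase:
        # Each call returns a fresh, not-yet-connected client.
        return PostgresDatabase(self.host, self.user, self.password)
```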
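For the original `connection already closed` problem, one option is to have the client reopen the connection on demand instead of holding a single connection for the whole op. Below is a rough sketch of that idea, not a tested Dagster recipe. `ReconnectingClient`, `connect`, `retry_on`, and `run` are hypothetical names made up for illustration. The only driver behavior it relies on is that psycopg2 connections expose a `closed` attribute, which is 0 while the connection is open and non-zero once it has been closed.

```python
from typing import Any, Callable, Optional, Tuple, Type


class ReconnectingClient:
    """Hypothetical helper that reopens a dropped connection on demand."""

    def __init__(
        self,
        connect: Callable[[], Any],
        retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    ) -> None:
        # Assumption: `connect` is something like
        #   lambda: psycopg2.connect(conn_string)
        self._connect = connect
        # Assumption: in practice you would pass
        #   (psycopg2.InterfaceError, psycopg2.OperationalError)
        self._retry_on = retry_on
        self._conn: Optional[Any] = None

    def connection(self) -> Any:
        # Reconnect if there is no connection yet or the driver reports it closed.
        if self._conn is None or getattr(self._conn, "closed", 0):
            self._conn = self._connect()
        return self._conn

    def run(self, work: Callable[[Any], Any]) -> Any:
        """Call work(conn); on a connection error, reconnect once and retry."""
        try:
            return work(self.connection())
        except self._retry_on:
            # Drop the stale handle so connection() opens a new one.
            self._conn = None
            return work(self.connection())
```

In an op you would then call something like `client.run(lambda conn: ...)` for each query instead of holding on to the raw connection. Keep in mind that the retry runs `work` a second time, so it should be safe to repeat, and any transaction that was open on the dropped connection is lost.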
n
Thank you Justin. I believe I can adjust this to my case 🙏