https://dagster.io/ logo
#ask-community
Title
# ask-community
e

Ethan Leifer

05/09/2023, 1:06 PM
Hi I am trying to make a postgres resource using asyncpg, but for some reason the
_pool
isn’t avaible
Copy code
class PostgresResource(ConfigurableResource):
    _pool: Pool
    _schema: str

    
    async def create_schema(self, schema: str):
        async with self._pool.acquire() as conn:
            await conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema};")

async def init_postgres():
    pool = await create_pool(
        host=POSTGRES_HOST,
        port=POSTGRES_PORT,
        user=POSTGRES_USER,
        password=POSTGRES_PASSWORD,
        database=POSTGRES_DB,
    )
    if pool:
        postgres_resource = PostgresResource(
            _pool=pool,
            _schema=JOB_SCHEMA,
        )
        await postgres_resource.create_schema(schema=JOB_SCHEMA)

        ALL_RESOURCES["postgres"] = postgres_resource


loop = asyncio.get_event_loop()
loop.run_until_complete(init_postgres())
This errors on the create_schema call
s

sean

05/09/2023, 1:35 PM
Hi Ethan,
ConfigurableResource
objects aren’t just regular python objects, there is an initial routine that happens when they are run as part of a job. The reason
_pool
isn’t bound here is because that routine is not getting called because you’re not running this as part of a job.
e

Ethan Leifer

05/09/2023, 2:11 PM
So I have to run the asset as part of a job?
That explanation makes sense
But I am confused on the solution
s

sean

05/09/2023, 2:59 PM
I don’t see any asset in the code you posted-- but the resource should be available to any associated assets/jobs if you supply it as part of a
Definitions
object:
Copy code
defs = Definitions(
    resources = {"postgres": PostgresResource(_pool=pool, _schema=JOB_SCHEMA},
    assets=[...],
    jobs=[...],
)
4 Views