Steven Murphy  07/18/2023, 7:08 PM
In a @job of @ops (so not an asset materialisation job), does Dagster support a try/finally construct? My use case is that I have a Postgres connection that I want to keep open for the duration of the job. It is used by multiple ops, and it seems wasteful to open a new connection for each one.
I've attached a screenshot showing a simplified version of the pipeline. I would like close_connection to be called whether the_op succeeds or raises a failure.
I'd considered writing this as a PostgresResource, which doesn't appear to exist (closest I've found is this: https://dagster.slack.com/archives/C01U954MEER/p1682544874591409). I was planning to write it in a similar manner to the BigQueryResource; however, that's based on an HTTP API. As far as I can tell, the resource abstraction doesn't really lend itself to long-lived database connections. Is this a reasonable conclusion?
Edit: It appears that getting the PGConnection (my custom object wrapping a psycopg2.connect call) wouldn't work anyway, as it can't be pickled to be passed between ops. Currently reverting back to the resource approach, probably creating a new connection each time I need one. That has the benefit of a context manager to control the connection at least. May play with connection pools as well.
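[Editor's note: the pickling limitation mentioned above is easy to reproduce with any object that wraps OS-level state. A minimal stand-in sketch, using an open file handle in place of the psycopg2 connection (the `handle` name and the file choice are illustrative only):]

```python
import os
import pickle

# Objects that wrap OS-level state (open files, sockets, database
# connections) cannot be serialized by pickle, which is why a live
# connection can't be handed between ops running in separate processes.
handle = open(os.devnull)  # stand-in for a live DB connection
try:
    pickle.dumps(handle)
    picklable = True
except TypeError:
    picklable = False
finally:
    handle.close()

print(picklable)  # False
```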
Zach  07/18/2023, 8:01 PM
(you can use .execute_in_process() to run the job in a single process)

Zach  07/18/2023, 8:04 PM

claire  07/18/2023, 10:18 PM

Steven Murphy  07/19/2023, 4:41 PM
Interestingly, going from the default multi_or_in_process_executor to in_process_executor changed my run time for a small local job from 5 seconds to 0.5 seconds. I'm guessing that spinning up the multiprocess machinery for this test case took more effort than running the actual workload.
> the resource would be spun up once and be reused until job execution completes.

Let's say, for example, I went with the in_process_executor approach, and I decided I wanted to have a database connection open for the duration of the job, shared across all the ops. How would I do that? I'd like to do something like:
- Define resource, open underlying connection
  - Run job
    - Run op 1 (pass in connection)
    - Run op 2 (pass in connection)
    - Run op 3 (pass in connection)
  - Shut down underlying connection
In my mind I'm thinking of some sort of context manager, almost like using a with statement on the connection resource:
- with connection:
  - Run job
    - Run op 1 (pass in connection)
    - Run op 2 (pass in connection)
    - Run op 3 (pass in connection)
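[Editor's note: the outline above maps directly onto Python's context-manager protocol. A minimal plain-Python sketch of that lifecycle — FakeConnection and the op functions are hypothetical stand-ins, not Dagster APIs, though Dagster's yield-style resources follow the same open/yield/close shape:]

```python
from contextlib import contextmanager

class FakeConnection:
    """Hypothetical stand-in for a psycopg2 connection."""
    def __init__(self):
        self.closed = False
        self.queries = []
    def execute(self, sql):
        self.queries.append(sql)
    def close(self):
        self.closed = True

@contextmanager
def connection_resource():
    conn = FakeConnection()  # define resource, open underlying connection
    try:
        yield conn           # the job body runs while the connection is open
    finally:
        conn.close()         # shut down even if an op raised

def op_1(conn): conn.execute("SELECT 1")
def op_2(conn): conn.execute("SELECT 2")
def op_3(conn): conn.execute("SELECT 3")

with connection_resource() as conn:  # the "with connection:" from the outline
    op_1(conn)
    op_2(conn)
    op_3(conn)

print(conn.closed)  # True: closed once the with block exits
```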
claire  07/19/2023, 9:44 PM
> Interestingly, going from the default multi_or_in_process_executor to in_process_executor changed my run time for a small local job from 5 seconds to 0.5 seconds.

Yep, it takes about a second or so to spin up each process, so short-running ops/assets execute more quickly in-process. I'm not familiar with the specifics of opening database connections, but you could represent your resource as a class. Upon initializing you can open the connection and store it in a class attribute, and use that attribute in class methods that are called from the ops.
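[Editor's note: a rough plain-Python sketch of the class-based shape claire describes — the class names, methods, and FakeConnection stand-in are all hypothetical, not a real Dagster PostgresResource:]

```python
class FakeConnection:
    """Hypothetical stand-in for psycopg2.connect(...)."""
    def __init__(self):
        self.closed = False
    def execute(self, sql):
        return f"ran: {sql}"
    def close(self):
        self.closed = True

class PostgresResource:
    """Open the connection once on initialization, store it as an
    attribute, and expose methods for the ops to call."""
    def __init__(self):
        self.conn = FakeConnection()  # in practice: psycopg2.connect(dsn)

    def run_query(self, sql):
        # Every op reuses the same underlying connection.
        return self.conn.execute(sql)

    def teardown(self):
        self.conn.close()

resource = PostgresResource()
print(resource.run_query("SELECT 1"))  # ran: SELECT 1
resource.teardown()
print(resource.conn.closed)  # True
```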
claire  07/19/2023, 9:45 PM