# ask-community
s
Hi folks, a couple of related questions. When building a `@job` of `@op`s (so not an asset materialisation job), does Dagster support a try/finally construct? My use case is that I have a Postgres connection that I want to keep open for the duration of the job. It is used by multiple ops, and it seems wasteful to open a new connection for each one. I've attached a screenshot showing a simplified version of the pipeline. I would like `close_connection` to be called whether `the_op` succeeds or throws a failure. I'd considered writing this as a `PostgresResource`, which doesn't appear to exist (the closest I've found is this: https://dagster.slack.com/archives/C01U954MEER/p1682544874591409). I was planning to write it in a similar manner to the `BigQueryResource`; however, that's based on an HTTP API. As far as I can tell, the resource abstraction doesn't really lend itself to long-lived database connections. Is this a reasonable conclusion?
Edit: It appears that getting the PGConnection (my custom object wrapping a `psycopg2.connect`) wouldn't work anyway, as it can't be pickled to be passed between ops. Currently reverting to the resource approach, probably creating a new connection each time I need one. That has the benefit of a context manager to control the connection, at least. May play with connection pools as well.
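For illustration, the per-op-connection fallback I'm describing might look something like this. A rough sketch assuming Dagster's Pythonic `ConfigurableResource`; the `PostgresResource` name, `conn_string` field, and `get_connection` method are all mine:
```python
from contextlib import contextmanager

import psycopg2
from dagster import ConfigurableResource, op


class PostgresResource(ConfigurableResource):
    """Hypothetical resource: opens a fresh connection per use."""

    conn_string: str

    @contextmanager
    def get_connection(self):
        conn = psycopg2.connect(self.conn_string)
        try:
            yield conn
        finally:
            # Closed even if the op body raises, mirroring try/finally.
            conn.close()


@op
def the_op(postgres: PostgresResource):
    # Bound elsewhere, e.g. Definitions(resources={"postgres": PostgresResource(conn_string=...)})
    with postgres.get_connection() as conn, conn.cursor() as cur:
        cur.execute("SELECT 1")
```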
z
Yeah, long story short: resource lifecycles are tied to the ops they are configured on. They are reinitialized for every op that uses them, and then garbage collected after the op. Currently Dagster doesn't support resources that can be shared across multiple ops in a job (unless you're using `.execute_in_process()` to run the job in a single process).
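For reference, that single-process escape hatch looks something like this (minimal sketch with a throwaway op and job):
```python
from dagster import job, op


@op
def the_op():
    ...


@job
def my_job():
    the_op()


# Runs the entire job in the current process, so a resource
# attached to it would be initialized once for the whole run.
result = my_job.execute_in_process()
assert result.success
```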
I wish this was highlighted better in the docs somewhere, feels like this question comes up in some form or another a couple times a week
c
+1 to Zach's comments above. Another option is to use the in-process executor to execute your jobs, which basically executes all of the ops in a single process. In this case, the resource would be spun up once and reused until job execution completes.
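e.g., a minimal sketch (the op and job names are placeholders):
```python
from dagster import in_process_executor, job, op


@op
def the_op():
    ...


# executor_def=in_process_executor makes every run of this job
# execute all ops serially in a single process.
@job(executor_def=in_process_executor)
def my_job():
    the_op()
```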
s
Thank you both for the clarification. Interestingly, going from the default `multi_or_in_process_executor` to `in_process_executor` changed my run time for a small local job from 5 seconds to 0.5 seconds. I'm guessing that spinning up the multiprocess machinery for this test case took more effort than running the actual workload.
> the resource would be spun up once and reused until job execution completes.
Let's say, for example, I went with the `in_process_executor` approach, and I decided I wanted to have a database connection open for the duration of the job, shared across all the ops. How would I do that? I'd like to do something like:
```
Define resource, open underlying connection
    Run job
        Run op 1 (pass in connection)
        Run op 2 (pass in connection)
        Run op 3 (pass in connection)
    Shut down underlying connection
```
In my mind I'm thinking of some sort of context manager, almost like using a `with:` statement on the connection resource:
```
with connection:
    Run job
        Run op 1 (pass in connection)
        Run op 2 (pass in connection)
        Run op 3 (pass in connection)
```
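From the resource docs, a generator-style `@resource` looks like it has roughly that shape — setup before the `yield`, teardown after — so combined with the in-process executor it might bracket the run the way I want. A rough sketch (the `postgres_connection` name and `conn_string` config are made up):
```python
import psycopg2
from dagster import in_process_executor, job, op, resource


@resource(config_schema={"conn_string": str})
def postgres_connection(init_context):
    # Setup before the yield, teardown after -- effectively the
    # resource's own try/finally around its lifetime.
    conn = psycopg2.connect(init_context.resource_config["conn_string"])
    try:
        yield conn
    finally:
        conn.close()


@op(required_resource_keys={"postgres"})
def op_1(context):
    with context.resources.postgres.cursor() as cur:
        cur.execute("SELECT 1")


@job(
    executor_def=in_process_executor,
    resource_defs={"postgres": postgres_connection},
)
def my_job():
    op_1()
```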
c
> Interestingly, going from the default `multi_or_in_process_executor` to `in_process_executor` changed my run time for a small local job from 5 seconds to 0.5 seconds.
Yep, it takes about a second or so to spin up each process, so short-running ops/assets execute more quickly in-process. I'm not familiar with the specifics of opening database connections, but you could represent your resource as a class. Upon initialization you can open the connection and store it in a class attribute, and use that attribute in class methods that are called from the ops.
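A minimal sketch of that class-based shape; the `PostgresClient` wrapper and its methods are invented for illustration:
```python
import psycopg2
from dagster import resource


class PostgresClient:
    """Hypothetical wrapper: one connection opened at init,
    used by the methods that ops call."""

    def __init__(self, conn_string: str):
        self.conn = psycopg2.connect(conn_string)

    def fetch_one(self, query: str):
        with self.conn.cursor() as cur:
            cur.execute(query)
            return cur.fetchone()

    def close(self):
        self.conn.close()


@resource(config_schema={"conn_string": str})
def postgres_client(init_context):
    client = PostgresClient(init_context.resource_config["conn_string"])
    try:
        yield client
    finally:
        client.close()
```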
I wrote up a short discussion detailing resource spin up, since it seems like this question comes up pretty frequently: https://github.com/dagster-io/dagster/discussions/15398