John Boyle
05/27/2022, 12:49 AM
TypeError: cannot pickle '_thread.lock' object which seems to make sense based on what I've read about pickling subprocesses.
Is this a situation where I'd need a custom io_manager? The reason I want to return an object with a subprocess is that the app the process drives takes a long time to load. I need to load it, run an initial state, then use the results to figure out what the next set of commands should be (and do this a bunch), so reusing the same process seems reasonable.
My current workaround is to put all of that logic in one big op that calls a bunch of 'normal' (non-op) functions. I'd really like to refactor it into separate ops, but that requires passing the object that holds the process from op to op, which I can't do.
I feel like I'm either misusing dagster or not architecting my DAG properly.
Stefan Adelbert
05/27/2022, 2:23 AM
psutil to manage that process. Depends what you need to be able to do with the process, of course.
Stefan Adelbert
05/27/2022, 2:48 AM
in_process_executor, because that will only spawn the resource once).
I do something similar to this when I use selenium in a job: I wrap the selenium driver in a resource and execute the job using an in_process_executor. Each of my ops can then access the same selenium driver, and the state of that driver is kept and carried from one op to the next. I even use psutil to do some heavy-handed cleanup of the selenium driver process and its subprocesses in the case where it doesn't shut down cleanly.
sean
05/27/2022, 1:11 PM
IOManager to handle this in the multiprocess case, but it seems simpler to just pass the PID around and use it to connect to the process within different ops.
John Boyle
05/27/2022, 4:37 PM
John Boyle
05/28/2022, 1:07 AM
Kobroli
05/30/2022, 3:01 PM
sean
05/30/2022, 10:28 PM
IOManager case isn’t too different from the “passing the PID” case. In both cases, what is actually communicated between Dagster processes is a simple string like a PID or socket address.
The main difference is that with a custom IOManager, the IOManager will handle reconnecting to a running process for you, so that the object fed to your op compute function is already a Python process-wrapping abstraction. Without the custom IOManager, your op will just receive the PID and will have to implement some kind of reconnection on its own.
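To make the "only a string crosses the process boundary" idea concrete, here is a rough standard-library sketch. It deliberately does not import dagster: `AppHandleStore` only mirrors the `handle_output` / `load_input` shape of an IOManager, and the plain string `key` argument stands in for whatever a real IOManager would derive from its context. `AppHandle`, `AppHandleStore`, and the one-request-per-connection protocol are all hypothetical names and assumptions, not anything from the thread or from Dagster's API.

```python
import json
from multiprocessing.connection import Client
from pathlib import Path


class AppHandle:
    """Hypothetical wrapper around an already-running app reachable at host:port."""

    def __init__(self, host, port, authkey):
        self.host = host
        self.port = port
        self.authkey = authkey  # shared secret; kept in memory, never written to disk

    def send(self, command):
        # Assumed protocol: open a connection, send one command, read one reply.
        with Client((self.host, self.port), authkey=self.authkey) as conn:
            conn.send(command)
            return conn.recv()


class AppHandleStore:
    """Plain-Python stand-in shaped like an IOManager (not the Dagster class)."""

    def __init__(self, directory):
        self.directory = Path(directory)

    def handle_output(self, key, handle):
        # Persist only the address, never the live connection or process object.
        record = {"host": handle.host, "port": handle.port}
        (self.directory / f"{key}.json").write_text(json.dumps(record))

    def load_input(self, key, authkey):
        # Rebuild a fresh wrapper from the stored address in the receiving process.
        record = json.loads((self.directory / f"{key}.json").read_text())
        return AppHandle(record["host"], record["port"], authkey)
```

The downstream step never sees a pickled process or lock. It gets a new `AppHandle` built from the stored host and port, which is why the pickling error does not come up in this arrangement.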
Here is the main IOManager doc page, which you should thoroughly read if you want to attempt this.
I’m not an expert in the Python process APIs, but when creating the external process you’d want to launch it as a daemon/server so that it will be able to communicate with multiple Python processes (the ones your ops run in). Then your IOManager would store either the PID or a Unix or TCP socket address on output. When loading the output in a new process, if storing the PID, you could use psutil to gather information about and communicate with the external process. If storing the socket address, you could just reconnect to that.
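A minimal sketch of the socket-address variant, using only the standard library. Everything here is an assumption made for illustration: the tiny server in `SERVER_SOURCE` is a stand-in for the slow-to-load app, and `start_app`, `send_command`, the `demo-key` authkey, and the `"stop"` command are hypothetical. The point is that the app is launched once, announces its address, and every later step reconnects with nothing but that address string.

```python
import subprocess
import sys
from multiprocessing.connection import Client

# Stand-in for the long-loading app: keeps state across requests and serves
# one connection at a time until it receives "stop".
SERVER_SOURCE = r"""
from multiprocessing.connection import Listener

history = []
with Listener(("127.0.0.1", 0), authkey=b"demo-key") as listener:
    host, port = listener.address
    print(f"{host}:{port}", flush=True)  # announce where callers can reconnect
    running = True
    while running:
        with listener.accept() as conn:
            command = conn.recv()
            if command == "stop":
                running = False
                conn.send("stopped")
            else:
                history.append(command)
                conn.send(list(history))
"""


def start_app():
    """Launch the app once; return the process and its 'host:port' address."""
    proc = subprocess.Popen(
        [sys.executable, "-c", SERVER_SOURCE],
        stdout=subprocess.PIPE,
        text=True,
    )
    address = proc.stdout.readline().strip()
    return proc, address


def send_command(address, command):
    """Reconnect from any process using only the address string."""
    host, port = address.rsplit(":", 1)
    with Client((host, int(port)), authkey=b"demo-key") as conn:
        conn.send(command)
        return conn.recv()


if __name__ == "__main__":
    proc, address = start_app()
    print(send_command(address, "load initial state"))
    print(send_command(address, "run next commands"))
    send_command(address, "stop")
    proc.wait(timeout=10)
```

In a job, one op would call something like `start_app` and output the address string, and each later op would call something like `send_command` with it. The app keeps its state between calls because it is the same process throughout.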
Maybe @sandy can offer some better advice here, he knows more about IOManagers than me. Or he might have a better suggestion than using IOManager here.
sean
05/30/2022, 10:39 PM
chris
05/31/2022, 5:45 PM