# ask-community
j
is there a way to return an object with a subprocess from an op? when I try a simple example, I get:
```
TypeError: cannot pickle '_thread.lock' object
```
That seems to make sense based on what I've read about pickling subprocesses. Is this a situation where I'd need a custom io_manager? The reason I want to return an object with a subprocess is that the app the process is driving takes a long time to load, and I need to load it, run an initial state, then use the results to figure out what the next set of commands should be (and do this a bunch of times), so reusing the same process seems reasonable. My current workaround is to put all of that logic in one big ole op that calls a bunch of "normal" (non-op) functions, but I'd really like to refactor it into separate ops. That requires passing the object with the process, which I can't do. I feel like I'm either misusing Dagster or not architecting my DAG properly.
s
Maybe there is a different way to "pass around the process": perhaps you could pass around the PID for the process instead and then use `psutil` to manage that process. It depends what you need to be able to do with the process, of course.
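For illustration, a minimal sketch of the PID-passing idea, assuming a hypothetical `slow_app` command: the PID is a plain int, so it pickles across op boundaries, and a downstream op reattaches with `psutil`.

```python
import subprocess

import psutil
from dagster import job, op


@op
def start_app() -> int:
    # Launch the external app; "slow_app" is a hypothetical command.
    proc = subprocess.Popen(["slow_app", "--serve"])
    return proc.pid  # a plain int pickles fine across process boundaries


@op
def stop_app(pid: int) -> None:
    # Reattach from a different Dagster process.
    proc = psutil.Process(pid)
    # psutil can inspect and manage the process; actually sending it
    # commands would need an IPC channel (socket, pipe) that the app
    # itself provides.
    if proc.is_running():
        proc.terminate()


@job
def manage_app():
    stop_app(start_app())
```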
Another option might be to wrap your process in a Dagster resource. That way all ops in your job can access the same process (provided that you use the `in_process_executor`, because that will spawn the resource only once). I do something similar when I use Selenium in a job: I wrap the Selenium driver in a resource, execute the job using the `in_process_executor`, and then each of my ops can access the same driver, whose state is kept and carried from one op to the next. I even use `psutil` to do some heavy-handed cleanup of the Selenium driver process and its subprocesses in the case where it doesn't shut down cleanly.
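A minimal sketch of that resource pattern, assuming a hypothetical `slow_app` command and the `@resource`/`@job` APIs: the resource launches the expensive process once per run, and with the in-process executor every op sees the same live handle.

```python
import subprocess

from dagster import in_process_executor, job, op, resource


@resource
def slow_app_process(_init_context):
    # Launch the expensive app once per run; all ops share this handle.
    proc = subprocess.Popen(
        ["slow_app", "--serve"],  # hypothetical command
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    try:
        yield proc
    finally:
        # Heavy-handed cleanup if the app doesn't exit on its own.
        proc.terminate()
        proc.wait(timeout=10)


@op(required_resource_keys={"slow_app"})
def run_initial_state(context) -> bytes:
    proc = context.resources.slow_app
    proc.stdin.write(b"init\n")
    proc.stdin.flush()
    return proc.stdout.readline()


@job(
    resource_defs={"slow_app": slow_app_process},
    executor_def=in_process_executor,
)
def drive_app():
    run_initial_state()
```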
s
Both of Stefan’s suggestions here are good. You could use a custom `IOManager` to handle this in the multiprocess case, but it seems simpler to just pass the PID around and use it to connect to the process within different ops.
j
Thanks for the tips! I was able to get it to work as an in-process resource, but not being able to use the multiprocess executor isn't ideal. For simple DAGs it probably isn't an issue, but I could see performance problems for more complex ones if I can't run ops in parallel. Sounds like a custom IOManager could help, but I haven't dug into that much yet. Passing around the PID sounds like the right approach, so I'll probably poke at that next. Thanks again.
@sean can you give a high-level overview of what I'd need to consider/do to set up a custom IOManager to handle this in the multiprocess case? For the simple case I described, the other options may be simpler, but thinking about it more, I suspect I'll eventually need to at least understand what the custom IOManager approach would entail.
k
@sean Do you know if there's a way to limit concurrency for resources, e.g. allow only one resource of a certain type at a time, where `op`s of other jobs that require the same resource have to wait until the busy resource is freed?
s
@John Boyle The custom `IOManager` case isn’t too different from the “passing the PID” case. In both cases, what is actually communicated between Dagster processes is a simple string, like a PID or socket address. The main difference is that with a custom `IOManager`, the `IOManager` handles reconnecting to the running process for you, so the object fed to your op compute function is already a Python process-wrapping abstraction. Without the custom `IOManager`, your op will just receive the PID and will have to implement some kind of reconnection on its own. The main `IOManager` doc page is the place to start; read it thoroughly if you want to attempt this.

I’m not an expert in the Python process APIs, but when creating the external process you’d want to launch it as a daemon/server so that it can communicate with multiple Python processes (the ones your ops run in). Your IOManager would then store either the PID or a Unix or TCP socket address on output. When loading the output in a new process, if you stored the PID, you could use `psutil` to gather information about and communicate with the external process; if you stored the socket address, you could just reconnect to it. Maybe @sandy can offer better advice here; he knows more about IOManagers than I do, and he might have a better suggestion than using an `IOManager` at all.
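A hedged sketch of what that could look like, assuming the op has already daemonized the app and emits its socket address (the storage path and address format here are illustrative, not a prescribed API):

```python
import os
import pickle
import socket

from dagster import IOManager, io_manager


class ExternalProcessIOManager(IOManager):
    """Persists a daemonized app's (host, port) on output and reconnects
    on input, so downstream ops receive a live socket, not a raw address."""

    def _path(self, context):
        # One file per op output, keyed by run id / step key / output name.
        return os.path.join("/tmp/dagster_proc", *context.get_identifier())

    def handle_output(self, context, obj):
        # `obj` is assumed to be a (host, port) tuple that the op returns
        # after launching the app as a daemon/server.
        path = self._path(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        with open(self._path(context.upstream_output), "rb") as f:
            host, port = pickle.load(f)
        # Reconnect in the consuming op's process and hand back a live
        # connection instead of the raw address.
        return socket.create_connection((host, port))


@io_manager
def external_process_io_manager(_init_context):
    return ExternalProcessIOManager()
```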
@Kobroli I don’t think there’s a way to do that, but also IMO Dagster seems like the wrong level of abstraction at which to implement this kind of constraint. Typically, Dagster resources themselves shouldn’t be expensive: if you have a “heavy” third-party system (e.g. a database server), then Dagster resources should just be thin connections to that system, and the external system itself (not the Dagster resource objects) should implement queueing/prioritization. That said, I’m not an expert in resources, so cc @chris for confirmation/correction/elaboration.
c
^ I'd agree with what Sean said here. There's no built-in way to do that in Dagster, and there are no plans to implement such a concurrency system at the moment. It's best handled by the system the resource connects to, or by a thin layer on top of that system.
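As one possible "thin layer" of that sort, a resource could guard its connection with an OS-level lock file so that concurrent runs block until the resource is free. A POSIX-only sketch, with `make_connection` as a hypothetical stand-in for opening a thin connection to the heavy system:

```python
import fcntl

from dagster import resource


def make_connection():
    # Hypothetical: open a thin connection to the heavy external system.
    raise NotImplementedError


@resource
def exclusive_app_connection(_init_context):
    lock_file = open("/tmp/slow_app.lock", "w")
    # flock blocks here until no other run holds the lock (POSIX-only).
    fcntl.flock(lock_file, fcntl.LOCK_EX)
    try:
        yield make_connection()
    finally:
        fcntl.flock(lock_file, fcntl.LOCK_UN)
        lock_file.close()
```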