Chris Nogradi

02/02/2022, 7:23 PM
I am trying to understand how to pass a set of keys/ids from the IOManager.handle_output() to the IOManager.load_input() if the handle_output() creates the key/id from the external resource (in my case MongoDB's ObjectIDs). I see this done in the memory IO manager by using a local variable but this won't work cross process. Any examples, I can look at?

owen

02/02/2022, 7:36 PM
hi @Chris Nogradi I think you can reuse part of the implementation of the fs_io_manager for this purpose. the gist of it is just writing to / reading from the same file in both handle_output and load_input to deal with the process boundary issue you're mentioning. with your use case, you could generate the MongoDB ObjectID, then write that to a file in handle_output().
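A minimal sketch of that approach, assuming a pymongo Collection is wired in as a Dagster resource; the mongo_collection resource key, the /shared/dagster-objectids directory, and the class/factory names here are made up for illustration, not an official recipe:

```
import os
import pickle

from dagster import IOManager, io_manager


class MongoObjectIdIOManager(IOManager):
    """Stores each op output as a MongoDB document and passes the resulting
    ObjectID across the process boundary via a file in a shared directory."""

    def __init__(self, collection, base_dir):
        self._collection = collection  # assumed: a pymongo Collection
        self._base_dir = base_dir      # assumed: a directory every process can see

    def _id_path(self, step_key, output_name):
        return os.path.join(self._base_dir, f"{step_key}.{output_name}.objectid")

    def handle_output(self, context, obj):
        # the ObjectID is only known after the insert, so persist it to a
        # shared file for the downstream process to pick up
        object_id = self._collection.insert_one(obj).inserted_id
        with open(self._id_path(context.step_key, context.name), "wb") as f:
            pickle.dump(object_id, f)

    def load_input(self, context):
        upstream = context.upstream_output
        with open(self._id_path(upstream.step_key, upstream.name), "rb") as f:
            object_id = pickle.load(f)
        return self._collection.find_one({"_id": object_id})


@io_manager(required_resource_keys={"mongo_collection"})
def mongo_objectid_io_manager(init_context):
    return MongoObjectIdIOManager(
        collection=init_context.resources.mongo_collection,
        base_dir="/shared/dagster-objectids",  # must be visible to every process
    )
```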

Chris Nogradi

02/02/2022, 7:45 PM
Thanks for the response @owen. Does this require all executors to be able to mount the same file system? Is there not a way to piggyback on the metadata event system? Maybe I don't understand the infrastructure.

owen

02/02/2022, 8:12 PM
ah I see what you mean. this protocol would indeed require you to have some globally-available file system (or something like s3) in order for it to work. re: piggybacking on the metadata event system, this is possible, but it would be a bit hacky / painful. You can access the DagsterInstance (which contains event logs, run history, etc.) at runtime using `output_context.step_context.instance`. This will give you a DagsterInstance that you can call something like `instance.all_logs(context.run_id, of_type=DagsterEventType.HANDLED_OUTPUT)` on to get all of the "handled output" events for the current run.
You'd then have to iterate through these to get the correct one for the given step, and then read off metadata from that event (you can add metadata to this event by yielding metadata entries inside the handle_output function: https://docs.dagster.io/concepts/io-management/io-managers#yielding-metadata-from-an-iomanager). Not really a recommended pattern, and you'd need more complicated logic if you wanted to support re-execution on those edges (https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/core/execution/context/system.py?L475:9), but I figured I'd mention it
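For shape only, a heavily hedged sketch of what the load_input side of that event-log approach might look like, written as a method on an IOManager like the one sketched earlier. step_context, all_logs, and the event attributes are internal APIs, the exact layout of metadata on the event varies by Dagster version, and _lookup_document is a hypothetical helper:

```
from dagster import DagsterEventType

    def load_input(self, context):
        upstream = context.upstream_output
        # internal API, as mentioned above; not a recommended pattern
        instance = upstream.step_context.instance
        records = instance.all_logs(
            upstream.run_id, of_type=DagsterEventType.HANDLED_OUTPUT
        )
        for record in records:
            event = record.dagster_event
            if event.step_key == upstream.step_key:
                # the id attached as metadata in handle_output lives on this
                # event; the attribute path to it depends on the Dagster
                # version, and _lookup_document is a hypothetical helper
                return self._lookup_document(event)
        raise Exception(f"no handled-output event found for {upstream.step_key}")
```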

Chris Nogradi

02/02/2022, 10:57 PM
Thanks @owen, since it seems that what I am trying to do is not natively supported by Dagster, I am probably not trying to use it properly. Is the expected way of using database resources to push data to them inside of ops, rather than using an IOManager and configuring it to be used for certain op outputs? Do pipelines typically fetch from a database, then perform operations, and finally output to a database in the last op in the pipeline, so that IOManagers are not used for this? I assume that this, coupled with Asset Materialization, is how I am supposed to do it?

owen

02/02/2022, 11:07 PM
I would say that it really depends on the specific use case. Generally, with databases, the particular op will know "in advance" exactly where in the database the data it's outputting will be stored (i.e. the schema/table name), in which case that can be encoded in, for example, the metadata argument on an Out, or just in the config schema of the op, either of which are accessible in the handle_output/load_input functions on an IOManager. The tricky part comes in when the storage location is variable at runtime, and so state needs to be transferred between the ops in some way. At that point, it's not so much a question of putting the storage code in the IOManager vs. the op as it is a question of how you transfer the relevant state (be it an entire dataframe or just a pointer to an object id in mongodb) across the process boundary. Generally, users in this situation will use something like the s3_pickle_io_manager (or cloud object store of their choice) to handle that.
the question of when to use an IOManager vs. explicitly write to a database in an Op comes down to (in my opinion) whether you will be using a table that is generated by an Op as an input to a downstream Op. If you're not, there's not much reason to use an IOManager (because the load_input function will never be called). Either solution still has to deal with the process communication issue; it's just a matter of ergonomics when you're using the output table in other ops (rather than having to copy/paste the same loading function every time you want to use that output as input to another op, the IOManager can handle it for you)
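For the "known in advance" case owen describes, a sketch of the metadata-on-an-Out route; write_table and read_table stand in for whatever database client calls you would actually make, and the table name is invented:

```
from dagster import IOManager, Out, io_manager, op


class TableIOManager(IOManager):
    def handle_output(self, context, obj):
        # metadata declared on the Out is available here at runtime
        table = context.metadata["table"]
        write_table(obj, table)  # hypothetical helper

    def load_input(self, context):
        table = context.upstream_output.metadata["table"]
        return read_table(table)  # hypothetical helper


@io_manager
def table_io_manager(_):
    return TableIOManager()


# the op knows its destination table up front, so it's encoded on the Out
@op(out=Out(metadata={"table": "analytics.events"}))
def build_events_table():
    return [{"event": "signup"}, {"event": "login"}]
```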

Chris Nogradi

02/03/2022, 5:30 PM
Thanks for the detailed explanations @owen. I realized, based on your responses, that I can just store this relationship (out/in) in MongoDB as well, and it seems to work the same as using a shared file. When you say that if no table is generated by an op then load_input is not called, is this for ops that have no outputs but still have downstream ops that depend on them (these ops obviously get their data from somewhere else that is not the previous op)? Also, is it reasonable for an IOManager to do nothing if an op produced an empty dataset (the algorithm found nothing in the datastream)?
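The "store the out/in relationship in MongoDB as well" idea could look roughly like this, as a variation on the earlier sketch; self._data and self._pointers are assumed pymongo Collections, and the pointer document's key fields are invented for illustration:

```
    def handle_output(self, context, obj):
        object_id = self._data.insert_one(obj).inserted_id
        # record the out/in relationship in MongoDB itself, keyed by run and
        # step, instead of writing a file on a shared filesystem
        key = {
            "run_id": context.run_id,
            "step_key": context.step_key,
            "output_name": context.name,
        }
        self._pointers.replace_one(key, {**key, "object_id": object_id}, upsert=True)

    def load_input(self, context):
        upstream = context.upstream_output
        pointer = self._pointers.find_one({
            "run_id": upstream.run_id,
            "step_key": upstream.step_key,
            "output_name": upstream.name,
        })
        return self._data.find_one({"_id": pointer["object_id"]})
```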

owen

02/03/2022, 5:37 PM
my comment about load_input was just meant to emphasize that you mostly want to use a table-based IOManager (i.e. that writes outputs to tables in a database, then reads from those tables in load_input) if you care about using the written table as input to some downstream ops.
re: the IOManager doing nothing -- that seems like a totally reasonable protocol as long as the load_input function handles that case gracefully (returning None or something)
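A sketch of that "do nothing on empty output" protocol, continuing the hypothetical MongoDB IO manager from earlier; the empty check in handle_output and the None return in load_input are the whole idea:

```
    def handle_output(self, context, obj):
        if not obj:
            # the algorithm found nothing: skip the insert, write no id file
            context.log.info(f"skipping empty output for {context.name}")
            return
        object_id = self._collection.insert_one(obj).inserted_id
        with open(self._id_path(context.step_key, context.name), "wb") as f:
            pickle.dump(object_id, f)

    def load_input(self, context):
        upstream = context.upstream_output
        path = self._id_path(upstream.step_key, upstream.name)
        if not os.path.exists(path):
            # upstream wrote nothing, so hand the downstream op a None
            return None
        with open(path, "rb") as f:
            object_id = pickle.load(f)
        return self._collection.find_one({"_id": object_id})
```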

Chris Nogradi

02/03/2022, 5:40 PM
Perfect! Thanks!