Prratek Ramchandani
Question about custom IO managers - I’m writing an IO manager to read/write to an object in GCS. The `handle_output` method would take an iterator output from a solid, iterate through it and write each line to a file (JSON in this case), and then upload that file to GCS. However, the first “write to JSON file” bit seems like it could conceptually be its own IO manager. Does it make sense to just have my custom IO manager do both those steps when handling output? Is there a recommended “Dagster-y” way to split up these two tasks?
Also, am I using the right abstractions here? The steps in my pipeline are:
1. Extract data from an API
2. Write the data to a file
3. Upload the file to GCS
4. Load from GCS into BQ
I figured it made sense to have a solid do step 1 but an IO manager handle steps 2 and 3, using the output from step 1.
mrdavidlaing
Interesting question! I also want to know the answer. Specifically, I’d want to emit two AssetMaterializations in steps 3 and 4. Can that be done from the IO manager?
Prratek Ramchandani
I’m pretty sure that’s doable. The way I see it, an IO manager handles what to do with the inputs and outputs of solids, while an AssetMaterialization records any other assets produced as part of the computation. (Not 100% sure, btw.)
I was poking around the `dagster-gcp` library, and the `gcs_pickle_io_manager` seems to handle output here by first pickling and then writing to GCS - an analogous two steps to what I was thinking. I’m going to go down that route for now.
sandy
Hey @Prratek Ramchandani - both of the approaches you laid out make sense. More generally, when deciding what to model as an IO manager, the answer revolves around:
• Which pieces you want to execute in the same process. E.g. in your case it makes sense for 1, 2, and 3 to all happen in the same process, so modeling 2 and 3 with an IO manager instead of separate solids lets you do this.
• Which pieces you want to be "injectable". E.g. in a test environment, it might be helpful to be able to run the pipeline without access to GCS, and modeling the steps that write to GCS with an IO manager makes it easier to supply a mock version in tests (see the sketch below).
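As a sketch of that "injectable" point, assuming the solid/pipeline-era API: `JsonGCSIOManager` and `gcs_json_io_manager` are placeholders for the custom manager discussed in this thread, while `mem_io_manager` is Dagster's built-in in-memory manager:
```python
from dagster import IOManager, ModeDefinition, io_manager, mem_io_manager, pipeline, solid

# Placeholder for the custom JSON -> GCS IO manager discussed above.
class JsonGCSIOManager(IOManager):
    def handle_output(self, context, obj):
        ...  # write obj to a JSON file and upload it to GCS

    def load_input(self, context):
        ...  # download and parse the JSON file

@io_manager
def gcs_json_io_manager(_):
    return JsonGCSIOManager()

@solid
def extract(_):
    return [{"id": 1}]

@pipeline(
    mode_defs=[
        ModeDefinition("prod", resource_defs={"io_manager": gcs_json_io_manager}),
        # Tests run without GCS access by swapping in the in-memory manager.
        ModeDefinition("test", resource_defs={"io_manager": mem_io_manager}),
    ]
)
def my_pipeline():
    extract()
```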
@mrdavidlaing - yes, AssetMaterializations can be emitted from the IO manager.
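For reference, a minimal sketch of what that can look like: `handle_output` can be written as a generator, and the events it yields are surfaced on the run (the class name, `_get_path`, and the asset key here are made up):
```python
from dagster import AssetMaterialization, IOManager

class JsonGCSIOManager(IOManager):
    def handle_output(self, context, obj):
        path = self._get_path(context)  # placeholder helper
        ...  # write obj as JSON and upload it to GCS at `path`
        # Yielding from handle_output records the materialization on the run.
        yield AssetMaterialization(
            asset_key="gcs_json_file",  # placeholder asset key
            description=f"Uploaded JSON to {path}",
        )

    def load_input(self, context):
        ...
```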
Prratek Ramchandani
Thanks @sandy! That framework for thinking about it is helpful.
Charles Lariviere
Hey folks 👋 Sorry to hijack the thread - I’m trying to do exactly the same thing, but I’m stuck on step 4. Once the file is loaded to GCS through the IO manager, how can the solid handling step 4 (loading from GCS to BQ) receive context about where the file was loaded? Essentially, I would have expected to be able to yield the GCS path in the `handle_output` method so that the next solid can take it as an input. Is that what AssetMaterialization is for?
sandy
Hey @Charles Lariviere - how do you compute the GCS path in the `handle_output` method? The same IO manager is used for handling outputs and for loading their downstream inputs, and the same context is available to both. Here's an example of how this works for the built-in GCS IO manager: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/io_manager.py#L20. `handle_output` and `load_input` both use the same `_get_path` function, which decides where the object goes.
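The shape of that pattern, as a rough sketch modeled on the linked source (attribute and method names may differ between Dagster versions):
```python
import pickle
from dagster import IOManager

class PickledObjectGCSIOManager(IOManager):
    # Rough sketch of the linked implementation, not the exact library code.
    def __init__(self, bucket, client, prefix="dagster"):
        self.bucket = bucket
        self.client = client
        self.prefix = prefix

    def _get_path(self, context):
        # Both methods derive the object key from the same run/step/output
        # identifiers, so outputs land exactly where downstream inputs look.
        return "/".join([self.prefix, *context.get_run_scoped_output_identifier()])

    def handle_output(self, context, obj):
        key = self._get_path(context)
        ...  # upload pickle.dumps(obj) to gs://{bucket}/{key}

    def load_input(self, context):
        key = self._get_path(context.upstream_output)
        ...  # download gs://{bucket}/{key} and unpickle it
```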
Charles Lariviere
Hey @sandy - thanks for your quick reply. I basically repurposed this exact IO manager, but instead have it write a JSON file, so I’m using the same `_get_path`. I may be using the IO manager incorrectly in this case then 🤔 I was hoping to load the data into BQ directly from GCS (i.e. through something like `load_table_from_uri`) and completely avoid having to reload the data into memory in the last solid. This means the last step doesn’t actually need to reload the data into memory or the local fs; it just needs the GCS path where the file was created. Is an IO manager maybe less appropriate in this case?
sandy
Ah, that makes sense. In that case, you could basically do something like:
```python
def load_input(self, context):
    return self._get_path(context.upstream_output)
```
It's a little weird, because `load_input` returns something different from what the upstream `handle_output` produced, but nothing actually enforces that invariant. There are many situations where the downstream input needs to be loaded differently than the upstream output was written, e.g. if one solid writes a pandas DataFrame and the downstream solid needs to consume it as a Spark DataFrame. Does that work for you?
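For context, a sketch of what the downstream step 4 solid could then look like, using the BigQuery client's `load_table_from_uri`; the bucket, dataset, and table names here are placeholders:
```python
from dagster import solid
from google.cloud import bigquery

@solid
def load_to_bq(context, gcs_path: str):
    # gcs_path comes from the IO manager's load_input above; the solid
    # never has to pull the file contents into memory.
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )
    load_job = client.load_table_from_uri(
        f"gs://my-bucket/{gcs_path}",          # placeholder bucket
        "my-project.my_dataset.my_table",      # placeholder destination
        job_config=job_config,
    )
    load_job.result()  # block until the load job finishes
```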
Charles Lariviere
Ah, that’s very interesting! I’ll give that a try, thanks @sandy 🙏 I’m new to Dagster (this is actually my first pipeline), so I’m curious: is that the Dagster-y way to go about a pipeline like this one? Or would this be better served by a solid that handles the output itself and passes the external filepath downstream as an output?
sandy
The way we discussed above is the way I would do it if I were writing the pipeline. It has a couple of advantages over a solid that handles the output itself and passes the external filepath downstream as an output:
• Passing the filepath downstream as an output means the filepath needs to be stored somewhere and read back on each execution, which is a bit of a waste given that it will be the same every time.
• Passing the filepath downstream makes it more difficult to run a backfill over just the downstream step, because you'd need to somehow pass that filepath as input.
• Handling the output in the solid body makes it more difficult to test the solid's business logic on its own. If the solid just outputs JSON, it's easy to write a unit test that runs the solid and checks whether that JSON matches what's expected (see the sketch below).
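A sketch of that kind of unit test, using a hypothetical `extract_records` solid standing in for step 1 (the solid and its records are made up for illustration):
```python
from dagster import execute_solid, solid

# Hypothetical solid standing in for "extract data from API" above.
@solid
def extract_records(_):
    return [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

def test_extract_records():
    # The solid only produces plain JSON-serializable records, so the
    # test needs no GCS access at all.
    result = execute_solid(extract_records)
    assert result.success
    assert result.output_value() == [
        {"id": 1, "name": "a"},
        {"id": 2, "name": "b"},
    ]
```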
Charles Lariviere
Awesome, this is super helpful - really appreciate your input on this! Thanks! 🙏