Prratek Ramchandani
01/30/2021, 11:29 PM
`handle_output` method would take an iterator output from a solid, iterate through it, write each line to a file (JSON in this case), and then upload that file to GCS. However, the first “write to JSON file” step seems like it could conceptually be its own IO manager. Does it make sense to just have my custom IO manager do both of those steps when handling output? Is there a recommended “Dagstery” way to split up these two tasks?

Prratek Ramchandani
01/30/2021, 11:36 PM

mrdavidlaing
01/31/2021, 10:02 AM

Prratek Ramchandani
01/31/2021, 4:44 PM

Prratek Ramchandani
01/31/2021, 4:46 PM
`dagster-gcp` library and the `gcs_pickle_io_manager` seems to handle output here by first pickling and then writing to GCP, which is analogous to the two steps I was thinking of. I’m going to go down that route for now.

sandy
02/01/2021, 4:20 PM

sandy
02/01/2021, 4:20 PM

Prratek Ramchandani
02/01/2021, 4:45 PM

Charles Lariviere
02/02/2021, 7:34 PM
`handle_output` method so that the next solid can take this as an input. Is that what Asset Materialization is for?

sandy
02/02/2021, 7:40 PM
`handle_output` method? The same IO manager is used for handling outputs and for loading their downstream inputs, and the same context is available to both.
Here's an example of how this works for the built-in GCS IO manager: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/io_manager.py#L20.
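To make the split concrete, here is a minimal sketch in plain Python (no Dagster or GCS dependencies, so it is not Dagster's actual API) of an IO-manager-shaped class where serialization and storage are separate pieces, and `handle_output` and `load_input` share a single `_get_path`. `BlobIOManager`, `to_json_lines`, the two context dataclasses, the path layout, and the dict standing in for a GCS bucket are all hypothetical.

```python
import json
import pickle
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List


@dataclass
class OutputContext:
    """Hypothetical stand-in for Dagster's output context (only the fields used here)."""
    run_id: str
    step_key: str
    name: str


@dataclass
class InputContext:
    """Hypothetical stand-in for Dagster's input context."""
    upstream_output: OutputContext


def to_json_lines(records: Iterable[Any]) -> bytes:
    """Serialization step: one JSON object per line; accepts any iterable, including iterators."""
    return "".join(json.dumps(r) + "\n" for r in records).encode("utf-8")


def from_json_lines(data: bytes) -> List[Any]:
    """Inverse of to_json_lines; skips empty lines."""
    return [json.loads(line) for line in data.decode("utf-8").splitlines() if line]


class BlobIOManager:
    """Storage step: puts serialized bytes at a path and reads them back.

    `bucket` is a plain dict standing in for a GCS bucket. The on-disk format
    is whatever (serialize, deserialize) pair is passed in.
    """

    def __init__(self, serialize: Callable[[Any], bytes],
                 deserialize: Callable[[bytes], Any], prefix: str = "dagster"):
        self.serialize = serialize
        self.deserialize = deserialize
        self.prefix = prefix
        self.bucket: Dict[str, bytes] = {}

    def _get_path(self, context: OutputContext) -> str:
        # Single source of truth for where an output lives (assumed layout).
        return "/".join([self.prefix, context.run_id, context.step_key, context.name])

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        self.bucket[self._get_path(context)] = self.serialize(obj)

    def load_input(self, context: InputContext) -> Any:
        # The same _get_path, applied to the upstream output, finds the same key.
        return self.deserialize(self.bucket[self._get_path(context.upstream_output)])


if __name__ == "__main__":
    ctx = OutputContext(run_id="run-1", step_key="extract", name="result")

    json_manager = BlobIOManager(to_json_lines, from_json_lines)
    json_manager.handle_output(ctx, iter([{"id": 1}, {"id": 2}]))
    print(json_manager.load_input(InputContext(upstream_output=ctx)))

    # Same storage code, different format: pickle, then "upload".
    pickle_manager = BlobIOManager(pickle.dumps, pickle.loads)
    pickle_manager.handle_output(ctx, [1, 2, 3])
    print(pickle_manager.load_input(InputContext(upstream_output=ctx)))
```

Swapping `to_json_lines`/`from_json_lines` for `pickle.dumps`/`pickle.loads` gives the pickle-then-upload pattern without touching the storage code. Pickle needs a concrete value such as a list, since generator objects can't be pickled.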
`handle_output` and `load_input` both use the same `_get_path` function that decides where the object goes.

Charles Lariviere
02/02/2021, 7:47 PM
`_get_path`.
I may be incorrectly using the IO manager in this case then 🤔 I was hoping to load the data into BQ directly from GCS (i.e., through something like `load_table_from_uri`) and completely avoid having to reload the data into memory in the last solid.
This means the last step doesn’t actually need to reload the data into memory or onto the local filesystem; it just needs to provide the GCS path where that file was created. Is an IO manager maybe less appropriate in this case?

sandy
02/02/2021, 7:54 PM
```python
def load_input(self, context):
    # Hand the downstream solid the storage path instead of the loaded object
    return self._get_path(context.upstream_output)
```
It's a little weird, because `load_input` is returning something different than what was produced by the upstream `handle_output`, but nothing actually enforces that invariant. There are many situations where the downstream input might need to be loaded differently than the upstream output, e.g. if one solid writes a pandas DataFrame and the downstream solid needs to consume it as a Spark DataFrame.
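A minimal sketch of this path-returning approach, in plain Python so it runs without Dagster or GCP credentials. `GcsPathIOManager`, `build_load_request`, the context dataclasses, the bucket name, and the path layout are hypothetical, and a dict stands in for the bucket. `handle_output` writes newline-delimited JSON, while `load_input` returns a `gs://` URI, which is what a `load_table_from_uri`-style BigQuery load consumes.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable


@dataclass
class OutputContext:
    """Hypothetical stand-in for Dagster's output context."""
    run_id: str
    step_key: str
    name: str


@dataclass
class InputContext:
    """Hypothetical stand-in for Dagster's input context."""
    upstream_output: OutputContext


class GcsPathIOManager:
    """Writes outputs as newline-delimited JSON, but hands downstream steps a URI."""

    def __init__(self, bucket_name: str, prefix: str = "dagster"):
        self.bucket_name = bucket_name
        self.prefix = prefix
        self.blobs: Dict[str, bytes] = {}  # dict standing in for the real bucket

    def _get_path(self, context: OutputContext) -> str:
        return "/".join([self.prefix, context.run_id, context.step_key, context.name])

    def handle_output(self, context: OutputContext, obj: Iterable[Any]) -> None:
        data = "".join(json.dumps(r) + "\n" for r in obj).encode("utf-8")
        self.blobs[self._get_path(context)] = data

    def load_input(self, context: InputContext) -> str:
        # Return where the upstream output lives, not its contents.
        return f"gs://{self.bucket_name}/{self._get_path(context.upstream_output)}"


def build_load_request(uri: str, table_id: str) -> Dict[str, str]:
    """Hypothetical downstream step that only describes the load job.

    With google-cloud-bigquery, this is roughly where a solid would call
    client.load_table_from_uri(uri, table_id, job_config=...) using a
    newline-delimited JSON source format.
    """
    return {"source_uri": uri, "destination": table_id,
            "source_format": "NEWLINE_DELIMITED_JSON"}


if __name__ == "__main__":
    manager = GcsPathIOManager("my-bucket")
    ctx = OutputContext(run_id="run-1", step_key="extract", name="rows")
    manager.handle_output(ctx, iter([{"id": 1}, {"id": 2}]))
    uri = manager.load_input(InputContext(upstream_output=ctx))
    print(build_load_request(uri, "my_project.my_dataset.my_table"))
```

Because both methods go through `_get_path`, the URI the downstream step receives always points at the object `handle_output` wrote, and the data never has to pass through the last solid's memory.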
Does that work for you?

Charles Lariviere
02/02/2021, 7:58 PM

sandy
02/03/2021, 3:58 PM

Charles Lariviere
02/03/2021, 4:17 PM