Scott Hood
04/15/2022, 4:00 PM
fs_io_manager for this. However, I am a bit confused by the documentation on how I would write and store the files in the base location.
My Python code for getting the files and saving them to disk currently looks like this:
```python
def __download(self, url, filename):
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
```
Does the fs_io_manager support something like this? Or what would be the most Dagster-like way of managing these files once they are downloaded?
Aabir Abubaker Kar
04/15/2022, 5:27 PM
fs_io_manager (or any io_manager) usually deals with op outputs, so your first step would be to decorate __download() as an op.
The complication in your case seems to be the chunking. If you were willing to just output the whole response, fs_io_manager would happily pickle that response to an automatically defined location under $DAGSTER_HOME.
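To make the contrast concrete, here is a minimal stdlib sketch of what a pickling filesystem IO manager does with an output: serialize the whole object in one go into a file whose path is built from run, step, and output identifiers. The `base_dir/run_id/step_key/output_name` layout and the helper names are assumptions for illustration, not fs_io_manager's exact internals.

```python
import os
import pickle
import tempfile


def output_path(base_dir: str, run_id: str, step_key: str, output_name: str) -> str:
    # Assumed (hypothetical) layout: one file per (run, step, output).
    return os.path.join(base_dir, run_id, step_key, output_name)


def store_pickled(base_dir: str, run_id: str, step_key: str, output_name: str, obj) -> str:
    path = output_path(base_dir, run_id, step_key, output_name)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        # The entire object is serialized at once, so it must fit in memory.
        pickle.dump(obj, f)
    return path


def load_pickled(base_dir: str, run_id: str, step_key: str, output_name: str):
    with open(output_path(base_dir, run_id, step_key, output_name), "rb") as f:
        return pickle.load(f)


# Usage: round-trip a small payload through a temporary base directory.
with tempfile.TemporaryDirectory() as base:
    saved_at = store_pickled(base, "run_1", "download", "result", b"col1,col2\n1,2\n")
    relative = os.path.relpath(saved_at, base)
    restored = load_pickled(base, "run_1", "download", "result")
```

For a downloaded file this means the full body has to be materialized in memory before anything is written, which is what the chunked approach avoids.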
But for your case, one solution is to have the op output the Response object itself and define a custom IOManager that writes it to disk one chunk at a time. Something like below:
```python
import dagster
import requests

@dagster.op(out={'request': dagster.Out(dagster_type=requests.models.Response, io_manager_key='custom_chunk_io_manager')})
def __download(url, filename):
    # No `with` block here: exiting it would close the response before the
    # IO manager has streamed the body to disk.
    r = requests.get(url, stream=True)
    r.raise_for_status()
    return r

class ChunkIOManager(dagster.IOManager):
    def load_input(self, context):
        with open(self.filename, 'rb') as f:
            data = f.readlines()
        return data

    def handle_output(self, context, obj):
        assert isinstance(obj, requests.models.Response)
        self.filename = context.get_output_identifier()[-1]
        # you can do other things here to define the filename contextually and SAVE it
        try:
            with open(self.filename, 'wb') as f:
                for chunk in obj.iter_content(chunk_size=8192):
                    f.write(chunk)
        finally:
            obj.close()  # release the connection once the body is written

@dagster.io_manager
def chunkiomanager(context):
    return ChunkIOManager()

...
# wherever you are defining the job which includes the op __download(),
# add a resource definition for `custom_chunk_io_manager`
    resource_defs={'custom_chunk_io_manager': chunkiomanager},
    config={...},
...
```
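The chunked write at the heart of handle_output can be exercised without Dagster or the network. In this stdlib sketch an in-memory io.BytesIO stands in for the streamed response, and iter_chunks is a hypothetical helper imitating requests' iter_content(chunk_size=...); neither is part of the code above.

```python
import io
import os
import tempfile
from typing import BinaryIO, Iterable, Iterator

CHUNK_SIZE = 8192  # same chunk size as in the snippets above


def iter_chunks(stream: BinaryIO, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    # Hypothetical stand-in for Response.iter_content(chunk_size=...):
    # yields successive byte chunks until the stream is exhausted.
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return
        yield chunk


def write_chunks(chunks: Iterable[bytes], path: str) -> int:
    # Same loop as ChunkIOManager.handle_output: at most one chunk is held
    # in memory at a time. Returns the number of bytes written.
    written = 0
    with open(path, "wb") as f:
        for chunk in chunks:
            f.write(chunk)
            written += len(chunk)
    return written


# Usage: stream a 20,000-byte in-memory "download" to a temporary file.
payload = b"x" * 20_000
chunk_sizes = [len(c) for c in iter_chunks(io.BytesIO(payload))]
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "download.bin")
    n_written = write_chunks(iter_chunks(io.BytesIO(payload)), target)
    with open(target, "rb") as f:
        round_trip = f.read()
```

The payload arrives as two full 8192-byte chunks plus a 3616-byte remainder, and the file on disk matches it byte for byte.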
Scott Hood
04/15/2022, 5:34 PM
Scott Hood
04/15/2022, 7:11 PM
outputcontext.name to something other than 'result'. From your example above, I wasn't able to get the filename from get_output_identifier() because it was set to the default value. Is there a way for the output of my op to set the filename for context.name?
Aabir Abubaker Kar
04/15/2022, 7:14 PM
op decorator like:
```python
@dagster.op(out={'my_result_name': dagster.Out(dagster_type=requests.models.Response, io_manager_key='custom_chunk_io_manager')})
def __download(...):
    pass
```
Scott Hood
04/15/2022, 7:16 PM
self.filename, the value is not stored and I get the error:
AttributeError: 'ChunkIOManager' object has no attribute 'filename'
when the loader attempts to get the file.
Scott Hood
04/15/2022, 7:16 PM
Aabir Abubaker Kar
04/15/2022, 7:21 PM
handle_output() should be called once the __download() op completes execution, and load_input() should get called when any downstream op (say analyze_downloaded_stream()) is invoked. So self.filename should have been defined by then.
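That ordering only helps if both calls land on the same Python object. A minimal sketch, assuming each step builds its own IO manager instance (which is what happens when steps run in separate processes, as with Dagster's multiprocess executor); ChunkIOManagerSketch and make_io_manager are hypothetical stand-ins, not Dagster APIs:

```python
class ChunkIOManagerSketch:
    """Hypothetical stand-in that keeps the filename on the instance, like ChunkIOManager."""

    def handle_output(self, filename: str) -> None:
        self.filename = filename  # runtime state that lives only on this object

    def load_input(self) -> str:
        return self.filename  # AttributeError unless handle_output ran on *this* object


def make_io_manager() -> ChunkIOManagerSketch:
    # Hypothetical resource factory: every step gets a freshly built instance.
    return ChunkIOManagerSketch()


# Same object for both calls (one shared resource): works.
shared = make_io_manager()
shared.handle_output("file_0.csv")
same_object_result = shared.load_input()

# Separate instance per step: the downstream instance never saw handle_output.
upstream = make_io_manager()
upstream.handle_output("file_0.csv")
downstream = make_io_manager()
try:
    downstream.load_input()
    fresh_instance_error = None
except AttributeError as exc:
    fresh_instance_error = str(exc)
```

The second case reproduces the same kind of AttributeError reported above, even though handle_output did run first.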
I wrote up a simple working example that (among other things) differentiates between CustomIOManager and fs_io_manager here: https://bakerwho.github.io/posts/datascience/Deployable-Dagster-MVP/
Running it might clear things up for you.
Aabir Abubaker Kar
04/15/2022, 7:24 PM
filename to the custom IOManager, you should add an __init__() definition to the class. Something like:
```python
class ChunkIOManager(dagster.IOManager):
    def __init__(self, filename):
        self.filename = filename

    def load_input(self, context):
        ...
```
Scott Hood
04/15/2022, 7:29 PM
op, in which case the op before __download yields DynamicOutput(download_url, mapping_key=str(idx)), where download_url is a tuple of the filename and the URL.
In write_file_to_disk I was able to pass the filename and the response object by returning them as a tuple again and having the chunk IO manager accept the two, get the filename, and save the data. However, I think my problem might now lie with how the dynamic output is formed in the job:
```python
files = get_download_urls().map(write_file_to_disk)
copy_to_raw(files.collect())
```
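As a plain-Python analogy (not Dagster code), the map/collect wiring behaves roughly like calling one function per mapping key and then handing all the results to a single downstream call. The function bodies and example.com URLs below are hypothetical placeholders; the point is that if each mapped step returns the path it wrote, the collecting step receives every path as ordinary data and does not depend on an IO manager remembering a filename.

```python
from typing import Dict, List, Tuple

DownloadUrl = Tuple[str, str]  # (filename, url), as described above


def get_download_urls() -> Dict[str, DownloadUrl]:
    # Hypothetical placeholder data, keyed like DynamicOutput(..., mapping_key=str(idx)).
    pairs = [("a.csv", "https://example.com/a.csv"), ("b.csv", "https://example.com/b.csv")]
    return {str(idx): pair for idx, pair in enumerate(pairs)}


def write_file_to_disk(download_url: DownloadUrl) -> str:
    filename, url = download_url
    # A real step would stream `url` into `filename` here (omitted in this sketch).
    return filename  # hand the path downstream as data


def copy_to_raw(paths: List[str]) -> List[str]:
    # Hypothetical placeholder: a real step would copy each file; here we just report them.
    return sorted(paths)


# Fan-out: one write_file_to_disk call per mapping key (analogous to .map()).
mapped = {key: write_file_to_disk(value) for key, value in get_download_urls().items()}
# Fan-in: all mapped results go to one downstream call (analogous to .collect()).
copied = copy_to_raw(list(mapped.values()))
```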
Scott Hood
04/15/2022, 7:33 PM
copy_to_raw loads, it's re-initializing the io_manager:
Starting initialization of resources [custom_chunk_io_manager, io_manager].
Aabir Abubaker Kar
04/15/2022, 7:38 PM
resource)
Aabir Abubaker Kar
04/15/2022, 7:38 PM
Dagster Bot
04/15/2022, 7:38 PM
@dagster_bot <issue|docs|discussion> <title>
Scott Hood
04/15/2022, 7:55 PM
```python
from dagster import Field, String, io_manager

@io_manager(config_schema={'base_path': Field(String)})
def chunk_io_manager(context):
    context.log.info("I am a new object")
    return ChunkIOManager(context.resource_config["base_path"])
```
The step after the download seems to call this again, getting a brand-new object that doesn't have the filename set. :\
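One way around this, sketched here under stated assumptions, is to keep no per-run state on the IO manager at all: take only configuration (such as base_path) in __init__, and recompute the file path from the output's identifiers in both handle_output and load_input. In Dagster, load_input can reach the upstream output's identifiers through the input context's upstream_output; in this stdlib sketch, FakeOutputContext is a hypothetical stand-in for that, and the path layout is an assumption.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class FakeOutputContext:
    # Hypothetical stand-in for the identifiers an output context carries.
    run_id: str
    step_key: str
    name: str


class StatelessChunkIOManager:
    def __init__(self, base_path: str):
        # Config only: every freshly initialized instance gets the same value.
        self.base_path = base_path

    def _path(self, ctx: FakeOutputContext) -> str:
        # Derived purely from identifiers (assumed layout), so any instance computes the same path.
        return os.path.join(self.base_path, ctx.run_id, ctx.step_key, ctx.name)

    def handle_output(self, ctx: FakeOutputContext, chunks: Iterable[bytes]) -> None:
        path = self._path(ctx)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            for chunk in chunks:
                f.write(chunk)

    def load_input(self, upstream_ctx: FakeOutputContext) -> str:
        # Return the path; the downstream step can open or copy the file itself.
        return self._path(upstream_ctx)


# Usage: write with one instance, load with a brand-new one (as after re-initialization).
with tempfile.TemporaryDirectory() as base:
    out_ctx = FakeOutputContext(run_id="run_1", step_key="write_file_to_disk", name="result")
    StatelessChunkIOManager(base).handle_output(out_ctx, [b"col1,", b"col2\n"])
    loaded_path = StatelessChunkIOManager(base).load_input(out_ctx)
    with open(loaded_path, "rb") as f:
        loaded_bytes = f.read()
```

Because nothing set during handle_output needs to survive on the object, it no longer matters that the downstream step gets a brand-new IO manager.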