# ask-community
s
Hey team, I have a need to download files via a GET request in one of my jobs. I have other ops that need to easily access these files and I was thinking of utilizing the fs_io_manager for this. However, I am a bit confused by the documentation on how I would write and store the files in the base location. My Python code for getting the files and saving them to disk currently looks like this:
Copy code
import requests

def __download(self, url, filename):
    # stream the response and write it to disk one chunk at a time
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
Does the fs_io_manager support something like this? Or what would be the most Dagster way of managing these files after they're downloaded?
a
fs_io_manager, or any io_manager, usually deals with op outputs. So your first step would be to decorate __download() as an op. The complication in your case seems to be the chunking. If you were willing to just output the whole response, fs_io_manager would happily pickle that response to an automatically defined location under $DAGSTER_HOME. But for your case, one solution is to have the op output the Response object itself, and define a custom IOManager to write it to disk one chunk at a time. Something like below:
Copy code
import dagster
import requests

# route this op's output to the custom IOManager via its io_manager_key
@dagster.op(out={'request': dagster.Out(dagster_type=requests.models.Response, io_manager_key='custom_chunk_io_manager')})
def __download(url, filename):
    # as an op this is a plain function (no `self`); skip the `with` block so the
    # response stays open and the IOManager can still stream its content
    r = requests.get(url, stream=True)
    r.raise_for_status()
    return r


class ChunkIOManager(dagster.IOManager):
    def load_input(self, context):
        with open(self.filename, 'rb') as f:
            data = f.readlines()
        return data

    def handle_output(self, context, obj):
        assert isinstance(obj, requests.models.Response)

        self.filename = context.get_output_identifier()[-1]
        # you can do other things here to define the filename contextually and SAVE it
        with open(self.filename, 'wb') as f:
            for chunk in obj.iter_content(chunk_size=8192):
                f.write(chunk)


@dagster.io_manager
def chunkiomanager(context):
    return ChunkIOManager()

...
# wherever you are defining the job which includes the op __download(), add a resource definition for `custom_chunk_io_manager`
resource_defs = {'custom_chunk_io_manager': chunkiomanager},
config = {...},
...
s
Ok, understanding this a bit better now: so the output in all cases is the content you want to save, and the IO manager simply defines how you want to save it. Sweet! I can investigate whether I actually need to chunk these values, but this is great, thanks!
@Aabir Abubaker Kar One more question. Looking at the docs, it looks like there is a way of setting outputcontext.name to something other than 'result'. From your above example, I wasn't able to get the filename from get_output_identifier because it was set to the default value. Is there a way for the output of my op to set the filename for context.name?
a
My intuition tells me that the name is defined in the op decorator, like
Copy code
@dagster.op(out={'my_result_name': dagster.Out(dagster_type=requests.models.Response, io_manager_key='custom_chunk_io_manager')})
def __download(url, filename):
    pass
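If so, the IOManager should then see that name on the context. Roughly what I'd expect inside handle_output (untested sketch, just illustrating the idea):
Copy code
import dagster

class ChunkIOManager(dagster.IOManager):
    def handle_output(self, context, obj):
        # with out={'my_result_name': ...} above, I'd expect:
        #   context.name                    -> 'my_result_name' (instead of 'result')
        #   context.get_output_identifier() -> something like [<run_id>, '__download', 'my_result_name']
        filename = context.get_output_identifier()[-1]
        ...

    def load_input(self, context):
        ...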
s
dang... ok, for whatever reason, when I try to pass filename and set self.filename, the value is not stored and I get the error AttributeError: 'ChunkIOManager' object has no attribute 'filename' when the loader attempts to get the file. So I was hoping there was something on the context that I could use to pass along the filename.
a
Hmm, I'm not sure how you're calling things, so it's hard to be specific here. With the way I described things, the IOManager's handle_output() should be called once the __download() op completes execution, and load_input() should get called when any downstream op (say analyze_downloaded_stream()) is invoked. So self.filename should have been defined by then. I wrote up a simple working example that (among other things) differentiates between a CustomIOManager and fs_io_manager here: https://bakerwho.github.io/posts/datascience/Deployable-Dagster-MVP/ Running it might clear things up for you.
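In other words, the flow would be something like this (toy sketch; download_and_analyze is just a placeholder name):
Copy code
import dagster

@dagster.op
def analyze_downloaded_stream(request):
    # `request` arrives here via ChunkIOManager.load_input()
    return len(request)

@dagster.job(resource_defs={'custom_chunk_io_manager': chunkiomanager})
def download_and_analyze():
    # handle_output() fires when __download() finishes;
    # load_input() fires when analyze_downloaded_stream() starts.
    # __download's url/filename inputs would come from run config (omitted here)
    analyze_downloaded_stream(__download())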
Separately, if you're passing filename to the custom IOManager, you should add an __init__() definition to the class. Something like
Copy code
class ChunkIOManager(dagster.IOManager):
    def __init__(self, filename):
        self.filename = filename

    def load_input(self, context):
        ...
s
So this is all dynamic: I don't know the name of the file until I get it from the prior op. The op before __download outputs a yield DynamicOutput(download_url, mapping_key=str(idx)), where download_url is a tuple of the filename and the url. In write_file_to_disk I was able to pass the filename and response object by returning them as a tuple again and having the chunk IOManager accept the two, get the filename, and save the data. However, I think my problem might now lie with the formation of the dynamic output in the job:
Copy code
files = get_download_urls().map(write_file_to_disk)

copy_to_raw(files.collect())
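Roughly, the ops look like this (simplified sketch, not my exact code; the URLs are placeholders):
Copy code
import dagster
import requests

@dagster.op(out=dagster.DynamicOut())
def get_download_urls():
    # placeholder source of (filename, url) pairs
    download_urls = [('file_0.bin', 'https://example.com/0'),
                     ('file_1.bin', 'https://example.com/1')]
    for idx, download_url in enumerate(download_urls):
        yield dagster.DynamicOutput(download_url, mapping_key=str(idx))

@dagster.op(out=dagster.Out(io_manager_key='custom_chunk_io_manager'))
def write_file_to_disk(download_url):
    filename, url = download_url
    r = requests.get(url, stream=True)
    r.raise_for_status()
    # hand (filename, response) to the IOManager as a tuple;
    # handle_output() unpacks it, takes the filename, and streams the chunks to disk
    return filename, r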
my problem seems to be that before copy_to_raw loads its inputs, the io_manager gets re-initialized:
Copy code
Starting initialization of resources [custom_chunk_io_manager, io_manager].
a
My guess is that isn't the issue - it's pretty standard for all the IOManagers being used to get initialized like that (IOManagers are just a specific kind of Dagster resource).
I haven't dealt with ops that are actually generators - that might be causing complications. @Dagster Bot, are you sentient enough to help here?
d
Invalid command. Did you mean to create an issue or a discussion? Try
@dagster_bot <issue|docs|discussion> <title>
s
@Aabir Abubaker Kar so I just put a logger on the io_manager declaration:
Copy code
from dagster import io_manager, Field, String

@io_manager(config_schema={'base_path': Field(String)})
def chunk_io_manager(context):
    context.log.info("I am a new object")
    return ChunkIOManager(context.resource_config["base_path"])
The step after the download seems to be calling this again and getting a brand new object, which wouldn't have the filename set. :\
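One way around the re-initialization (a rough, untested sketch: derive the path from the context in both methods, so nothing has to survive on the IOManager instance between steps; _path_for is just a helper name here, and using context.upstream_output in load_input is an assumption about the API):
Copy code
import os
import dagster

class ChunkIOManager(dagster.IOManager):
    def __init__(self, base_path):
        self.base_path = base_path

    def _path_for(self, output_context):
        # deterministic location from the output identifier, so handle_output
        # and load_input can each recompute it independently
        return os.path.join(self.base_path, *output_context.get_output_identifier())

    def handle_output(self, context, obj):
        # obj here is assumed to be the streamed Response; if you pass the
        # (filename, response) tuple instead, unpack it first
        path = self._path_for(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as f:
            for chunk in obj.iter_content(chunk_size=8192):
                f.write(chunk)

    def load_input(self, context):
        # assumption: the upstream OutputContext yields the same identifier here
        path = self._path_for(context.upstream_output)
        with open(path, 'rb') as f:
            return f.read()

@dagster.io_manager(config_schema={'base_path': dagster.Field(dagster.String)})
def chunk_io_manager(context):
    return ChunkIOManager(context.resource_config['base_path'])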