# announcements
s
Question about dagstermill and intermediates. I have a pipeline that contains various dagstermill solids. The outputs from my solids are xarray DataArrays. When I call dagstermill.yield_result(my_dataarray, 'name') at the end of my notebook solid, I get an error-- it appears dagstermill is attempting to serialize the arrays using scrapbook, and this fails. How do I bypass scrapbook and use my own serialization logic? What I want to do is just keep an in-memory xarray dataset that stores my arrays. I attempted to do this by creating a custom asset_store, but this makes no difference-- dagstermill is still trying to use scrapbook when I call dagstermill.yield_result. Here is the code I used for the custom asset store; am I perhaps missing something important?
Copy code
python
import dagster as dg
import xarray as xr

class DatasetAssetStore(dg.AssetStore):

    def __init__(self):
        super().__init__()
        # one shared in-memory Dataset holding all named outputs
        self.dataset = xr.Dataset()

    def get_asset(self, context):
        name = context.output_name
        return self.dataset[name]

    def set_asset(self, context, obj):
        name = context.output_name
        self.dataset[name] = obj

@dg.resource
def dataset_asset_store(_):
    return DatasetAssetStore()

dataset_mode = dg.ModeDefinition(
    resource_defs={"asset_store": dataset_asset_store}
)

my_pipeline = dg.PipelineDefinition(
    name='test',
    solid_defs=[...],
    dependencies={...},
    mode_defs=[dataset_mode]
)
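For what it's worth, the store itself round-trips arrays fine when I poke at it directly (quick sanity check below, using the class defined above and a SimpleNamespace as a stand-in for the real output context, since the store only reads context.output_name), so the issue really does seem to be that dagstermill never calls it.
Copy code
import xarray as xr
from types import SimpleNamespace

store = DatasetAssetStore()
# stub context exposing only output_name, which is all the store uses
ctx = SimpleNamespace(output_name="orientation_tuning")

store.set_asset(ctx, xr.DataArray([0.1, 0.2, 0.3], dims=["orientation_bin"]))
assert store.get_asset(ctx).dims == ("orientation_bin",)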
m
Hi Sean
s
hi-- thanks for always being so responsive on here!
m
np
our pleasure
so yield_result serializes an output back from the notebook to the dagster machinery
if you set a key in some persistent store, that output could be, for instance, the key
and then a downstream solid could use the key to go access whatever it is that you set
if you can provide a resource that gives access to the persistent store you want, that's one pattern for doing this
i'm not familiar with xarray but would love to see the traceback you're getting
is saving to netCDF not the right thing to do (too slow, etc.)?
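e.g. a rough sketch of what that could look like at the end of the notebook -- the path and output name here are just placeholders, and my_dataarray is whatever you're currently yielding:
Copy code
import dagstermill as dm

# illustrative only -- in practice the path would come from config or a resource
out_path = "/tmp/my_dataarray.nc"

# persist the array to netCDF on disk...
my_dataarray.to_netcdf(out_path)

# ...and yield just the path string, which serializes cleanly
dm.yield_result(out_path, "name")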
s
so, two questions: (1) Do I have to persist the result to disk when using dagstermill, or can the dagster machinery just hold an arbitrary Python object as an in-memory store used to pass outputs between notebook solids?
m
it would be difficult but probably feasible; the issue is that notebooks run in isolated processes (Jupyter "kernels") and access to the kernels is quite restricted
s
OK-- well writing to NetCDF is workable
m
i can imagine solutions that would involve, e.g., mmapping a shared file into memory, and you would pass a handle
s
the question then is what do I need to do to override scrapbook, because I thought the code I posted would do that
m
yep, so scrapbook is the layer that papermill uses to get data in and out of the notebook
s
in other words can I make yield_result call set_asset on my provided asset_store resource
m
we don't currently have knobs to override that
s
instead of trying to write to the notebook with scrapbook
ok, I see
m
no, not at present -- what you could do is use the file_manager resource to get a file handle to write out to, and then pass that handle
would love to see the scrapbook error though
s
ok, one sec:
m
i'm sure it isn't helpful and wonder if we could improve it
(feel free to open an issue -- i can investigate how invasive this would be to the underlying papermill layer, and it may be something they have anticipated or would be interested in)
s
papermill.exceptions.PapermillExecutionError:
---------------------------------------------------------------------------
Exception encountered at "In [4]":
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-4-6e97b5f4f935> in <module>
1 for k in ['neuron', 'orientation_bin', 'position_bin', 'position_tuning', 'orientation_tuning']:
----> 2     dm.yield_result(DS[k], k)
~/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/dagstermill/manager.py in yield_result(self, value, output_name)
270
271         out_file = os.path.join(self.marshal_dir, "output-{}".format(output_name))
--> 272         scrapbook.glue(output_name, write_value(dagster_type, value, out_file))
273
274     def yield_event(self, dagster_event):
~/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/scrapbook/utils.py in wrapper(*args, **kwds)
63         if not is_kernel():
64             warnings.warn("No kernel detected for '{fname}'.".format(fname=f.__name__))
---> 65         return f(*args, **kwds)
66
67     return wrapper
~/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/scrapbook/api.py in glue(name, data, encoder, display)
86             name, scrap_to_payload(encoder_registry.encode(Scrap(name, data, encoder))), encoder
87         )
---> 88         ip_display(ipy_data, metadata=metadata, raw=True)
89
90     # Only display data that is marked for display
~/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/IPython/core/display.py in display(include, exclude, metadata, transient, display_id, *objs, **kwargs)
309     for obj in objs:
310         if raw:
--> 311             publish_display_data(data=obj, metadata=metadata, **kwargs)
312         else:
313             format_dict, md_dict = format(obj, include=include, exclude=exclude)
~/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/IPython/core/display.py in publish_display_data(data, metadata, source, transient, **kwargs)
117         kwargs['transient'] = transient
118
--> 119     display_pub.publish(
120         data=data,
121         metadata=metadata,
~/.local/lib/python3.8/site-packages/ipykernel/zmqshell.py in publish(self, data, metadata, source, transient, update)
127         # hooks before potentially sending.
128         msg = self.session.msg(
--> 129             msg_type, json_clean(content),
130             parent=self.parent_header
131         )
~/.local/lib/python3.8/site-packages/ipykernel/jsonutil.py in json_clean(obj)
189         out = {}
190         for k,v in iteritems(obj):
--> 191             out[unicode_type(k)] = json_clean(v)
192         return out
193     if isinstance(obj, datetime):
~/.local/lib/python3.8/site-packages/ipykernel/jsonutil.py in json_clean(obj)
189         out = {}
190         for k,v in iteritems(obj):
--> 191             out[unicode_type(k)] = json_clean(v)
192         return out
193     if isinstance(obj, datetime):
~/.local/lib/python3.8/site-packages/ipykernel/jsonutil.py in json_clean(obj)
189         out = {}
190         for k,v in iteritems(obj):
--> 191             out[unicode_type(k)] = json_clean(v)
192         return out
193     if isinstance(obj, datetime):
~/.local/lib/python3.8/site-packages/ipykernel/jsonutil.py in json_clean(obj)
189         out = {}
190         for k,v in iteritems(obj):
--> 191             out[unicode_type(k)] = json_clean(v)
192         return out
193     if isinstance(obj, datetime):
~/.local/lib/python3.8/site-packages/ipykernel/jsonutil.py in json_clean(obj)
195
196     # we don't understand it, it's probably an unserializable object
--> 197     raise ValueError("Can't clean for JSON: %r" % obj)
ValueError: Can't clean for JSON: <xarray.DataArray 'neuron' (neuron: 1000)>
array([   1,    2,    3, ...,  998,  999, 1000])
Coordinates:
* neuron   (neuron) int64 1 2 3 4 5 6 7 8 ... 993 994 995 996 997 998 999 1000
File "/Users/smackesey/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/dagster/core/errors.py", line 180, in user_code_error_boundary
yield
File "/Users/smackesey/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/dagstermill/solids.py", line 217, in _t_fn
raise exc
File "/Users/smackesey/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/dagstermill/solids.py", line 186, in _t_fn
papermill.execute_notebook(
File "/Users/smackesey/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/papermill/execute.py", line 108, in execute_notebook
raise_for_execution_errors(nb, output_path)
File "/Users/smackesey/stm/code/apps/buzhc/analysis/.conda-env/lib/python3.8/site-packages/papermill/execute.py", line 192, in raise_for_execution_errors
raise error
m
yeah, that is just terrible
would you mind opening an issue with that traceback?
i wonder if we'll be able to do anything good with it, it's deep in ipython, but maybe we can at least give some hints
s
sure
regarding your earlier suggestion though-- where exactly do I pass the file handle?
like, right now I just have my dataarray, and I call
dm.yield_result(array, name)
how should I change that call
m
thanks for this, i think this is a real weakness that we need to address somehow
um
i would do something like:
s
Regarding addressing the issue, what I have found to be the biggest hole in the docs is a clear explanation in one place of how intermediates are handled and the various abstractions for doing so (and configuring whether they will be persisted or not). There are a couple different pieces like Asset Stores and SystemStorageDefinition where it is not really clear how they interact.
m
yes, to some degree this reflects the underlying apis being in flux -- with 0.11.0 we should have a significantly more streamlined system
very interested in your thoughts on how it should work to make the most sense though
yeah what i'm going to suggest here is not delightful but
s
I'll have to think on it, but certainly for dagstermill I think one shouldn't have to go through scrapbook
m
yeah i think we can maybe drop scrapbook completely
which would greatly simplify this
this dovetails with a bunch of other dagstermill work, actually, and i can take a look this week
as a stopgap you may be able to do something like the following:
Copy code
import dagstermill as dm

from dagster import ModeDefinition, local_file_manager

context = dm.get_context(
    mode_def=ModeDefinition(resource_defs={"file_manager": local_file_manager}),
    run_config={"resources": {"file_manager": {"config": {"base_dir": ...}}}},
)

# write the serialized netCDF bytes via the file manager and yield only the path
file_handle = context.resources.file_manager.write_data(dataset.to_netcdf())

dm.yield_result(file_handle.path, 'out_path')
s
cool, yeah I have always thought of scrapbook as a tool for storing objects in the notebook file itself, not for writing to an arbitrary store
m
where, when executed by dagstermill, the file manager will be whatever you're using in the pipeline
the dagstermill code in question predates all of our efforts around intermediates/assets
and revisiting it will i think yield a simpler and more effective design
s
I see
ok, so per your suggestion I need to pass paths rather than the array objects themselves as intermediates
m
yes i think that would be the best way to unblock you
while i look at a more principled fix
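and on the consuming side, a downstream (non-notebook) solid can just take the path and reopen the file -- something like this sketch, with made-up solid and input names:
Copy code
import dagster as dg
import xarray as xr

@dg.solid
def summarize_tuning(_, tuning_path: str):
    # reopen the array that the notebook solid wrote out as netCDF
    tuning = xr.open_dataarray(tuning_path)
    return float(tuning.mean())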
s
alright, that'll do as a workaround-- thanks for the help! I'll post an issue and link it here regarding the backtrace
m
yes thanks, and thank you for this report, really helpful in clarifying what we need to do here