# announcements
a
I'm trying out the celery toy example here: https://docs.dagster.io/docs/deploying/celery. For some reason, my celery workers fail to run the job, raising a `Couldn't import module_name` exception.
Here's the full exception:
[2020-05-26 15:18:00,502: ERROR/ForkPoolWorker-8] Task execute_plan[54776a4f-3bbc-4aa1-b7e4-0f7ceca7e4dd] raised unexpected: CheckError("Failure condition: Couldn't import module module_name when attempting to rehydrate the configurable class module_name.class_name",)
Traceback (most recent call last):
  File "/home/al/.local/lib/python3.6/site-packages/dagster/serdes/__init__.py", line 298, in rehydrate
    module = importlib.import_module(self.module_name)
  File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'module_name'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/al/.local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/al/.local/lib/python3.6/site-packages/celery/app/trace.py", line 650, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/al/.local/lib/python3.6/site-packages/dagster_celery/tasks.py", line 32, in _execute_plan
    instance = DagsterInstance.from_ref(instance_ref)
  File "/home/al/.local/lib/python3.6/site-packages/dagster/core/instance/__init__.py", line 262, in from_ref
    local_artifact_storage=instance_ref.local_artifact_storage,
  File "/home/al/.local/lib/python3.6/site-packages/dagster/core/instance/ref.py", line 175, in local_artifact_storage
    return self.local_artifact_storage_data.rehydrate()
  File "/home/al/.local/lib/python3.6/site-packages/dagster/serdes/__init__.py", line 304, in rehydrate
    configurable_class=self.module_name + '.' + self.class_name,
  File "/home/al/.local/lib/python3.6/site-packages/dagster/check/__init__.py", line 109, in failed
    raise_with_traceback(CheckError('Failure condition: {desc}'.format(desc=desc)))
  File "/home/al/.local/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "/home/al/.local/lib/python3.6/site-packages/dagster/serdes/__init__.py", line 298, in rehydrate
    module = importlib.import_module(self.module_name)
  File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
dagster.check.CheckError: Failure condition: Couldn't import module module_name when attempting to rehydrate the configurable class module_name.class_name
I dug into the source a little to figure out what the issue might be, and it appears to be somewhere in the serialization/deserialization process in `serdes`.
of course, it might just be a mistake I made, but I tried to follow the toy example pretty faithfully.
any suggestions/pointers on how to address this?
I'm using 0.7.14 of dagit, dagster, and dagster-celery
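(For anyone reading along: the `rehydrate` step in the traceback boils down to an `importlib` lookup. A minimal sketch of what that step is doing, using the literal placeholder strings from the error to show how they fail:)

```python
import importlib

def rehydrate(module_name, class_name):
    # simplified sketch of what dagster.serdes rehydration does:
    # import the named module, then look up the configurable class on it
    module = importlib.import_module(module_name)
    return getattr(module, class_name)

# a real module/class pair resolves fine
assert rehydrate("collections", "OrderedDict") is not None

# literal placeholders like "module_name" fail exactly as in the traceback
try:
    rehydrate("module_name", "class_name")
except ModuleNotFoundError as exc:
    print(exc)  # No module named 'module_name'
```

So a config value of literally `module_name.class_name`, wherever it lives, would produce exactly this error.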
m
hmm, i think that you may be trying to use a nonexistent `module_name.class_name` somewhere, possibly in your `dagster.yaml`?
can i see the contents of that file?
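(Context: configurable classes in `dagster.yaml` are specified as a module/class pair, so literal placeholder values left in such a file would reproduce this error. A hypothetical entry, purely to illustrate where those strings would live, not something to copy:)

```yaml
# $DAGSTER_HOME/dagster.yaml -- illustrative only; leaving placeholder
# values like these reproduces the "Couldn't import module_name" error
local_artifact_storage:
  module: module_name
  class: class_name
```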
a
i didn't actually create this file for this example
should I have? what would I place in it? I'm really just trying to run that simple
celery_pipeline.py
file
m
can you run the following in your python environment
from dagster import DagsterInstance
DagsterInstance.get().get_ref()
a
please give me just a bit
$ python
Python 3.6.9 (default, Apr 18 2020, 01:56:04)
[GCC 8.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dagster import DagsterInstance
>>> DagsterInstance.get().get_ref()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/al/projects/toy-celery/venv/lib/python3.6/site-packages/dagster/core/instance/__init__.py", line 288, in get_ref
    check.failed('Can not produce an instance reference for {t}'.format(t=self))
  File "/home/al/projects/toy-celery/venv/lib/python3.6/site-packages/dagster/check/__init__.py", line 109, in failed
    raise_with_traceback(CheckError('Failure condition: {desc}'.format(desc=desc)))
  File "/home/al/projects/toy-celery/venv/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
dagster.check.CheckError: Failure condition: Can not produce an instance reference for <dagster.core.instance.DagsterInstance object at 0x7fbcbab44cf8>
m
@nate i think this might be an issue with docs -- do we require a non-ephemeral instance for the celery executor?
@alir do you have the environment variable `DAGSTER_HOME` set?
and at what step in the tutorial are things breaking for you?
a
I set it in the shell when running dagit (`DAGSTER_HOME=$(pwd) dagit -f celery_pipeline.py -n parallel_pipeline`) but not the celery worker. let me try again
i'm at the quick-start section, right where it says "Now you can execute the parallel pipeline from Dagit with the following config":
execution:
  celery:
storage:
  filesystem:
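(Worth noting here: the dagit process and every celery worker need to see the same `DAGSTER_HOME`, since the worker rehydrates the instance from that directory's config. A sketch of the setup, assuming the `dagster-celery worker start` CLI from that release:)

```shell
# run everything from the project directory so all processes share one instance
export DAGSTER_HOME="$(pwd)"

# start the celery worker in the same environment and virtualenv
dagster-celery worker start

# in a second shell with the same DAGSTER_HOME, launch dagit
DAGSTER_HOME="$(pwd)" dagit -f celery_pipeline.py -n parallel_pipeline
```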
m
and what are the contents of the `DAGSTER_HOME` directory?
a
just that one file, `celery_pipeline.py`, with the following contents (copy/pasted from the tutorial):
from dagster_celery import celery_executor

from dagster import ModeDefinition, default_executors, pipeline, solid

celery_mode_defs = [ModeDefinition(executor_defs=default_executors + [celery_executor])]

@solid
def not_much(_):
    return

@pipeline(mode_defs=celery_mode_defs)
def parallel_pipeline():
    for i in range(50):
        not_much.alias('not_much_' + str(i))()
and well, the virtualenv stuff
that's what I created. There are also some directories (`dagster_celery/`, `history`, `logs`, `schedules`, `storage`) created by dagster and celery. I didn't do anything to them
m
yep
a
setting `DAGSTER_HOME` when invoking dagit and dagster-celery did not work, unfortunately.
i wonder if there's something wrong with my environment. i'm assuming the same exact setup works for everyone else
m
yeah, we can't reproduce this
did you get the same error when you set `DAGSTER_HOME`?
and this is when executing from dagit?
a
yea, it must be an issue on my side then. let me restart from scratch. i'll report back
yea, this is when executing from dagit and yea, i have the same error
m
it seems as if you must have a `dagster.yaml` somewhere
n
and you’re working off of PyPI, not master, correct?
m
with the dummy values `"module_name"` and `"class_name"` somewhere
a
yep, i'm working off of PyPI
n
@alir yes, let me know if you’re able to repro from a clean restart—I just walked through the example you posted w/ a clean dagster/dagit/dagster-celery install and it seemed to work fine
a
i've got good news. I was not able to reproduce this from a clean directory, and I narrowed the problem down to the virtualenv I used in my initial report. If I use that particular virtualenv, I get the issue I reported; otherwise, it works as expected
i'll report back if I find out what's so special about that virtualenv. I can't find any `dagster.yaml` there, so I'm not sure.
Thanks for the help and time!
n
ok, thanks!