# ask-community
k
Hello team, I have a question regarding dagster-celery-docker. I am running it on EC2, along with the Docker run launcher. I am able to get jobs to kick off, and in the example I tried, the first op ran successfully, including the hooks attached to the op. It failed to start the second op and threw an error about JSON decoding. Does anyone have advice about what configuration may be missing here? I can see that the IO manager is connecting to S3 and is able to create folders for job runs and their step output. It looks to me like there is an issue in the execution loop: I took a look at the core_execution_loop module in dagster-celery, and it seems to be trying to deserialize the JSON output of one of the step events and failing. Looking at RabbitMQ and Flower, I'm not seeing anything reporting a failure. I'm running the latest version of Dagster (0.14.13) for all of my Dagster components. My run config looks like this:
```yaml
execution:
  config:
    broker: pyamqp://rabbitmq:rabbitmq@rabbitmq/dagster
    docker:
      env_vars:
        - DAGSTER_CURRENT_IMAGE
        - DAGSTER_POSTGRES_HOST
        - DAGSTER_POSTGRES_HOST_AUTH_METHOD
        - DAGSTER_POSTGRES_USER
        - DAGSTER_POSTGRES_PASSWORD
        - DAGSTER_POSTGRES_DB
        - DAGSTER_POSTGRES_SCHEMA
      image: pipelines:test_jn_celery
      network: docker_network
ops:
  .....
resources:
  io_manager:
    config:
      s3_bucket: bucket
      s3_prefix: prefix
```
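For context, the job side looks roughly like this — a minimal sketch with illustrative op, hook, and job names rather than my actual pipeline code:

```python
# Minimal sketch of a job wired for the run config above.
# Op, hook, and job names are illustrative.
from dagster import job, op, success_hook
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
from dagster_celery_docker import celery_docker_executor


@success_hook
def notify_on_success(context):
    # The kind of hook attached to the first op.
    context.log.info("op succeeded")


@op
def first_op():
    return 1


@op
def second_op(value):
    # Receives first_op's output via the S3 IO manager.
    return value + 1


@job(
    executor_def=celery_docker_executor,
    resource_defs={
        "io_manager": s3_pickle_io_manager,
        "s3": s3_resource,
    },
    hooks={notify_on_success},
)
def example_job():
    second_op(first_op())
```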
d
Hi Keith - thanks for the stack trace, it looks like there is indeed something in the queue that isn't a serialized step event and Dagster is getting confused. Any chance you could share or DM some job code that triggers this? Our tests that use this executor aren't hitting this, so I'm wondering if it's something specific about the code that's needed to reproduce the problem.
k
hi @daniel, thanks for getting back to me. PFA a repro; feel free to ping me with any questions. Instructions are in the README file. You will also need to update the run config with your s3_bucket and s3_prefix.
🙏 1
It would be great if this example could be added to the docs somewhere; it's been difficult to get this configured, and a simple working starting point would be helpful to others, I'm sure.
d
Got it - I see the problem and it should be a quick fix. Will report back here when there's a PR / workaround. Thanks for the report! And noted about better examples here too, will see what we can do there.
k
Awesome! Thank you I appreciate the help.
d
Link to the fix PR here if you're curious - https://github.com/dagster-io/dagster/pull/7665 - it should be no problem to get this out in the release this week. As you noted, it stems from output in your resource being incorrectly picked up as a step event. A workaround until the fix is ready could be to suppress those logs if that's possible (or you could pull in the executor code in that PR and attach it to your job instead of the default executor).
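Roughly, attaching the copied executor to a job instead of the default one would look like this — `patched_executor` here is a hypothetical local module holding the code copied from the PR, not a published package:

```python
# Sketch of swapping in a locally copied executor; `patched_executor`
# is a hypothetical local module containing the code from the PR.
from dagster import job

from patched_executor import celery_docker_executor


@job(executor_def=celery_docker_executor)
def my_job():
    ...
```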
k
Great thank you. Will test it out shortly
j
Hi Daniel, I've tried plugging the new executor from your pull request into our Dagster deployment, along with the necessary utility change you made, but we are still receiving an error. This time it appears to be coming from the core_celery_execution_loop within the celery docker executor. This seems to be a different issue than the one you previously addressed. The error is thrown between the end of the first op execution (which executes successfully, with the resources logging properly) and the point where the second op is picked up for execution.
```
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 789, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.9/site-packages/dagster_celery/core_execution_loop.py", line 101, in core_celery_execution_loop
    event = deserialize_json_to_dagster_namedtuple(step_event)
  File "/usr/local/lib/python3.9/site-packages/dagster/serdes/serdes.py", line 424, in deserialize_json_to_dagster_namedtuple
    dagster_namedtuple = _deserialize_json(
  File "/usr/local/lib/python3.9/site-packages/dagster/serdes/serdes.py", line 463, in _deserialize_json
    value = seven.json.loads(json_str)
  File "/usr/local/lib/python3.9/json/__init__.py", line 359, in loads
    return cls(**kw).decode(s)
  File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
```
d
Hi Jim - that looks like the same error that Keith reported, yeah. Is it possible to triple check that you're using the new executor with the fix applied? Do you possibly have code I can run that reproduces the problem?
j
Hi Daniel, we've copied the executor files into the POC project that Keith sent you earlier and defined the newly copied executor in the job definition, and this error is still arising. It seems you were right that our new executor code isn't being executed, as the core celery execution loop that is failing is located inside the installed Python package files. Is there a way we can test this by installing the branch your pull request is built off of, or another way we can copy the executor in and be sure that it is using ours? Or should we wait for the release with this updated code in it?
d
> and defined the newly copied executor in the job definition
Is it possible to share the code for this? I would expect that to be sufficient to fix the problem.
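If you do want to try installing straight from the branch before the release, pip can install a package from a subdirectory of a git repo. Something like the lines below would do it — `<branch-name>` is a placeholder for the PR's source branch, and whether you also need dagster-celery from the branch depends on what the PR touches:

```
# Hypothetical requirements.txt entries; <branch-name> is a placeholder.
git+https://github.com/dagster-io/dagster.git@<branch-name>#subdirectory=python_modules/libraries/dagster-celery
git+https://github.com/dagster-io/dagster.git@<branch-name>#subdirectory=python_modules/libraries/dagster-celery-docker
```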
k
Hi @daniel, thanks for the code. I was able to make a few changes to the Dockerfile and included the executor and utils classes from your PR. If I enable the print statement in the resource, the job now passes instead of throwing the serialization error. We can use this for the time being and look forward to the merge and release. Thanks again.
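For anyone who hits this later: the trigger was plain stdout coming from a resource. A minimal sketch of the kind of resource involved — the name and return value are illustrative:

```python
# Sketch of a resource that writes to stdout. Under the unpatched executor,
# this print output leaked into the step-event stream and caused the
# JSONDecodeError above; with the PR's executor it is handled correctly.
from dagster import resource


@resource
def noisy_resource(init_context):
    print("initializing resource")  # plain stdout, not a structured log call
    return "client-handle"
```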
:condagster: 1