Hi! Yesterday i got a problem with parallel execut...
# ask-community
a
Hi! Yesterday i got a problem with parallel execution in k8s. Dagster spawns a zombie processes who will alive until job will completed. There is a simple job:
Copy code
@d.op(out=d.DynamicOut())
def produce_dynamic_range():
    start, stop, offset = 0, 86399, 1800

    while start < stop:
        start_offset = start + offset - 1
        yield d.DynamicOutput(
            {"start": start, "stop": start_offset},
            f"{start}_{start_offset}"
        )
        start = min(start + offset, stop)


@d.op()
def extract(data):
    #   just pass scalar
    time.sleep(5)
    return 1


@d.op()
def load(data):
    #   just bypass
    time.sleep(10)
    return data


@d.graph()
def composite(data):
    return load(extract(data))


@d.job(
    resource_defs={**default_resources},
    tags={**default_k8s_tags},
    config={**multiprocess_10}
)
def solve():
    result = produce_dynamic_range().map(composite).collect()
Somewhere in the middle of execution:
1000@dagster-run-b7381b03-8f34-4ecc-bf2d-95d3e3ebd385-nsrmq:/opt/dagster/app$ ps aux | grep python
1000 1 1.6 2.0 164300 126760 ? Ss 16:41 0:04 /opt/venv/bin/python -m dagster api execute_run {“class”: “ExecuteRunArgs”, “instance_ref”: null, “pipeline_origin”: {“class”: “PipelinePythonOrigin”, “pipeline_name”: “solve”, “repository_origin”: {“class”: “RepositoryPythonOrigin”, “code_pointer”: {“class”: “FileCodePointer”, “fn_name”: “main_test_repository”, “python_file”: “repository.py”, “working_directory”: “/opt/dagster/app”}, “container_image”: null, “executable_path”: “/opt/venv/bin/python”}}, “pipeline_run_id”: “b7381b03-8f34-4ecc-bf2d-95d3e3ebd385”} 1000 8 0.0 0.2 16792 13024 ? S 16:41 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(3) 1000 13 0.0 0.0 0 0 ? Z 16:41 0:00 [python] <defunct> 1000 137 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct> 1000 140 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct> 1000 142 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct> 1000 162 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct> 1000 167 0.1 0.0 0 0 ? Z 16:42 0:00 [python] <defunct> 1000 170 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct> 1000 186 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct> 1000 193 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct> 1000 195 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct> 1000 205 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct> 1000 219 0.1 0.0 0 0 ? Z 16:44 0:00 [python] <defunct> 1000 221 0.1 0.0 0 0 ? Z 16:44 0:00 [python] <defunct> 1000 229 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct> 1000 237 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct> 1000 245 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct> 1000 253 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct> 1000 267 0.3 0.0 0 0 ? Z 16:45 0:00 [python] <defunct> 1000 269 0.2 0.0 0 0 ? Z 16:45 0:00 [python] <defunct> 1000 277 0.4 0.0 0 0 ? Z 16:45 0:00 [python] <defunct> 1000 285 0.6 0.0 0 0 ? Z 16:45 0:00 [python] <defunct> 1000 289 10.3 2.0 311488 126428 ? Sl 16:45 0:02 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork 1000 293 0.6 0.2 16896 13048 ? S 16:45 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(5) 1000 297 21.4 1.9 306072 121140 ? Sl 16:46 0:01 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork 1000 301 3.4 0.2 16968 13192 ? S 16:46 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(5) 1000 305 0.0 0.0 10380 5852 ? R 16:46 0:00 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork 1000 307 0.0 0.0 4844 880 pts/0 S+ 16:46 0:00 grep python It’s looks like some processes wasn’t joined properly.
d
hi artem - do you know if there were any other failures during the job run when this happened? any logs about subprocesses crashing or failing to complete?
a
I didn’t spoke any failures in dagit ui. Or u mean pod logs?
d
Yeah, either - I'm just looking through the code and the only place I see where a join wouldn't be happening would be if the subprocess crashed or never finished: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/core/executor/child_process_executor.py?L137-159
a
yeah, i lookup this part too
i will try to collect logs from pod
I saw only debug level messages about computation steps and passing intermediate results to s3 storage. Maybe you can tell me what exactly i need to check? Or i can just attach logfiles?
d
we can see if we can reproduce locally. does it happen every time you run the job?
a
Yes. Doesn’t matter how it was triggered: manually or through schedule - the same result.
d
hmmmm interesting, i'm not able to reproduce this locally when i run that pipeline (No zombie processes, they're all cleaned up). Does it happen for you when you run it locally too? or just in k8s?
a
This is first step what i did, i cannot reproduce it locally, only in k8s. One important thing: this behavior we got only on python 3.7. With python 3.8 everything seems is ok.
d
Are the zombie processes causing problems for you?
a
Jobs completed properly. For me personally isn’t problem, but our devops are sad when they see triggers for a large number of processes inside the pod 🙂
a
@daniel I am having this same problem using docker. There's a defunct process started for every op but it doesn't get killed after the op's done executing. Not able to to kill these processes manually either from outside the docker. Only when I stop the container, these get removed. Any suggestions?