Artem S
12/02/2021, 4:57 PM
import time

import dagster as d

# default_resources, default_k8s_tags and multiprocess_10 are not shown in this snippet.


@d.op(out=d.DynamicOut())
def produce_dynamic_range():
    # Split one day (0..86399 seconds) into 1800-second ranges.
    start, stop, offset = 0, 86399, 1800
    while start < stop:
        start_offset = start + offset - 1
        yield d.DynamicOutput(
            {"start": start, "stop": start_offset},
            f"{start}_{start_offset}",
        )
        start = min(start + offset, stop)


@d.op()
def extract(data):
    # Ignore the input and return a scalar; the sleep simulates work.
    time.sleep(5)
    return 1


@d.op()
def load(data):
    # Pass the input through unchanged; the sleep simulates work.
    time.sleep(10)
    return data


@d.graph()
def composite(data):
    return load(extract(data))


@d.job(
    resource_defs={**default_resources},
    tags={**default_k8s_tags},
    config={**multiprocess_10},
)
def solve():
    result = produce_dynamic_range().map(composite).collect()
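To see how many mapped branches this job fans out to, the range loop can be run on its own, without Dagster. This is only a sketch: `dynamic_ranges` is a hypothetical helper that copies the arithmetic from `produce_dynamic_range` and yields plain tuples instead of `DynamicOutput` objects.

```python
def dynamic_ranges(start=0, stop=86399, offset=1800):
    """Yield (start, end) pairs using the same loop as produce_dynamic_range."""
    while start < stop:
        end = start + offset - 1
        yield start, end
        # Clamp to stop so the loop ends once the last range has been emitted.
        start = min(start + offset, stop)


ranges = list(dynamic_ranges())
print(len(ranges))   # 48
print(ranges[0])     # (0, 1799)
print(ranges[-1])    # (84600, 86399)
```

So the job maps `composite` over 48 ranges, and each mapped branch runs two ops (`extract`, then `load`).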
Somewhere in the middle of execution:
1000@dagster-run-b7381b03-8f34-4ecc-bf2d-95d3e3ebd385-nsrmq:/opt/dagster/app$ ps aux | grep python
1000 1 1.6 2.0 164300 126760 ? Ss 16:41 0:04 /opt/venv/bin/python -m dagster api execute_run {"__class__": "ExecuteRunArgs", "instance_ref": null, "pipeline_origin": {"__class__": "PipelinePythonOrigin", "pipeline_name": "solve", "repository_origin": {"__class__": "RepositoryPythonOrigin", "code_pointer": {"__class__": "FileCodePointer", "fn_name": "main_test_repository", "python_file": "repository.py", "working_directory": "/opt/dagster/app"}, "container_image": null, "executable_path": "/opt/venv/bin/python"}}, "pipeline_run_id": "b7381b03-8f34-4ecc-bf2d-95d3e3ebd385"}
1000 8 0.0 0.2 16792 13024 ? S 16:41 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(3)
1000 13 0.0 0.0 0 0 ? Z 16:41 0:00 [python] <defunct>
1000 137 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 140 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 142 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 162 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 167 0.1 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 170 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 186 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct>
1000 193 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct>
1000 195 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct>
1000 205 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct>
1000 219 0.1 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 221 0.1 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 229 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 237 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 245 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 253 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 267 0.3 0.0 0 0 ? Z 16:45 0:00 [python] <defunct>
1000 269 0.2 0.0 0 0 ? Z 16:45 0:00 [python] <defunct>
1000 277 0.4 0.0 0 0 ? Z 16:45 0:00 [python] <defunct>
1000 285 0.6 0.0 0 0 ? Z 16:45 0:00 [python] <defunct>
1000 289 10.3 2.0 311488 126428 ? Sl 16:45 0:02 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork
1000 293 0.6 0.2 16896 13048 ? S 16:45 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(5)
1000 297 21.4 1.9 306072 121140 ? Sl 16:46 0:01 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork
1000 301 3.4 0.2 16968 13192 ? S 16:46 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(5)
1000 305 0.0 0.0 10380 5852 ? R 16:46 0:00 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork
1000 307 0.0 0.0 4844 880 pts/0 S+ 16:46 0:00 grep python
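The `<defunct>` rows above (state `Z`) are zombie processes: children that have already exited but whose exit status the parent has not yet collected. A minimal sketch of that behaviour with plain `multiprocessing`, assuming Linux (it reads `/proc`) and the `fork` start method. It is not Dagster's executor code, and `proc_state`, `short_task` and `demo` are hypothetical names used only for illustration.

```python
import multiprocessing as mp
import time


def proc_state(pid):
    """Return the one-letter state from /proc/<pid>/stat, or None if the pid is gone."""
    try:
        with open(f"/proc/{pid}/stat") as f:
            # Format is "pid (comm) state ..."; comm may contain spaces,
            # so split after the last ')'.
            return f.read().rsplit(")", 1)[1].split()[0]
    except (FileNotFoundError, ProcessLookupError):
        return None


def short_task():
    time.sleep(0.1)


def demo():
    ctx = mp.get_context("fork")
    child = ctx.Process(target=short_task)
    child.start()
    pid = child.pid

    # Wait for the child to exit WITHOUT calling join() or is_alive(),
    # either of which would reap it.
    deadline = time.time() + 10
    while proc_state(pid) != "Z" and time.time() < deadline:
        time.sleep(0.05)
    before = proc_state(pid)

    child.join()  # collects the exit status, which removes the zombie entry
    after = proc_state(pid)
    return before, after


if __name__ == "__main__":
    print(demo())  # on Linux: ('Z', None)
```

A zombie holds no memory or CPU (the `0 0` VSZ/RSS columns above), only a slot in the process table, so a handful of them during a run is not necessarily a leak. The question is whether they are reaped before the run finishes.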
It looks like some processes weren't joined properly.
daniel
12/02/2021, 6:04 PM
Artem S
12/02/2021, 6:05 PM
daniel
12/02/2021, 6:07 PM
Artem S
12/02/2021, 6:08 PM
daniel
12/02/2021, 6:49 PM
Artem S
12/02/2021, 6:52 PM
daniel
12/02/2021, 10:48 PM
Artem S
12/03/2021, 9:43 AM
daniel
12/03/2021, 10:14 AM
Artem S
12/03/2021, 10:57 AM
Anoop Sharma
03/21/2022, 8:20 PM