https://dagster.io/ logo
#ask-ai
Title
# ask-ai
a

Andrew Grigorev

02/15/2024, 3:38 PM
@Scout what are caveats of using multiprocessing inside of dagster step handler? I get a bunch of this exceptions closing multiprocessing.Pool:
Copy code
Process ForkPoolWorker-16:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/multiprocessing/process.py", line 317, in _bootstrap
    util._exit_function()
  File "/usr/local/lib/python3.11/multiprocessing/util.py", line 337, in _exit_function
    _run_finalizers(0)
  File "/usr/local/lib/python3.11/multiprocessing/util.py", line 294, in _run_finalizers
    keys = [key for key in list(_finalizer_registry) if f(key)]
                           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_utils/interrupts.py", line 82, in _new_signal_handler
    raise error_cls()
dagster._core.errors.DagsterExecutionInterruptedError
s

Scout

02/15/2024, 3:38 PM
Using multiprocessing within a Dagster step handler can introduce some complexities due to the way Dagster manages process execution and resources. The documentation provides some insights into the execution model of Dagster, particularly with respect to the multiprocess executor, which executes each step in an individual process. One of the core design principles of Dagster's architecture is that user code never runs in the same process as Dagster system code, with all communication between the two happening over a structured API. This design allows users to run code in multiple isolated Python environments simultaneously, preventing issues in one user-defined job from destabilizing the system. When you use multiprocessing within a Dagster step, you are adding another layer of process management on top of Dagster's own process management. This can lead to several potential issues: 1. Resource Management: Dagster manages resources for steps, and using multiprocessing may interfere with this management, leading to resource leaks or contention. 2. Error Handling: Dagster wraps user code execution in an error boundary to catch exceptions and wrap them in a
DagsterUserCodeExecutionError
. When using multiprocessing, errors that occur in child processes need to be properly communicated back to the parent process and handled by Dagster's error boundary. 3. Process Cleanup: The error you are seeing indicates that there is an issue with cleaning up multiprocessing pools when a Dagster step is interrupted. Dagster has its own mechanisms for handling interrupts and graceful shutdowns, and using multiprocessing may conflict with these mechanisms. 4. Serialization: Data passed between processes must be serializable. Dagster's own execution plan involves serializable objects to pass information between steps. When using multiprocessing, you must ensure that all objects you pass between processes are serializable. 5. Debugging: Debugging issues that span multiple processes can be challenging, especially when dealing with process pools and worker processes. To address these issues, you may need to ensure that any multiprocessing code you write is robust against interrupts and can properly communicate errors back to the Dagster process. You should also make sure to clean up any resources used by multiprocessing, such as process pools, to avoid resource leaks. If you continue to encounter issues, it may be helpful to look into Dagster's own mechanisms for parallel execution, such as the multiprocess executor, which may provide a more integrated way to achieve parallelism within your Dagster jobs.
a

Andrew Grigorev

02/15/2024, 3:40 PM
@Scout The step is not interrupted, and job finishes cleanly, I just got this tracebacks in logs
s

Scout

02/15/2024, 3:40 PM
If you're seeing exceptions related to multiprocessing in the logs, but the job finishes cleanly, it suggests that there might be some edge cases or race conditions in the cleanup process of the multiprocessing pools that are not affecting the outcome of your job but are still being logged. Here are a few things to consider: 1. Graceful Shutdown: Ensure that your multiprocessing pools are being shut down gracefully. This involves calling
pool.close()
followed by
pool.join()
to ensure that all worker processes complete their work and exit cleanly before the main process exits. 2. Error Propagation: Even if the job completes successfully, worker processes might be encountering exceptions that are not properly propagated to the main process. Make sure that exceptions within worker processes are caught and handled or logged appropriately. 3. Logging Configuration: Dagster's logging system is designed to capture and structure logs from user code. If you're using multiprocessing, you might need to configure logging in the worker processes to ensure that logs are captured correctly and don't result in unhandled exceptions. 4. Concurrency Issues: There might be concurrency issues with the logging system itself, especially if multiple processes are trying to log messages at the same time. This can sometimes lead to race conditions that cause exceptions during the cleanup phase. 5. Dagster's Execution Context: If you're using multiprocessing within a Dagster step, ensure that you're not inadvertently passing Dagster's execution context objects to the child processes, as these are not meant to be serialized or used across process boundaries. 6. Dagster Execution Interrupted Error: The
DagsterExecutionInterruptedError
you're seeing is typically raised when a run is terminated by a signal, such as SIGTERM or SIGINT. If you're not intentionally interrupting the job, this might be a side effect of how the multiprocessing pool is being managed. Since the job is completing successfully, these exceptions might not be critical, but they do indicate that there's something unexpected happening during the cleanup of multiprocessing resources. To avoid these exceptions, you may need to review and adjust the way you're using multiprocessing within your Dagster steps, ensuring that resources are managed in a way that's compatible with Dagster's execution model. If the exceptions persist and you're unable to resolve them, consider reaching out to the Dagster community or filing an issue on the Dagster GitHub repository with details about your specific use case and the full traceback for further assistance.
2 Views