New submission from Kyle Stanley <aeros...@gmail.com>:

This feature enhancement issue is based on the following python-ideas thread: 
https://mail.python.org/archives/list/python-id...@python.org/thread/ZSUIFN5GTDO56H6LLOPXOLQK7EQQZJHJ/

In summary, the main suggestion was to implement a new parameter called 
"cancel" (some bikeshedding over the name is welcome, I was thinking 
"cancel_futures" might be another option) for Executor.shutdown(), that would 
be added to both ThreadPoolExecutor and ProcessPoolExecutor. When set to True, 
this parameter would cancel all pending futures that were scheduled to the 
executor just after setting self._shutdown.

In order to build some experience in working with the internals of the 
executors (particularly for implementing native pools in asyncio in the 
future), I plan on working on this issue; assuming Antoine and/or Brian are +1 
on it. Guido seemed to approve of the idea.

The implementation in ThreadPoolExecutor should be fairly straightforward, as 
it would use much of the same logic that's in the private method 
_initializer_failed() 
(https://github.com/python/cpython/blob/fad8b5674c66d9e00bb788e30adddb0c256c787b/Lib/concurrent/futures/thread.py#L205-L216).
 Minus the setting of self._broken, and cancelling each of the work_items 
(pending futures) instead of setting the BrokenThreadPool exception.

For ProcessPoolExecutor, I'll likely have to spend some more time looking into 
the implementation details of it to figure out how the cancellation will work. 
IIUC, it would involve adding some additional logic in 
_queue_management_worker(), the function which is used by the queue management 
thread to communicate between the main process and the worker processes spawned 
by ProcessPoolExecutor.

Specifically, in the "if shutting_down()" block 
(https://github.com/python/cpython/blob/fad8b5674c66d9e00bb788e30adddb0c256c787b/Lib/concurrent/futures/process.py#L432-L446),
 I think we could add an additional conditional check to see if 
self._cancel_pending_work_items is true (new internal flag set during 
executor.shutdown() if *cancel* is true, just after setting 
"self._shutdown_thread = True"). In this block, it would iterate through the 
pending work items, and cancel their associated future. Here's a rough example 
of what I have in mind:

```
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
+                  if executor._cancel_pending_work_items:
+                      # We only care about the values in the dict, which are
+                      # the actual work items.
+                      for work_item in pending_work_items.values():
+                          work_item.future.cancel()
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
```

Would something along the lines of the above be a potentially viable method of 
implementing the *cancel* parameter for ProcessPoolExecutor.shutdown()? The 
main downside to this implementation is that it can't cancel anything that is 
already running (pushed from pending_work_items to call_queue). But from my 
understanding, there isn't a viable means of cancelling anything in the call 
queue; at that point it's too late.

Anyways, I'll work on the ThreadPoolExecutor implementation in the meantime. As 
mentioned previously, that one should be more straightforward. After getting it 
working, I'll open a PR for just ThreadPoolExecutor, and then work on 
ProcessPoolExecutor in another PR after receiving some feedback on the above 
idea.

----------
assignee: aeros
messages: 360093
nosy: aeros, bquinlan, pitrou
priority: normal
severity: normal
stage: needs patch
status: open
title: Add "cancel" parameter to concurrent.futures.Executor.shutdown()
type: enhancement
versions: Python 3.9

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue39349>
_______________________________________
_______________________________________________
Python-bugs-list mailing list
Unsubscribe: 
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com

Reply via email to