[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-26 Thread Andrew Svetlov


Andrew Svetlov  added the comment:

I don't mind, sorry.

Feel free to make a pull request with the fix though.

--

___
Python tracker 

___
___
Python-bugs-list mailing list
Unsubscribe: 
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com



[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-26 Thread Géry

Géry  added the comment:

@Andrew Svetlov

> Adding a new state for "not running and not pending but something in between" 
> is useless

I have not suggested that. I have just reported that when the number of
submitted calls is strictly greater than the number of pool worker processes,
the number of calls reported as RUNNING by `Future.running()` makes no sense.
This is probably because the current `ProcessPoolExecutor` implementation uses 2
PENDING stores (the `_pending_work_items` `dict` and the `call_queue`
`multiprocessing.Queue`) but treats the second store as a RUNNING store,
unlike the `ThreadPoolExecutor` implementation, which has only 1 PENDING
store (the `_work_queue` `queue.SimpleQueue`). The proper fix would of course be
to correct the current implementation, not to create a new future state.

--


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-26 Thread Pablo Galindo Salgado


Pablo Galindo Salgado  added the comment:

I concur with Andrew

--


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-26 Thread Andrew Svetlov


Andrew Svetlov  added the comment:

Adding a new state for "not running and not pending but something in between"
is useless: it can make the .running() result a little more accurate but doesn't
improve the real functionality.

The easy "fix" is to update the documentation to point out that the value
returned by .running() is an approximation. That's true anyway, because the
future may change its state between reading self._state and returning a result.
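To illustrate the point that `running()` is only a snapshot, here is a minimal sketch using the public `concurrent.futures` API. It uses a `ThreadPoolExecutor` only so that the example stays deterministic, and the helper name `snapshot_demo` is hypothetical. The value read while the task is blocked is already stale once the task has finished:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def snapshot_demo():
    """Return (running_snapshot, running_now, done_now, result) for one task."""
    started = threading.Event()   # set by the task once a worker executes it
    release = threading.Event()   # lets the task finish when set

    def task():
        started.set()
        release.wait()
        return "done"

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(task)
        started.wait()                      # a worker is now executing task()
        running_snapshot = future.running()
        release.set()                       # the state changes after the read
        result = future.result()            # block until the task has finished

    # The earlier snapshot no longer describes the future.
    return running_snapshot, future.running(), future.done(), result


print(snapshot_demo())  # (True, False, True, 'done')
```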

--


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-26 Thread Géry

Géry  added the comment:

@Pablo Galindo Salgado

Thank you for the debugging information. I would have expected 8 "Adding a new 
item to the call_queue" instead of 3, since I submitted 8 calls to the process 
pool.

The concurrent.futures._base module defines 5 future states:

> _FUTURE_STATES = [
>     PENDING,
>     RUNNING,
>     CANCELLED,
>     CANCELLED_AND_NOTIFIED,
>     FINISHED
> ]

The concurrent.futures.process module explains the job flow:

> Local worker thread:
> - reads work ids from the "Work Ids" queue and looks up the corresponding
>   WorkItem from the "Work Items" dict: if the work item has been cancelled then
>   it is simply removed from the dict, otherwise it is repackaged as a
>   _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
>   until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
>   calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
> - reads _ResultItems from "Result Q", updates the future stored in the
>   "Work Items" dict and deletes the dict entry
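The NOTE about `Future.cancel()` follows from the public `Future` state machine: once `set_running_or_notify_cancel()` has moved a future to RUNNING, `cancel()` returns False. A minimal sketch driving a bare `concurrent.futures.Future` by hand (the documentation reserves direct construction and `set_running_or_notify_cancel()` for executor implementations and unit tests, which is the only reason they are used here; the helper name `lifecycle` is hypothetical):

```python
from concurrent.futures import Future


def lifecycle():
    # A future that is still PENDING can be cancelled.
    pending = Future()
    cancelled_ok = pending.cancel()                  # True: PENDING -> CANCELLED
    # An executor would now skip it: this returns False for a cancelled future.
    should_run = pending.set_running_or_notify_cancel()

    # A future moved to RUNNING (as happens when an item enters the "Call Q")
    # can no longer be cancelled.
    queued = Future()
    started = queued.set_running_or_notify_cancel()  # True: PENDING -> RUNNING
    running = queued.running()
    cancel_refused = not queued.cancel()             # cancel() returns False
    queued.set_result(42)                            # RUNNING -> FINISHED
    return cancelled_ok, should_run, started, running, cancel_refused, queued.result()


print(lifecycle())  # (True, False, True, True, True, 42)
```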

So, for implementation reasons (allowing `Future.cancel()`), submitted calls are
put in an intermediary unbounded pending dict instead of directly in the call
queue (which should be the pending queue), and this call queue is bounded instead
of unbounded. Note that this is not the case for the concurrent.futures.thread
module, in which calls are submitted directly to an unbounded call queue
(the pending queue).

And for optimization reasons, the chosen size for this bounded call queue is 
`max_workers + 1` instead of `max_workers`, as defined here:

> # Create communication channels for the executor
> # Make the call queue slightly larger than the number of processes to
> # prevent the worker processes from idling. But don't make it too big
> # because futures in the call queue cannot be cancelled.
> queue_size = self._max_workers + EXTRA_QUEUED_CALLS
> self._call_queue = _SafeQueue(
>     max_size=queue_size, ctx=self._mp_context,
>     pending_work_items=self._pending_work_items)

and here:

> # Controls how many more calls than processes will be queued in the call queue.
> # A smaller number will mean that processes spend more time idle waiting for
> # work while a larger number will make Future.cancel() succeed less frequently
> # (Futures in the call queue cannot be cancelled).
> EXTRA_QUEUED_CALLS = 1

PENDING calls are put in the call queue until the queue is full:

> while True:
>     if call_queue.full():
>         return
>     try:
>         work_id = work_ids.get(block=False)
>     except queue.Empty:
>         return
>     else:
>         work_item = pending_work_items[work_id]
>
>         if work_item.future.set_running_or_notify_cancel():
>             call_queue.put(_CallItem(work_id,
>                                      work_item.fn,
>                                      work_item.args,
>                                      work_item.kwargs),
>                            block=True)
>         else:
>             del pending_work_items[work_id]
>             continue
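The effect of this loop can be reproduced without any worker process. The sketch below is a simplified, single-process stand-in: the function name `fill_call_queue`, the plain `queue.Queue` objects, and storing bare futures instead of work items are all assumptions, not the module's real objects. It applies the same rule, marking each future RUNNING just before enqueuing it, until a queue bounded at `max_workers + 1` is full. With 2 workers and 8 submitted calls, 3 futures report RUNNING before any worker has dequeued anything, which matches the 3 `True` lines in the original report:

```python
import queue
from concurrent.futures import Future

EXTRA_QUEUED_CALLS = 1  # same value as in concurrent.futures.process


def fill_call_queue(pending_work_items, work_ids, call_queue):
    """Move work items into the bounded call queue until it is full."""
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        future = pending_work_items[work_id]
        if future.set_running_or_notify_cancel():
            # Marked RUNNING *before* any worker has picked it up.
            call_queue.put(work_id, block=True)
        else:
            # Cancelled while still pending: drop it.
            del pending_work_items[work_id]


max_workers = 2
pending_work_items = {i: Future() for i in range(8)}
work_ids = queue.Queue()
for work_id in pending_work_items:
    work_ids.put(work_id)
call_queue = queue.Queue(maxsize=max_workers + EXTRA_QUEUED_CALLS)

fill_call_queue(pending_work_items, work_ids, call_queue)
running = sum(f.running() for f in pending_work_items.values())
print(running)  # 3
```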

The state of the call is updated to RUNNING by the method
`Future.set_running_or_notify_cancel()` right *before* the call is put in the
call queue, instead of when the call is consumed from the call queue by a worker
process here:

> while True:
>     call_item = call_queue.get(block=True)
>     if call_item is None:
>         # Wake up queue management thread
>         result_queue.put(os.getpid())
>         return
>     try:
>         r = call_item.fn(*call_item.args, **call_item.kwargs)
>     except BaseException as e:
>         exc = _ExceptionWithTraceback(e, e.__traceback__)
>         _sendback_result(result_queue, call_item.work_id, exception=exc)
>     else:
>         _sendback_result(result_queue, call_item.work_id, result=r)
>
>     # Liberate the resource as soon as possible, to avoid holding onto
>     # open files or shared memory that is not needed anymore
>     del call_item
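The worker side is a classic consumer loop terminated by a `None` sentinel. Below is a thread-based sketch of the same pattern; the names `worker` and `results` and the `(work_id, fn, args)` tuple layout are hypothetical, and the real `_process_worker` runs in a child process and reports back through a `multiprocessing` result queue. It shows that a call is actually executed only once a worker has dequeued it:

```python
import queue
import threading


def worker(call_queue, result_queue):
    """Consume (work_id, fn, args) items until a None sentinel arrives."""
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            return  # sentinel: stop consuming
        work_id, fn, args = call_item
        try:
            r = fn(*args)
        except BaseException as e:
            result_queue.put((work_id, None, e))
        else:
            result_queue.put((work_id, r, None))


call_queue = queue.Queue()
result_queue = queue.Queue()
workers = [threading.Thread(target=worker, args=(call_queue, result_queue))
           for _ in range(2)]
for t in workers:
    t.start()

for work_id in range(5):
    call_queue.put((work_id, pow, (work_id, 2)))
for _ in workers:
    call_queue.put(None)  # one sentinel per worker, queued after all calls
for t in workers:
    t.join()

results = {}
while not result_queue.empty():
    work_id, value, exc = result_queue.get()
    results[work_id] = value
print(sorted(results.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
```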

So when creating a process pool of 2 workers, a call queue of size 3 is 
created. When submitting 8 calls to the pool, all of them are put in a pending 
dict, then 3 of 8 of them are updated from the PENDING state to the RUNNING 
state and put in the call queue, then 2 of 3 are consumed by the 2 workers 
leaving the call queue with 2 empty places and finally 2 of 5 remaining calls 
in the pending dict are updated from the PENDING state to the RUNNING state and 
put in the call queue. So to my understanding, in the end the current 
implementation should show 5 RUNNING states (2 calls currently being executed 
by the 2 workers, 3 calls pending in the call queue).
On MacOS I get these 5 RUNNING states, but sometimes 4. On Windows always 3.
But this is wrong in all cases: the user expects 2, since RUNNING = "being 
executed by a worker 
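The counting argument above (2 workers, a call queue of size 3, 8 submitted calls) can be replayed step by step. This is a minimal bookkeeping sketch with no processes involved; the variable names are hypothetical, and it assumes the sequence described above: the feeder fills the queue, each worker dequeues one call, then the feeder refills the free slots:

```python
max_workers = 2
EXTRA_QUEUED_CALLS = 1
queue_size = max_workers + EXTRA_QUEUED_CALLS   # 3
submitted = 8

# Step 1: the feeder fills the call queue; each enqueued call becomes RUNNING.
in_call_queue = min(queue_size, submitted)      # 3
not_yet_queued = submitted - in_call_queue      # 5
marked_running = in_call_queue                  # 3

# Step 2: each worker dequeues one call and starts executing it.
executing = min(max_workers, in_call_queue)     # 2
in_call_queue -= executing                      # 1, leaving 2 free slots

# Step 3: the feeder refills the free slots from the remaining calls.
refill = min(queue_size - in_call_queue, not_yet_queued)  # 2
in_call_queue += refill                         # 3
not_yet_queued -= refill                        # 3
marked_running += refill                        # 5

print(executing, in_call_queue, marked_running)  # 2 3 5
```

Only `executing` (2) matches the intuitive meaning of RUNNING; the other 3 RUNNING futures are still waiting in the call queue.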

[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-25 Thread Pablo Galindo Salgado


Pablo Galindo Salgado  added the comment:

All the pending work items are added to a call queue that the processes consume 
(_add_call_item_to_queue) and the status is set before adding it to said queue 
(by calling set_running_or_notify_cancel()). Notice that the fact that 
future.running() == True does not mean that a worker has picked up the work 
item. The worker function (_process_worker for the ProcessPoolExecutor) gets
items from this queue and then sends back results in another queue (result
queue). If you add extra debugging you will see that only two items are 
dequeued from the call_queue:


--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -231,6 +231,7 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
             return
     while True:
         call_item = call_queue.get(block=True)
+        print("Worker: Getting a new item")
         if call_item is None:
             # Wake up queue management thread
             result_queue.put(os.getpid())
@@ -277,6 +278,7 @@ def _add_call_item_to_queue(pending_work_items,
             work_item = pending_work_items[work_id]
 
             if work_item.future.set_running_or_notify_cancel():
+                print("Adding a new item to the call_queue")
                 call_queue.put(_CallItem(work_id,
                                          work_item.fn,
                                          work_item.args,


Executing your script:

import time, concurrent.futures

def call():
    while True:
        time.sleep(1)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(call) for _ in range(8)]
        time.sleep(2)

        for future in futures:
            print(future.running())

Prints:

Adding a new item to the call_queue
Adding a new item to the call_queue
Adding a new item to the call_queue
Worker: Getting a new item
Worker: Getting a new item
True
True
True
False
False
False
False
False

--
nosy: +pablogsal


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-25 Thread Géry

Géry  added the comment:

@Ned Deily

Okay, I did not know if I had to list the potentially interested people 
(according to their Github contribution on the module for instance) or let them 
do it themselves. Thank you for clarifying.

@Carol Willing

The number of RUNNING futures is always `max_workers + 1` for 
`ProcessPoolExecutor` but only on Windows (CPython 3.7, Windows 10). On MacOS, 
this number varies, depending on the time you wait before calling 
`print(future.running())`.
So to reproduce, you could add the expression `time.sleep(n)` right after the
statement `futures = [executor.submit(call) for _ in range(8)]` and see whether the
number of RUNNING futures varies with n.

--


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-25 Thread Carol Willing


Carol Willing  added the comment:

I've run the code snippet several times on Mac 10.14.5 with Python 3.7.3. I'm 
not able to replicate your result for the `ProcessPoolExecutor` but can 
replicate results for `ThreadPoolExecutor`. Do you have another example where 
you are seeing this behavior?

--
nosy: +willingc


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-25 Thread Ned Deily


Change by Ned Deily :


--
nosy:  -ned.deily


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-25 Thread Ned Deily


Ned Deily  added the comment:

@maggyero, please do not spam a list of people by adding their names to issues; it
will not speed up a resolution. Let the people doing bug triage evaluate the
issue and, if necessary, nosy the appropriate developers.

--


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-25 Thread Géry

Géry  added the comment:

Initial post: https://stackoverflow.com/q/56587166/2326961

--


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-14 Thread STINNER Victor


Change by STINNER Victor :


--
nosy:  -vstinner


[issue37276] Incorrect number of running calls in ProcessPoolExecutor

2019-06-14 Thread Géry

New submission from Géry :

In the `concurrent.futures` standard module, the number of running calls in a
`ProcessPoolExecutor` is `max_workers + 1` (unexpected) instead of
`max_workers` (expected), as in a `ThreadPoolExecutor`.

The following code snippet, which submits 8 calls to 2 workers in a
`ProcessPoolExecutor`,

import concurrent.futures
import time


def call():
    while True:
        time.sleep(1)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(call) for _ in range(8)]

        for future in futures:
            print(future.running())

prints this (3 running calls; unexpected since there are 2 workers):

> True
> True
> True
> False
> False
> False
> False
> False

while using a `ThreadPoolExecutor` prints this (2 running calls; expected):

> True
> True
> False
> False
> False
> False
> False
> False

Tested on both Windows 10 and MacOS 10.14.
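For comparison, here is a minimal, terminating sketch of the `ThreadPoolExecutor` case. Unlike the reproduction above, whose calls loop forever (so the `with` block never exits), the hypothetical `task` below blocks on a `threading.Event` and finishes once released; `count_running` is likewise a hypothetical helper. Because a `ThreadPoolExecutor` worker thread marks a future RUNNING only when it takes the item off its work queue, exactly `max_workers` futures report RUNNING while all workers are blocked:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def count_running(max_workers=2, calls=8):
    started = threading.Semaphore(0)  # released once per call that starts executing
    release = threading.Event()       # lets all calls finish

    def task():
        started.release()
        release.wait()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(task) for _ in range(calls)]
        for _ in range(max_workers):
            started.acquire()         # wait until every worker is busy
        running = sum(f.running() for f in futures)
        release.set()                 # unblock the calls so shutdown can finish
    return running


print(count_running())  # 2
```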

--
components: Library (Lib)
messages: 345553
nosy: asvetlov, bquinlan, inada.naoki, lukasz.langa, maggyero, ned.deily, 
pitrou, serhiy.storchaka, vstinner
priority: normal
severity: normal
status: open
title: Incorrect number of running calls in ProcessPoolExecutor
type: behavior
versions: Python 3.7
