[issue40110] multiprocessing.Pool.imap() should be lazy

2020-04-03 Thread Tim Peters


Tim Peters  added the comment:

Whenever there's parallel processing with communication, there's always the 
potential for producers to pump out data faster than consumers can process 
it.  But builtin primitives generally don't try to address that directly.  
They don't - and can't - know enough about the application's intent.

Instead, as I briefly alluded to earlier, mediation (when needed) is frequently 
accomplished by users by explicit use of bounded queues.  When process A 
produces data for process B, it sends the data over a bounded queue.  Nothing 
is done to slow A down, except that when a bounded queue is full, an attempt to 
enqueue a new piece of data blocks until B removes some old data from the 
queue.  That's a dead easy, painless, and foolproof way to limit A's speed to 
the rate at which B can consume data.  Nothing in A's logic changes - it's the 
communication channel that applies the brakes.
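
A minimal sketch of that mechanism, using threads and `queue.Queue` rather than 
processes so that it stays self-contained (`multiprocessing.Queue` accepts the 
same `maxsize` argument and also blocks in put() when full).  The names `run`, 
`producer`, `consumer` and the item counts are illustrative assumptions, not 
taken from the attached file:

```python
import queue
import threading
import time

def run(maxsize=2, n_items=20):
    """Return (results, max_lead) for a fast producer feeding a slow consumer."""
    q = queue.Queue(maxsize=maxsize)   # the bounded channel
    finished = 0                       # items the consumer has fully processed
    max_lead = 0                       # furthest the producer ever got ahead
    results = []

    def producer():
        nonlocal max_lead
        for i in range(n_items):
            q.put(i)                   # blocks while the queue is full
            max_lead = max(max_lead, (i + 1) - finished)
        q.put(None)                    # sentinel: no more data

    def consumer():
        nonlocal finished
        while (item := q.get()) is not None:
            time.sleep(0.005)          # pretend to work on the item
            results.append(item * 2)
            finished += 1

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, max_lead

results, max_lead = run()
print(len(results), max_lead)
```

The producer contains no throttling logic at all, yet it can never be more than 
`maxsize + 1` items ahead: `maxsize` waiting in the queue plus the one the 
consumer is currently working on.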

I'll attach `pipe.py` to illustrate.  It constructs a 10-stage pipeline.  The 
first process in the chain *could* produce data at a ferocious rate - but the 
bounded queue connecting it to the next process slows it to the rate at which 
the second process runs.  All the other processes in the pipeline grab data 
from the preceding process via a bounded queue, work on it for a second (just a 
sleep(1) here), then enqueue the result for the next process in the pipeline.  
The main program loops, pulling data off the final process as fast as results 
show up.

So, if you run it, you'll see that new data is produced once per second, then 
when the pipeline is full final results are delivered once per second.  When 
the first process is done, results continue to be pulled off one per second 
until the pipeline is drained.

The queues here have maximum size 1, just to show that it works.  In practice, 
a larger bound is usually used, to allow for the fact that processes in real 
life often take varying amounts of time depending on the data they're currently 
processing.  Letting a handful of work items queue up keeps processes busy - if 
processing some piece of data goes especially fast, fine, there may be more in 
the input queue waiting to go immediately.  How many is "a handful"?  Again, 
that's something that can't in general be guessed - the programmer has to apply 
their best judgment based on their deep knowledge of intended application 
behavior.

Just to be cute ;-) , the code here uses imap() to start the 10 worker 
processes.  The result of calling imap() is never used; it's just an easy way 
to apply a Pool to the task.  And it absolutely relies on imap() consuming 
its iterable (range(NSTAGES)) as fast as it can, to get the NSTAGES=10 worker 
processes running.
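
For readers without the attachment, here is a rough sketch in the same spirit.  
It is not the attached `pipe.py`: it assumes threads (`ThreadPool`, which 
shares imap() with Pool) instead of processes, 4 stages instead of 10, and a 
much shorter delay, and the names `links`, `stage`, and `run_pipeline` are made 
up for the illustration:

```python
import queue
import time
from multiprocessing.pool import ThreadPool

NSTAGES = 4        # the message above uses 10
NITEMS = 5
DELAY = 0.02       # the message above sleeps for a full second
DONE = None        # sentinel marking the end of the stream

# links[i] carries data out of stage i; links[-1] feeds the main program.
links = [queue.Queue(maxsize=1) for _ in range(NSTAGES)]

def stage(i):
    out = links[i]
    if i == 0:
        for item in range(NITEMS):     # the producer: as fast as `out` allows
            out.put(item)
    else:
        inq = links[i - 1]
        while (item := inq.get()) is not DONE:
            time.sleep(DELAY)          # "work" on the item
            out.put(item + 1)
    out.put(DONE)                      # pass the end marker downstream

def run_pipeline():
    with ThreadPool(NSTAGES) as pool:
        # Result ignored: imap() is only used to get every stage running,
        # which works because it consumes range(NSTAGES) eagerly.
        pool.imap(stage, range(NSTAGES))
        results = []
        while (item := links[-1].get()) is not DONE:
            results.append(item)
    return results

results = run_pipeline()
print(results)
```

The pool must have at least NSTAGES workers, since every stage runs until the 
stream ends.  With real processes the queues would also have to be handed to 
the workers when the pool is created rather than shared as plain globals.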

--
Added file: https://bugs.python.org/file49032/pipe.py

___
Python tracker 

___
___
Python-bugs-list mailing list
Unsubscribe: 
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com



[issue40110] multiprocessing.Pool.imap() should be lazy

2020-04-01 Thread Nick Guenther


Nick Guenther  added the comment:

Thank you for taking the time to consider my points! Yes, I think you 
understood exactly what I was getting at.

I slept on it and thought about what I'd posted the day after and realized most 
of the points you raise, especially that serialized next() would mean 
serialized processing. So the naive approach is out.

I am motivated by trying to introduce backpressure to my pipelines. The example 
you gave has potentially infinite memory usage; if I simply slow it down with 
sleep() I get a memory leak and the main python proc pinning my CPU, even 
though it "isn't" doing anything:

    with multiprocessing.Pool(4) as pool:
        for i, v in enumerate(pool.imap(worker, itertools.count(1)), 1):
            time.sleep(7)
            print(f"At {i}: {v}, memory usage is {sys.getallocatedblocks()}")

At 1->1, memory usage is 230617
At 2->8, memory usage is 411053
At 3->27, memory usage is 581439
At 4->64, memory usage is 748584
At 5->125, memory usage is 909694
At 6->216, memory usage is 1074304
At 7->343, memory usage is 1238106
At 8->512, memory usage is 1389162
At 9->729, memory usage is 1537830
At 10->1000, memory usage is 1648502
At 11->1331, memory usage is 1759864
At 12->1728, memory usage is 1909807
At 13->2197, memory usage is 2005700
At 14->2744, memory usage is 2067407
At 15->3375, memory usage is 2156479
At 16->4096, memory usage is 2240936
At 17->4913, memory usage is 2328123
At 18->5832, memory usage is 2456865
At 19->6859, memory usage is 2614602
At 20->8000, memory usage is 2803736
At 21->9261, memory usage is 2999129

  PID USER  PR  NI   VIRT   RES  SHR S %CPU %MEM   TIME+ COMMAND
11314 kousu 20   0 303308 40996 6340 S 91.4  2.1 0:21.68 python3.8
11317 kousu 20   0  54208 10264 4352 S 16.2  0.5 0:03.76 python3.8
11315 kousu 20   0  54208 10260 4352 S 15.8  0.5 0:03.74 python3.8
11316 kousu 20   0  54208 10260 4352 S 15.8  0.5 0:03.74 python3.8
11318 kousu 20   0  54208 10264 4352 S 15.5  0.5 0:03.72 python3.8

It seems to me like any usage of Pool.imap() either has the same issue lurking 
or is run on a small finite data set where you are better off using Pool.map().
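
One workaround sometimes used for this, which nobody in this thread proposes, 
is to throttle the input iterable with a semaphore that is released as results 
are consumed.  The sketch below is an assumption-laden illustration: 
`bounded_imap` and `counting_source` are hypothetical names, it uses 
`ThreadPool` so that it runs anywhere, and it relies on the CPython 
implementation detail that the pool advances its input from a helper thread in 
the parent process:

```python
import threading
from multiprocessing.pool import ThreadPool

def bounded_imap(pool, func, iterable, max_in_flight):
    """Like pool.imap(), but with at most `max_in_flight` unconsumed items."""
    slots = threading.BoundedSemaphore(max_in_flight)
    source = iter(iterable)

    def throttled():
        while True:
            slots.acquire()            # blocks the pool's task-feeding thread
            try:
                item = next(source)
            except StopIteration:
                slots.release()
                return
            yield item

    for result in pool.imap(func, throttled()):
        yield result
        slots.release()                # consumer came back for more: free a slot

def cube(x):
    return x ** 3

pulled = 0       # items taken from the input so far
retrieved = 0    # results the consumer has received so far
high_water = 0   # largest value of pulled - retrieved ever observed

def counting_source(n):
    global pulled, high_water
    for i in range(1, n + 1):
        pulled += 1
        high_water = max(high_water, pulled - retrieved)
        yield i

results = []
with ThreadPool(4) as pool:
    for value in bounded_imap(pool, cube, counting_source(50), max_in_flight=8):
        retrieved += 1
        results.append(value)

print(len(results), high_water)
```

This sketch only handles an input that is consumed to the end.  If the consumer 
stops early, the feeding thread may be left blocked in acquire(), so a real 
implementation would need a cancellation path.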

I like generators because they keep constant-time constant-memory work, which 
seems like a natural fit for stream processing situations. Unix pipelines have 
backpressure built-in, because write() blocks when the pipe buffer is full.
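
A small sketch of that pipe behaviour, assuming a POSIX system.  It puts the 
write end into non-blocking mode so that the point where a normal write() would 
block shows up as BlockingIOError instead of hanging the demo; `pipe_capacity` 
is a made-up helper name:

```python
import os

def pipe_capacity(chunk_size=1024):
    """Write to a pipe nobody reads until the kernel refuses more data."""
    r, w = os.pipe()
    os.set_blocking(w, False)          # report "would block" instead of blocking
    chunk = b"x" * chunk_size
    written = 0
    try:
        while True:
            written += os.write(w, chunk)
    except BlockingIOError:
        pass                           # buffer full: a blocking writer stalls here
    finally:
        os.close(r)
        os.close(w)
    return written

capacity = pipe_capacity()
print(f"pipe buffer filled after {capacity} bytes")
```

The number printed depends on the operating system and its configuration.  The 
point is only that it is finite, so a writer that outpaces its reader is 
eventually made to wait.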

I did come up with one possibility after sleeping on it again: run the final 
iteration in parallel, perhaps by a special plist() method that makes as many 
parallel next() calls as possible. There are definitely details to work out but 
I plan to prototype when I have spare time in the next couple weeks.

You're entirely right that it's a risky change to suggest, so maybe it would be 
best expressed as a package if I get it working. Can I keep this issue open to 
see if it draws in insights from anyone else in the meantime?

Thanks again for taking a look! That's really cool of you!

--




[issue40110] multiprocessing.Pool.imap() should be lazy

2020-03-31 Thread Tim Peters


Tim Peters  added the comment:

"Lazy" has several possible aspects, of which Pool.imap() satisfies some:

- Its iterable argument is materialized one object at a time.

- It delivers results one at a time.

So, for example, if `worker` is a function that takes a single int, then

    pool = multiprocessing.Pool(4)
    for i in pool.imap(worker, itertools.count(1)):
        print(i)

works fine, even though the iterable argument and the result sequence are 
"infinite".

You seem to have something more severe in mind, more along the lines that 
the iterable isn't advanced unless it absolutely _needs_ to be advanced in 
order to deliver a result that's being demanded.  That's how, e.g., the builtin 
Python 3 `map()` works.
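
A short illustration of that strictly on-demand behaviour of the builtin 
`map()`.  The `source` generator and the `pulled` list are assumptions added 
only to make the moment of consumption visible:

```python
pulled = []                 # records every item taken from the input

def source():
    for i in range(1, 6):
        pulled.append(i)
        yield i

squares = map(lambda x: x * x, source())
before = list(pulled)       # nothing has been pulled yet
first = next(squares)       # demand exactly one result
after = list(pulled)        # exactly one item was pulled to produce it
print(before, first, after)
```

Creating the `map` object touches nothing; each next() advances the input by 
one item and no further.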

But if the iterable isn't advanced until the main program _demands_ the next 
result from imap(), then the main program blocks until the machinery peels off 
the next object from the iterable, picks a worker to send it to, sends it, 
waits for the worker to deliver the result back on an internal queue, then 
delivers the result to the main program.  There's no parallelism then.

The way things are now, imap() consumes the iterable as quickly as possible, 
keeping all workers as busy as possible, regardless of how quickly (or even 
whether) results are demanded.  And it seems to me that's overwhelmingly what 
people using multiprocessing would want.  In any case, that's what they _have_, 
so that couldn't be changed lightly (if at all).
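
The eager consumption can be observed with a sketch like the following.  It 
assumes `multiprocessing.pool.ThreadPool` as a stand-in for Pool (it inherits 
the same imap() machinery) so that the `pulled` list is visible in one 
process; `source`, `cube`, and the count of 20 are illustrative choices:

```python
import time
from multiprocessing.pool import ThreadPool

N = 20
pulled = []                 # every item the pool has taken from the input

def source():
    for i in range(1, N + 1):
        pulled.append(i)
        yield i

def cube(x):
    return x ** 3

with ThreadPool(4) as pool:
    results = pool.imap(cube, source())
    first = next(results)   # the main program demands exactly one result
    # Give the pool's feeding thread a moment; it keeps going on its own.
    deadline = time.monotonic() + 5.0
    while len(pulled) < N and time.monotonic() < deadline:
        time.sleep(0.01)
    pulled_after_one = len(pulled)

print(first, pulled_after_one)
```

Only one result was requested, yet the whole input should have been consumed by 
the time the loop exits.  That is the behaviour the pipeline trick relies on, 
and also the one that leads to unbounded growth with an infinite input.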

Perhaps it would be more profitable to think about ways to implement your 
pipelines using other primitives?  For example, the first thing I'd try for an 
N-stage pipeline is a chain of N processes (not in a Pool) connected one to the 
next by queues.  If for some reason I was determined not to let any process 
"get ahead", easy - specify a max size of 1 for the queues.  map-like 
facilities are inherently SIMD style, but pipelines typically have very 
different code in different stages.

--
nosy: +tim.peters




[issue40110] multiprocessing.Pool.imap() should be lazy

2020-03-31 Thread Raymond Hettinger


Change by Raymond Hettinger :


--
components: +Library (Lib)
nosy: +rhettinger
type:  -> enhancement
versions: +Python 3.9 -Python 2.7, Python 3.5, Python 3.6, Python 3.7, Python 
3.8




[issue40110] multiprocessing.Pool.imap() should be lazy

2020-03-29 Thread Nick Guenther


New submission from Nick Guenther :

multiprocessing.Pool.imap() is supposed to be a lazy version of map. But it's 
not: it submits work to its workers eagerly. As a consequence, in a pipeline, 
all the work from earlier steps is queued, performed, and finished first, 
before starting later steps.

If you use python3's built-in map() -- aka the old itertools.imap() -- the 
operations are on-demand, so it surprised me that Pool.imap() isn't. It's 
basically no better than using Pool.map(). Maybe it saves memory by not 
materializing large iterables in every worker process? But it still 
spends the CPU time on the iterables even if the results are unneeded.

This can be partially worked around by giving each step of the pipeline its own 
Pool -- then, at least the earlier steps of the pipeline don't block the later 
steps -- but the jobs are still done eagerly instead of waiting for their 
results to actually be requested.

--
files: multiprocessing-eager-imap.py
messages: 365295
nosy: kousu
priority: normal
severity: normal
status: open
title: multiprocessing.Pool.imap() should be lazy
versions: Python 2.7, Python 3.5, Python 3.6, Python 3.7, Python 3.8
Added file: https://bugs.python.org/file49010/multiprocessing-eager-imap.py
