On May 8, 2014 12:57 PM, "Andrew McLean" <li...@andros.org.uk> wrote:
> So far so good. However, I thought this would be an opportunity to
> explore concurrent.futures and to see whether it offered any benefits
> over the more explicit approach discussed above. The problem I am having
> is that all the discussions I can find of the use of concurrent.futures
> show use with toy problems involving just a few tasks. The url
> downloader in the documentation is typical, it proceeds as follows:
>
> 1. Get an instance of concurrent.futures.ThreadPoolExecutor
> 2. Submit a few tasks to the executor
> 3. Iterate over the results using concurrent.futures.as_completed
>
> That's fine, but I suspect that isn't a helpful pattern if I have a very
> large number of tasks. In my case I could run out of memory if I tried
> submitting all of the tasks to the executor before processing any of the
> results.
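
For reference, the three-step pattern described above amounts to roughly the following. This is only a minimal sketch: the task function `work`, the helper `run_all` and the input range are placeholders of mine, not taken from the documentation example.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def work(n):
    # Hypothetical stand-in for a real task (e.g. fetching a URL).
    return n * n

def run_all(inputs, max_workers=4):
    results = {}
    # 1. Get an executor instance.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # 2. Submit every task up front; one Future per input is kept alive.
        future_to_input = {pool.submit(work, n): n for n in inputs}
        # 3. Handle results in completion order.
        for fut in as_completed(future_to_input):
            results[future_to_input[fut]] = fut.result()
    return results

squares = run_all(range(10))
```

Step 2 is where the memory concern comes from: the dict holds a Future for every input before the first result is looked at.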

I thought that ThreadPoolExecutor.map would handle this transparently if
you passed it a lazy iterable such as a generator. From my testing,
though, that seems not to be the case: with a generator of 100000 items
and a pool of 2 workers, the entire generator was consumed before any
results were returned.

> I'm guessing what I want to do is, submit tasks in batches of perhaps a
> few hundred, iterate over the results until most are complete, then
> submit some more tasks and so on. I'm struggling to see how to do this
> elegantly without a lot of messy code just there to do "bookkeeping".
> This can't be an uncommon scenario. Am I missing something, or is this
> just not a job suitable for futures?

I don't think it needs to be "messy". Something like this should do the
trick, I think:

    from concurrent.futures import FIRST_COMPLETED, wait
    from itertools import islice

    def batched_pool_runner(f, iterable, pool, batch_size):
        # Yields completed Future objects, in completion order.
        it = iter(iterable)
        # Submit the first batch of tasks.
        futures = set(pool.submit(f, x) for x in islice(it, batch_size))
        while futures:
            done, futures = wait(futures, return_when=FIRST_COMPLETED)
            # Replenish submitted tasks up to the number that completed.
            futures.update(pool.submit(f, x) for x in islice(it, len(done)))
            yield from done
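
Here is a quick way to try it out. The function is repeated so the snippet runs on its own; the task `square`, the generator `inputs` and the two counters are made up for the illustration. The counters are only there to check that the input generator is drawn from gradually rather than all at once.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice

def batched_pool_runner(f, iterable, pool, batch_size):
    # Same function as above, repeated so this snippet is self-contained.
    it = iter(iterable)
    futures = set(pool.submit(f, x) for x in islice(it, batch_size))
    while futures:
        done, futures = wait(futures, return_when=FIRST_COMPLETED)
        futures.update(pool.submit(f, x) for x in islice(it, len(done)))
        yield from done

def square(x):
    # Hypothetical stand-in for a real task.
    return x * x

pulled = 0  # number of inputs drawn from the generator so far

def inputs(n):
    # Lazy input source that records how far it has been consumed.
    global pulled
    for i in range(n):
        pulled += 1
        yield i

batch_size = 10
results = []
max_outstanding = 0  # most tasks submitted but not yet handed back to us

with ThreadPoolExecutor(max_workers=2) as pool:
    for fut in batched_pool_runner(square, inputs(1000), pool, batch_size):
        results.append(fut.result())
        max_outstanding = max(max_outstanding, pulled - len(results))
```

Two things to keep in mind. Results come back in completion order, not input order, so if you need to know which input a result belongs to you would have to carry that along yourself (for example with a dict mapping each future to its input). Also, at most batch_size tasks are pending in the pool at any time, but a finished batch can be waiting to be yielded on top of that, so the number of live futures can briefly approach twice batch_size.
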
-- https://mail.python.org/mailman/listinfo/python-list