Hi Maxime, many thanks for your great solution. It would be so great to have it in stock asyncio and use it out-of-the-box... I've made 4 fixes to it that are rather of "cosmetic" nature. Here is the final version:
import asyncio from concurrent import futures def as_completed_with_max_workers(tasks, *, loop=None, max_workers=5, timeout=None): loop = loop if loop is not None else asyncio.get_event_loop() workers = [] pending = set() done = asyncio.Queue(maxsize=max_workers, loop=loop) # Valery: respect the "loop" parameter exhausted = False timeout_handle = None # Valery: added to see, if we indeed have to call timeout_handle.cancel() @asyncio.coroutine def _worker(): nonlocal exhausted while not exhausted: try: t = next(tasks) pending.add(t) yield from t yield from done.put(t) pending.remove(t) except StopIteration: exhausted = True def _on_timeout(): for f in workers: f.cancel() workers.clear() # Wake up _wait_for_one() done.put_nowait(None) @asyncio.coroutine def _wait_for_one(): f = yield from done.get() if f is None: raise futures.TimeoutError() return f.result() workers = [asyncio.async(_worker(), loop=loop) for _ in range(max_workers)] # Valery: respect the "loop" parameter if workers and timeout is not None: timeout_handle = loop.call_later(timeout, _on_timeout) while not exhausted or pending or not done.empty(): yield _wait_for_one() if timeout_handle: # Valery: call timeout_handle.cancel() only if it is needed timeout_handle.cancel() best regards -- Valery A.Khamenya
-- https://mail.python.org/mailman/listinfo/python-list