Carlo Rosati <crosati...@icloud.com> added the comment:
I've actually written a few workarounds that should be considered a multiprocessing specific tee function. I need feedback/critique on these. Hopefully we can all agree on one solution that's the best. It is unfortunate that the multiprocessing manager does not provide a dequeue. The first one I wrote uses a managed list. def multiprocessing_tee(iterable, n=2): """Write a multiprocessing safe itertools.tee""" it = iter(iterable) m = multiprocessing.Manager() mylock = m.Lock() lists = [m.list() for i in range(n)] def gen(local_list): for i in itertools.count(): with mylock: if not local_list: # when the local list is empty newval = next(it) # fetch a new value and for l in lists: # load it to all the lists l.append(newval) yield local_list.pop(0) return tuple(gen(l) for l in lists) The second two implementations use queues. def multiprocessing_tee_q(iterable, n=2): """Write a multiprocessing safe itertools.tee""" it = iter(iterable) m = multiprocessing.Manager() lock = m.Lock() queues = [m.Queue(-1) for _ in range(n)] # -1 means infinite maxsize (so puts won't block) def gen(myqueue): while True: with lock: # no one else touches anything try: newval = myqueue.get_nowait() except Queue.Empty: newval = next(it) for q in queues: q.put(newval) newval = myqueue.get() yield newval return tuple(gen(q) for q in queues) class Sentinel(object): """used as Queue Sentinel""" def multiprocessing_tee_q2(iterable, n=2): """Write a multiprocessing safe itertools.tee""" it = iter(iterable) m = multiprocessing.Manager() lock = m.Lock() queues = [m.Queue(-1) for _ in range(n)] # -1 means infinite maxsize (so puts won't block) def gen(myqueue): while True: try: retval = myqueue.get_nowait() except Queue.Empty: # what happens if the other process puts last item in my queue before i get lock? with lock: # no one else touches anything try: newval = next(it) except StopIteration: newval = Sentinel for q in queues: q.put(newval) retval = myqueue.get() if retval is Sentinel: raise StopIteration yield retval return tuple(gen(q) for q in queues) I'm just throwing out my sketches here. I'm hoping the more experienced here can weigh in on these implementations. ---------- _______________________________________ Python tracker <rep...@bugs.python.org> <https://bugs.python.org/issue34410> _______________________________________ _______________________________________________ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com