Carlo Rosati <crosati...@icloud.com> added the comment:

I've actually written a few workarounds that could serve as a 
multiprocessing-specific tee function. I need feedback/critique on these; 
hopefully we can all agree on which solution is best. It is unfortunate 
that the multiprocessing manager does not provide a deque.
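
Incidentally, something close to a managed deque can be had by registering 
collections.deque on a BaseManager subclass. A minimal sketch (the 
DequeManager name is just for illustration, not an existing API):

from collections import deque
from multiprocessing.managers import BaseManager

class DequeManager(BaseManager):
    """Manager subclass that only exists to host the registered deque type."""

# The auto-generated proxy exposes deque's public methods (append, popleft, ...)
DequeManager.register('deque', deque)

if __name__ == '__main__':
    m = DequeManager()
    m.start()
    d = m.deque()         # proxy to a deque living in the manager process
    d.append(1)
    d.appendleft(0)
    print(d.popleft())    # -> 0
    m.shutdown()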

The first one I wrote uses a managed lock and one managed list per returned 
iterator. 

import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """A multiprocessing-safe itertools.tee built on managed lists."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    mylock = m.Lock()
    lists = [m.list() for i in range(n)]
    def gen(local_list):
        while True:
            with mylock:
                if not local_list:          # when the local list is empty
                    try:
                        newval = next(it)   # fetch a new value and
                    except StopIteration:
                        return              # PEP 479: don't let StopIteration escape the generator
                    for l in lists:         # load it to all the lists
                        l.append(newval)
            yield local_list.pop(0)
    return tuple(gen(l) for l in lists)
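
A quick single-process smoke test along these lines exercises the bookkeeping 
(it says nothing about real cross-process use; the generator objects 
themselves can't be pickled):

if __name__ == '__main__':
    a, b = multiprocessing_tee(range(5))
    print(list(a))   # [0, 1, 2, 3, 4]
    print(list(b))   # [0, 1, 2, 3, 4] -- replayed from b's managed list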

The other two implementations use managed queues. 

import multiprocessing
import queue  # the managed Queue raises queue.Empty on get_nowait()

def multiprocessing_tee_q(iterable, n=2):
    """A multiprocessing-safe itertools.tee built on managed queues."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)]  # maxsize <= 0 means infinite, so puts won't block
    def gen(myqueue):
        while True:
            with lock:  # no one else touches anything
                try:
                    newval = myqueue.get_nowait()
                except queue.Empty:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return  # PEP 479: don't let StopIteration escape the generator
                    for q in queues:
                        q.put(newval)
                    newval = myqueue.get()
            yield newval
    return tuple(gen(q) for q in queues)
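
The same kind of single-process sanity check for this version, just to confirm 
every consumer sees every item:

if __name__ == '__main__':
    a, b = multiprocessing_tee_q(range(3))
    print(list(a))   # [0, 1, 2]
    print(list(b))   # [0, 1, 2] -- drained from b's managed queue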

class Sentinel(object):
    """Used as a queue sentinel to mark the end of the source iterator."""

def multiprocessing_tee_q2(iterable, n=2):
    """A multiprocessing-safe itertools.tee using managed queues and a sentinel."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)]  # maxsize <= 0 means infinite, so puts won't block
    def gen(myqueue):
        while True:
            try:
                retval = myqueue.get_nowait()
            except queue.Empty:
                # what happens if the other process puts the last item in my
                # queue before I get the lock?
                with lock:  # no one else touches anything
                    try:
                        newval = next(it)
                    except StopIteration:
                        newval = Sentinel
                    for q in queues:
                        q.put(newval)
                retval = myqueue.get()
            if retval is Sentinel:
                return  # PEP 479: return instead of raising StopIteration
            yield retval
    return tuple(gen(q) for q in queues)
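
Checked the same way for the sentinel variant. Note that the Sentinel class is 
pickled by reference, so the identity test still holds after a round trip 
through the managed queue:

if __name__ == '__main__':
    a, b = multiprocessing_tee_q2(range(3))
    print(list(a))   # [0, 1, 2]
    print(list(b))   # [0, 1, 2] -- stops when the Sentinel comes back out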

I'm just throwing out my sketches here; I'm hoping the more experienced people 
here can weigh in on these implementations.

----------

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue34410>
_______________________________________