D3960: worker: use one pipe per posix worker and select() in parent process
This revision was automatically updated to reflect the committed changes. Closed by commit rHG9e6afe7fca31: worker: use one pipe per posix worker and select() in parent process (authored by hooper, committed by ). REPOSITORY rHG Mercurial CHANGES SINCE LAST UPDATE https://phab.mercurial-scm.org/D3960?vs=9626&id=9628 REVISION DETAIL https://phab.mercurial-scm.org/D3960 AFFECTED FILES mercurial/worker.py CHANGE DETAILS diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -14,6 +14,12 @@ import threading import time +try: +import selectors +selectors.BaseSelector +except ImportError: +from .thirdparty import selectors2 as selectors + from .i18n import _ from . import ( encoding, @@ -89,7 +95,6 @@ return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): -rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -138,7 +143,15 @@ oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() parentpid = os.getpid() +pipes = [] for pargs in partition(args, workers): +# Every worker gets its own pipe to send results on, so we don't have to +# implement atomic writes larger than PIPE_BUF. Each forked process has +# its own pipe's descriptors in the local variables, and the parent +# process has the full list of pipe descriptors (and it doesn't really +# care what order they're in). +rfd, wfd = os.pipe() +pipes.append((rfd, wfd)) # make sure we use os._exit in all worker code paths. otherwise the # worker may do some clean-ups which could cause surprises like # deadlock. see sshpeer.cleanup for example. 
@@ -154,6 +167,9 @@ signal.signal(signal.SIGCHLD, oldchldhandler) def workerfunc(): +for r, w in pipes[:-1]: +os.close(r) +os.close(w) os.close(rfd) for result in func(*(staticargs + (pargs,))): os.write(wfd, util.pickle.dumps(result)) @@ -175,8 +191,10 @@ finally: os._exit(ret & 255) pids.add(pid) -os.close(wfd) -fp = os.fdopen(rfd, r'rb', 0) +selector = selectors.DefaultSelector() +for rfd, wfd in pipes: +os.close(wfd) +selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) def cleanup(): signal.signal(signal.SIGINT, oldhandler) waitforworkers() @@ -187,15 +205,19 @@ os.kill(os.getpid(), -status) sys.exit(status) try: -while True: -try: -yield util.pickle.load(fp) -except EOFError: -break -except IOError as e: -if e.errno == errno.EINTR: -continue -raise +openpipes = len(pipes) +while openpipes > 0: +for key, events in selector.select(): +try: +yield util.pickle.load(key.fileobj) +except EOFError: +selector.unregister(key.fileobj) +key.fileobj.close() +openpipes -= 1 +except IOError as e: +if e.errno == errno.EINTR: +continue +raise except: # re-raises killworkers() cleanup() To: hooper, #hg-reviewers Cc: yuja, mercurial-devel ___ Mercurial-devel mailing list Mercurial-devel@mercurial-scm.org https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
Re: D3960: worker: use one pipe per posix worker and select() in parent process
Queued, thanks. We'll probably need `selector.close()` somewhere. > Do you want to move the selector import stuff to pycompat? Sounds nice. We'll have to pull the latest selectors2 module to get rid of the reference cycle. The issue addressed by a568a46751b6 appears to be fixed upstream. ___ Mercurial-devel mailing list Mercurial-devel@mercurial-scm.org https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
D3960: worker: use one pipe per posix worker and select() in parent process
yuja added a comment. Queued, thanks. We'll probably need `selector.close()` somewhere. > Do you want to move the selector import stuff to pycompat? Sounds nice. We'll have to pull the latest selectors2 module to get rid of the reference cycle. The issue addressed by https://phab.mercurial-scm.org/rHGa568a46751b6fc87437b143582f594bfe87451f6 appears to be fixed upstream. REPOSITORY rHG Mercurial REVISION DETAIL https://phab.mercurial-scm.org/D3960 To: hooper, #hg-reviewers Cc: yuja, mercurial-devel ___ Mercurial-devel mailing list Mercurial-devel@mercurial-scm.org https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
D3960: worker: use one pipe per posix worker and select() in parent process
hooper added a comment. Do you want to move the selector import stuff to pycompat? REPOSITORY rHG Mercurial REVISION DETAIL https://phab.mercurial-scm.org/D3960 To: hooper, #hg-reviewers Cc: yuja, mercurial-devel ___ Mercurial-devel mailing list Mercurial-devel@mercurial-scm.org https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
D3960: worker: use one pipe per posix worker and select() in parent process
hooper updated this revision to Diff 9626. REPOSITORY rHG Mercurial CHANGES SINCE LAST UPDATE https://phab.mercurial-scm.org/D3960?vs=9618&id=9626 REVISION DETAIL https://phab.mercurial-scm.org/D3960 AFFECTED FILES mercurial/worker.py CHANGE DETAILS diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -14,6 +14,12 @@ import threading import time +try: +import selectors +selectors.BaseSelector +except ImportError: +from .thirdparty import selectors2 as selectors + from .i18n import _ from . import ( encoding, @@ -89,7 +95,6 @@ return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): -rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -138,7 +143,15 @@ oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() parentpid = os.getpid() +pipes = [] for pargs in partition(args, workers): +# Every worker gets its own pipe to send results on, so we don't have to +# implement atomic writes larger than PIPE_BUF. Each forked process has +# its own pipe's descriptors in the local variables, and the parent +# process has the full list of pipe descriptors (and it doesn't really +# care what order they're in). +rfd, wfd = os.pipe() +pipes.append((rfd, wfd)) # make sure we use os._exit in all worker code paths. otherwise the # worker may do some clean-ups which could cause surprises like # deadlock. see sshpeer.cleanup for example. 
@@ -154,6 +167,9 @@ signal.signal(signal.SIGCHLD, oldchldhandler) def workerfunc(): +for r, w in pipes[:-1]: +os.close(r) +os.close(w) os.close(rfd) for result in func(*(staticargs + (pargs,))): os.write(wfd, util.pickle.dumps(result)) @@ -175,8 +191,10 @@ finally: os._exit(ret & 255) pids.add(pid) -os.close(wfd) -fp = os.fdopen(rfd, r'rb', 0) +selector = selectors.DefaultSelector() +for rfd, wfd in pipes: +os.close(wfd) +selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) def cleanup(): signal.signal(signal.SIGINT, oldhandler) waitforworkers() @@ -187,15 +205,19 @@ os.kill(os.getpid(), -status) sys.exit(status) try: -while True: -try: -yield util.pickle.load(fp) -except EOFError: -break -except IOError as e: -if e.errno == errno.EINTR: -continue -raise +openpipes = len(pipes) +while openpipes > 0: +for key, events in selector.select(): +try: +yield util.pickle.load(key.fileobj) +except EOFError: +selector.unregister(key.fileobj) +key.fileobj.close() +openpipes -= 1 +except IOError as e: +if e.errno == errno.EINTR: +continue +raise except: # re-raises killworkers() cleanup() To: hooper, #hg-reviewers Cc: yuja, mercurial-devel ___ Mercurial-devel mailing list Mercurial-devel@mercurial-scm.org https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
Re: D3960: worker: use one pipe per posix worker and select() in parent process
> @@ -138,7 +138,15 @@ > oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) > ui.flush() > parentpid = os.getpid() > +pipes = [] > for pargs in partition(args, workers): > +# Every worker gets its own pipe to send results on, so we don't have to > +# implement atomic writes larger than PIPE_BUF. Each forked process has > +# its own pipe's descriptors in the local variables, and the parent > +# process has the full list of pipe descriptors (and it doesn't really > +# care what order they're in). > +rfd, wfd = os.pipe() > +pipes.append((rfd, wfd)) > # make sure we use os._exit in all worker code paths. otherwise the > # worker may do some clean-ups which could cause surprises like > # deadlock. see sshpeer.cleanup for example. > @@ -175,8 +183,10 @@ > finally: > os._exit(ret & 255) > pids.add(pid) > -os.close(wfd) > -fp = os.fdopen(rfd, r'rb', 0) > +fps = [] > +for rfd, wfd in pipes: > +os.close(wfd) > +fps.append(os.fdopen(rfd, r'rb', 0)) This isn't enough. For child processes, all pipe fds except for the last wfd have to be closed at the beginning of `workerfunc()`. > +rlist, wlist, xlist = select.select(fps, [], fps) Can you rewrite it to use the selectors module? commandserver.py has an example. ___ Mercurial-devel mailing list Mercurial-devel@mercurial-scm.org https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
D3960: worker: use one pipe per posix worker and select() in parent process
yuja added a comment. > @@ -138,7 +138,15 @@ > oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) > ui.flush() > parentpid = os.getpid() > +pipes = [] > for pargs in partition(args, workers): > +# Every worker gets its own pipe to send results on, so we don't have to > +# implement atomic writes larger than PIPE_BUF. Each forked process has > +# its own pipe's descriptors in the local variables, and the parent > +# process has the full list of pipe descriptors (and it doesn't really > +# care what order they're in). > +rfd, wfd = os.pipe() > +pipes.append((rfd, wfd)) > # make sure we use os._exit in all worker code paths. otherwise the > # worker may do some clean-ups which could cause surprises like > # deadlock. see sshpeer.cleanup for example. > @@ -175,8 +183,10 @@ > finally: > os._exit(ret & 255) > pids.add(pid) > -os.close(wfd) > -fp = os.fdopen(rfd, r'rb', 0) > +fps = [] > +for rfd, wfd in pipes: > +os.close(wfd) > +fps.append(os.fdopen(rfd, r'rb', 0)) This isn't enough. For child processes, all pipe fds except for the last wfd have to be closed at the beginning of `workerfunc()`. > +rlist, wlist, xlist = select.select(fps, [], fps) Can you rewrite it to use the selectors module? commandserver.py has an example. REPOSITORY rHG Mercurial REVISION DETAIL https://phab.mercurial-scm.org/D3960 To: hooper, #hg-reviewers Cc: yuja, mercurial-devel ___ Mercurial-devel mailing list Mercurial-devel@mercurial-scm.org https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
D3960: worker: use one pipe per posix worker and select() in parent process
hooper created this revision. Herald added a subscriber: mercurial-devel. Herald added a reviewer: hg-reviewers. REVISION SUMMARY This allows us to pass results larger than PIPE_BUF through the pipes without interleaving them. This is necessary now because "hg fix" sends file contents as the result from workers. REPOSITORY rHG Mercurial REVISION DETAIL https://phab.mercurial-scm.org/D3960 AFFECTED FILES mercurial/worker.py CHANGE DETAILS diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -9,6 +9,7 @@ import errno import os +import select import signal import sys import threading @@ -89,7 +90,6 @@ return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): -rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -138,7 +138,15 @@ oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() parentpid = os.getpid() +pipes = [] for pargs in partition(args, workers): +# Every worker gets its own pipe to send results on, so we don't have to +# implement atomic writes larger than PIPE_BUF. Each forked process has +# its own pipe's descriptors in the local variables, and the parent +# process has the full list of pipe descriptors (and it doesn't really +# care what order they're in). +rfd, wfd = os.pipe() +pipes.append((rfd, wfd)) # make sure we use os._exit in all worker code paths. otherwise the # worker may do some clean-ups which could cause surprises like # deadlock. see sshpeer.cleanup for example. 
@@ -175,8 +183,10 @@ finally: os._exit(ret & 255) pids.add(pid) -os.close(wfd) -fp = os.fdopen(rfd, r'rb', 0) +fps = [] +for rfd, wfd in pipes: +os.close(wfd) +fps.append(os.fdopen(rfd, r'rb', 0)) def cleanup(): signal.signal(signal.SIGINT, oldhandler) waitforworkers() @@ -187,15 +197,23 @@ os.kill(os.getpid(), -status) sys.exit(status) try: -while True: +while fps: try: -yield util.pickle.load(fp) -except EOFError: -break -except IOError as e: -if e.errno == errno.EINTR: +rlist, wlist, xlist = select.select(fps, [], fps) +except select.error as e: +if e[0] == errno.EINTR: continue raise +for fp in rlist + xlist: +try: +yield util.pickle.load(fp) +except EOFError: +fp.close() +except IOError as e: +if e.errno == errno.EINTR: +continue +raise +fps = [fp for fp in fps if not fp.closed] except: # re-raises killworkers() cleanup() To: hooper, #hg-reviewers Cc: mercurial-devel ___ Mercurial-devel mailing list Mercurial-devel@mercurial-scm.org https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel