D3960: worker: use one pipe per posix worker and select() in parent process

2018-07-19 Thread hooper (Danny Hooper)
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG9e6afe7fca31: worker: use one pipe per posix worker and 
select() in parent process (authored by hooper, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3960?vs=9626&id=9628

REVISION DETAIL
  https://phab.mercurial-scm.org/D3960

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -14,6 +14,12 @@
 import threading
 import time
 
+try:
+import selectors
+selectors.BaseSelector
+except ImportError:
+from .thirdparty import selectors2 as selectors
+
 from .i18n import _
 from . import (
 encoding,
@@ -89,7 +95,6 @@
 return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
-rfd, wfd = os.pipe()
 workers = _numworkers(ui)
 oldhandler = signal.getsignal(signal.SIGINT)
 signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +143,15 @@
 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
 ui.flush()
 parentpid = os.getpid()
+pipes = []
 for pargs in partition(args, workers):
+# Every worker gets its own pipe to send results on, so we don't have to
+# implement atomic writes larger than PIPE_BUF. Each forked process has
+# its own pipe's descriptors in the local variables, and the parent
+# process has the full list of pipe descriptors (and it doesn't really
+# care what order they're in).
+rfd, wfd = os.pipe()
+pipes.append((rfd, wfd))
 # make sure we use os._exit in all worker code paths. otherwise the
 # worker may do some clean-ups which could cause surprises like
 # deadlock. see sshpeer.cleanup for example.
@@ -154,6 +167,9 @@
 signal.signal(signal.SIGCHLD, oldchldhandler)
 
 def workerfunc():
+for r, w in pipes[:-1]:
+os.close(r)
+os.close(w)
 os.close(rfd)
 for result in func(*(staticargs + (pargs,))):
 os.write(wfd, util.pickle.dumps(result))
@@ -175,8 +191,10 @@
 finally:
 os._exit(ret & 255)
 pids.add(pid)
-os.close(wfd)
-fp = os.fdopen(rfd, r'rb', 0)
+selector = selectors.DefaultSelector()
+for rfd, wfd in pipes:
+os.close(wfd)
+selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
 def cleanup():
 signal.signal(signal.SIGINT, oldhandler)
 waitforworkers()
@@ -187,15 +205,19 @@
 os.kill(os.getpid(), -status)
 sys.exit(status)
 try:
-while True:
-try:
-yield util.pickle.load(fp)
-except EOFError:
-break
-except IOError as e:
-if e.errno == errno.EINTR:
-continue
-raise
+openpipes = len(pipes)
+while openpipes > 0:
+for key, events in selector.select():
+try:
+yield util.pickle.load(key.fileobj)
+except EOFError:
+selector.unregister(key.fileobj)
+key.fileobj.close()
+openpipes -= 1
+except IOError as e:
+if e.errno == errno.EINTR:
+continue
+raise
 except: # re-raises
 killworkers()
 cleanup()



To: hooper, #hg-reviewers
Cc: yuja, mercurial-devel
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel


Re: D3960: worker: use one pipe per posix worker and select() in parent process

2018-07-19 Thread Yuya Nishihara
Queued, thanks. We'll probably need `selector.close()` somewhere.

>   Do you want to move the selector import stuff to pycompat?

Sounds nice. We'll have to pull the latest selectors2 module to get rid of
reference cycle. The issue addressed by a568a46751b6 appears to be fixed
in upstream.
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel


D3960: worker: use one pipe per posix worker and select() in parent process

2018-07-19 Thread yuja (Yuya Nishihara)
yuja added a comment.


  Queued, thanks. We'll probably need `selector.close()` somewhere.
  
  >   Do you want to move the selector import stuff to pycompat?
  
  Sounds nice. We'll have to pull the latest selectors2 module to get rid of
  reference cycle. The issue addressed by
  https://phab.mercurial-scm.org/rHGa568a46751b6fc87437b143582f594bfe87451f6
  appears to be fixed in upstream.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3960

To: hooper, #hg-reviewers
Cc: yuja, mercurial-devel
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel


D3960: worker: use one pipe per posix worker and select() in parent process

2018-07-18 Thread hooper (Danny Hooper)
hooper added a comment.


  Do you want to move the selector import stuff to pycompat?

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3960

To: hooper, #hg-reviewers
Cc: yuja, mercurial-devel
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel


D3960: worker: use one pipe per posix worker and select() in parent process

2018-07-18 Thread hooper (Danny Hooper)
hooper updated this revision to Diff 9626.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3960?vs=9618&id=9626

REVISION DETAIL
  https://phab.mercurial-scm.org/D3960

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -14,6 +14,12 @@
 import threading
 import time
 
+try:
+import selectors
+selectors.BaseSelector
+except ImportError:
+from .thirdparty import selectors2 as selectors
+
 from .i18n import _
 from . import (
 encoding,
@@ -89,7 +95,6 @@
 return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
-rfd, wfd = os.pipe()
 workers = _numworkers(ui)
 oldhandler = signal.getsignal(signal.SIGINT)
 signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +143,15 @@
 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
 ui.flush()
 parentpid = os.getpid()
+pipes = []
 for pargs in partition(args, workers):
+# Every worker gets its own pipe to send results on, so we don't have to
+# implement atomic writes larger than PIPE_BUF. Each forked process has
+# its own pipe's descriptors in the local variables, and the parent
+# process has the full list of pipe descriptors (and it doesn't really
+# care what order they're in).
+rfd, wfd = os.pipe()
+pipes.append((rfd, wfd))
 # make sure we use os._exit in all worker code paths. otherwise the
 # worker may do some clean-ups which could cause surprises like
 # deadlock. see sshpeer.cleanup for example.
@@ -154,6 +167,9 @@
 signal.signal(signal.SIGCHLD, oldchldhandler)
 
 def workerfunc():
+for r, w in pipes[:-1]:
+os.close(r)
+os.close(w)
 os.close(rfd)
 for result in func(*(staticargs + (pargs,))):
 os.write(wfd, util.pickle.dumps(result))
@@ -175,8 +191,10 @@
 finally:
 os._exit(ret & 255)
 pids.add(pid)
-os.close(wfd)
-fp = os.fdopen(rfd, r'rb', 0)
+selector = selectors.DefaultSelector()
+for rfd, wfd in pipes:
+os.close(wfd)
+selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
 def cleanup():
 signal.signal(signal.SIGINT, oldhandler)
 waitforworkers()
@@ -187,15 +205,19 @@
 os.kill(os.getpid(), -status)
 sys.exit(status)
 try:
-while True:
-try:
-yield util.pickle.load(fp)
-except EOFError:
-break
-except IOError as e:
-if e.errno == errno.EINTR:
-continue
-raise
+openpipes = len(pipes)
+while openpipes > 0:
+for key, events in selector.select():
+try:
+yield util.pickle.load(key.fileobj)
+except EOFError:
+selector.unregister(key.fileobj)
+key.fileobj.close()
+openpipes -= 1
+except IOError as e:
+if e.errno == errno.EINTR:
+continue
+raise
 except: # re-raises
 killworkers()
 cleanup()



To: hooper, #hg-reviewers
Cc: yuja, mercurial-devel
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel


Re: D3960: worker: use one pipe per posix worker and select() in parent process

2018-07-18 Thread Yuya Nishihara
> @@ -138,7 +138,15 @@
>  oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
>  ui.flush()
>  parentpid = os.getpid()
> +pipes = []
>  for pargs in partition(args, workers):
> +# Every worker gets its own pipe to send results on, so we don't have to
> +# implement atomic writes larger than PIPE_BUF. Each forked process has
> +# its own pipe's descriptors in the local variables, and the parent
> +# process has the full list of pipe descriptors (and it doesn't really
> +# care what order they're in).
> +rfd, wfd = os.pipe()
> +pipes.append((rfd, wfd))
>  # make sure we use os._exit in all worker code paths. otherwise the
>  # worker may do some clean-ups which could cause surprises like
>  # deadlock. see sshpeer.cleanup for example.
> @@ -175,8 +183,10 @@
>  finally:
>  os._exit(ret & 255)
>  pids.add(pid)
> -os.close(wfd)
> -fp = os.fdopen(rfd, r'rb', 0)
> +fps = []
> +for rfd, wfd in pipes:
> +os.close(wfd)
> +fps.append(os.fdopen(rfd, r'rb', 0))

This isn't enough. For child processes, all pipe fds except for the last
wfd have to be closed at the beginning of `workerfunc()`.

> +rlist, wlist, xlist = select.select(fps, [], fps)

Can you rewrite it to use the selectors module? commandserver.py has an
example.
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel


D3960: worker: use one pipe per posix worker and select() in parent process

2018-07-18 Thread yuja (Yuya Nishihara)
yuja added a comment.


  > @@ -138,7 +138,15 @@
  > 
  >   oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
  >   ui.flush()
  >   parentpid = os.getpid()
  > 
  > +pipes = []
  > 
  >   for pargs in partition(args, workers):
  > 
  > +# Every worker gets its own pipe to send results on, so we don't have to
  > +# implement atomic writes larger than PIPE_BUF. Each forked process has
  > +# its own pipe's descriptors in the local variables, and the parent
  > +# process has the full list of pipe descriptors (and it doesn't really
  > +# care what order they're in).
  > +rfd, wfd = os.pipe()
  > +pipes.append((rfd, wfd))
  >  # make sure we use os._exit in all worker code paths. otherwise the
  >  # worker may do some clean-ups which could cause surprises like
  >  # deadlock. see sshpeer.cleanup for example.
  > @@ -175,8 +183,10 @@
  >  finally:
  >  os._exit(ret & 255)
  >  pids.add(pid)
  > -os.close(wfd)
  > -fp = os.fdopen(rfd, r'rb', 0)
  > +fps = []
  > +for rfd, wfd in pipes:
  > +os.close(wfd)
  > +fps.append(os.fdopen(rfd, r'rb', 0))
  
  This isn't enough. For child processes, all pipe fds except for the last
  wfd have to be closed at the beginning of `workerfunc()`.
  
  > +rlist, wlist, xlist = select.select(fps, [], fps)
  
  Can you rewrite it to use the selectors module? commandserver.py has an
  example.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3960

To: hooper, #hg-reviewers
Cc: yuja, mercurial-devel
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel


D3960: worker: use one pipe per posix worker and select() in parent process

2018-07-17 Thread hooper (Danny Hooper)
hooper created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This allows us to pass results larger than PIPE_BUF through the pipes without
  interleaving them. This is necessary now because "hg fix" sends file contents
  as the result from workers.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3960

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -9,6 +9,7 @@
 
 import errno
 import os
+import select
 import signal
 import sys
 import threading
@@ -89,7 +90,6 @@
 return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
-rfd, wfd = os.pipe()
 workers = _numworkers(ui)
 oldhandler = signal.getsignal(signal.SIGINT)
 signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +138,15 @@
 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
 ui.flush()
 parentpid = os.getpid()
+pipes = []
 for pargs in partition(args, workers):
+# Every worker gets its own pipe to send results on, so we don't have to
+# implement atomic writes larger than PIPE_BUF. Each forked process has
+# its own pipe's descriptors in the local variables, and the parent
+# process has the full list of pipe descriptors (and it doesn't really
+# care what order they're in).
+rfd, wfd = os.pipe()
+pipes.append((rfd, wfd))
 # make sure we use os._exit in all worker code paths. otherwise the
 # worker may do some clean-ups which could cause surprises like
 # deadlock. see sshpeer.cleanup for example.
@@ -175,8 +183,10 @@
 finally:
 os._exit(ret & 255)
 pids.add(pid)
-os.close(wfd)
-fp = os.fdopen(rfd, r'rb', 0)
+fps = []
+for rfd, wfd in pipes:
+os.close(wfd)
+fps.append(os.fdopen(rfd, r'rb', 0))
 def cleanup():
 signal.signal(signal.SIGINT, oldhandler)
 waitforworkers()
@@ -187,15 +197,23 @@
 os.kill(os.getpid(), -status)
 sys.exit(status)
 try:
-while True:
+while fps:
 try:
-yield util.pickle.load(fp)
-except EOFError:
-break
-except IOError as e:
-if e.errno == errno.EINTR:
+rlist, wlist, xlist = select.select(fps, [], fps)
+except select.error as e:
+if e[0] == errno.EINTR:
 continue
 raise
+for fp in rlist + xlist:
+try:
+yield util.pickle.load(fp)
+except EOFError:
+fp.close()
+except IOError as e:
+if e.errno == errno.EINTR:
+continue
+raise
+fps = [fp for fp in fps if not fp.closed]
 except: # re-raises
 killworkers()
 cleanup()



To: hooper, #hg-reviewers
Cc: mercurial-devel
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel