D3268: wireproto: implement command executor interface for version 1 peers

2018-04-13 Thread indygreg (Gregory Szorc)
This revision was automatically updated to reflect the committed changes.
Closed by commit rHGe1b32dc4646c: wireproto: implement command executor 
interface for version 1 peers (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3268?vs=8121&id=8167

REVISION DETAIL
  https://phab.mercurial-scm.org/D3268

AFFECTED FILES
  mercurial/localrepo.py
  mercurial/repository.py
  mercurial/setdiscovery.py
  mercurial/wireprotov1peer.py
  tests/test-check-interfaces.py

CHANGE DETAILS

diff --git a/tests/test-check-interfaces.py b/tests/test-check-interfaces.py
--- a/tests/test-check-interfaces.py
+++ b/tests/test-check-interfaces.py
@@ -23,6 +23,7 @@
 vfs as vfsmod,
 wireprotoserver,
 wireprototypes,
+wireprotov1peer,
 wireprotov2server,
 )
 
@@ -102,6 +103,14 @@
  localrepo.localpeer)
 checkzobject(localrepo.localpeer(dummyrepo()))
 
+ziverify.verifyClass(repository.ipeercommandexecutor,
+ localrepo.localcommandexecutor)
+checkzobject(localrepo.localcommandexecutor(None))
+
+ziverify.verifyClass(repository.ipeercommandexecutor,
+ wireprotov1peer.peerexecutor)
+checkzobject(wireprotov1peer.peerexecutor(None))
+
 ziverify.verifyClass(repository.ipeerbaselegacycommands,
  sshpeer.sshv1peer)
 checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, 
dummypipe(),
diff --git a/mercurial/wireprotov1peer.py b/mercurial/wireprotov1peer.py
--- a/mercurial/wireprotov1peer.py
+++ b/mercurial/wireprotov1peer.py
@@ -8,12 +8,15 @@
 from __future__ import absolute_import
 
 import hashlib
+import sys
 
 from .i18n import _
 from .node import (
 bin,
 )
-
+from .thirdparty.zope import (
+interface as zi,
+)
 from . import (
 bundle2,
 changegroup as changegroupmod,
@@ -177,14 +180,104 @@
 
 return ';'.join(cmds)
 
+@zi.implementer(repository.ipeercommandexecutor)
+class peerexecutor(object):
+def __init__(self, peer):
+self._peer = peer
+self._sent = False
+self._closed = False
+self._calls = []
+
+def __enter__(self):
+return self
+
+def __exit__(self, exctype, excvalee, exctb):
+self.close()
+
+def callcommand(self, command, args):
+if self._sent:
+raise error.ProgrammingError('callcommand() cannot be used '
+ 'after commands are sent')
+
+if self._closed:
+raise error.ProgrammingError('callcommand() cannot be used '
+ 'after close()')
+
+# Commands are dispatched through methods on the peer.
+fn = getattr(self._peer, pycompat.sysstr(command), None)
+
+if not fn:
+raise error.ProgrammingError(
+'cannot call command %s: method of same name not available '
+'on peer' % command)
+
+# Commands are either batchable or they aren't. If a command
+# isn't batchable, we send it immediately because the executor
+# can no longer accept new commands after a non-batchable command.
+# If a command is batchable, we queue it for later.
+
+if getattr(fn, 'batchable', False):
+pass
+else:
+if self._calls:
+raise error.ProgrammingError(
+'%s is not batchable and cannot be called on a command '
+'executor along with other commands' % command)
+
+# We don't support batching yet. So resolve it immediately.
+f = pycompat.futures.Future()
+self._calls.append((command, args, fn, f))
+self.sendcommands()
+return f
+
+def sendcommands(self):
+if self._sent:
+return
+
+if not self._calls:
+return
+
+self._sent = True
+
+calls = self._calls
+# Mainly to destroy references to futures.
+self._calls = None
+
+if len(calls) == 1:
+command, args, fn, f = calls[0]
+
+# Future was cancelled. Ignore it.
+if not f.set_running_or_notify_cancel():
+return
+
+try:
+result = fn(**pycompat.strkwargs(args))
+except Exception:
+f.set_exception_info(*sys.exc_info()[1:])
+else:
+f.set_result(result)
+
+return
+
+raise error.ProgrammingError('support for multiple commands not '
+ 'yet implemented')
+
+def close(self):
+self.sendcommands()
+
+self._closed = True
+
 class wirepeer(repository.legacypeer):
 """Client-side interface for communicating with a peer repository.
 
 Methods commonly call wire protocol commands of the same name.
 
 See also httppeer.py and sshpeer.py for protocol-specific
 implementations of t

D3268: wireproto: implement command executor interface for version 1 peers

2018-04-13 Thread indygreg (Gregory Szorc)
indygreg updated this revision to Diff 8121.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3268?vs=8034&id=8121

REVISION DETAIL
  https://phab.mercurial-scm.org/D3268

AFFECTED FILES
  mercurial/localrepo.py
  mercurial/repository.py
  mercurial/setdiscovery.py
  mercurial/wireprotov1peer.py
  tests/test-check-interfaces.py

CHANGE DETAILS

diff --git a/tests/test-check-interfaces.py b/tests/test-check-interfaces.py
--- a/tests/test-check-interfaces.py
+++ b/tests/test-check-interfaces.py
@@ -23,6 +23,7 @@
 vfs as vfsmod,
 wireprotoserver,
 wireprototypes,
+wireprotov1peer,
 wireprotov2server,
 )
 
@@ -102,6 +103,14 @@
  localrepo.localpeer)
 checkzobject(localrepo.localpeer(dummyrepo()))
 
+ziverify.verifyClass(repository.ipeercommandexecutor,
+ localrepo.localcommandexecutor)
+checkzobject(localrepo.localcommandexecutor(None))
+
+ziverify.verifyClass(repository.ipeercommandexecutor,
+ wireprotov1peer.peerexecutor)
+checkzobject(wireprotov1peer.peerexecutor(None))
+
 ziverify.verifyClass(repository.ipeerbaselegacycommands,
  sshpeer.sshv1peer)
 checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, 
dummypipe(),
diff --git a/mercurial/wireprotov1peer.py b/mercurial/wireprotov1peer.py
--- a/mercurial/wireprotov1peer.py
+++ b/mercurial/wireprotov1peer.py
@@ -8,12 +8,15 @@
 from __future__ import absolute_import
 
 import hashlib
+import sys
 
 from .i18n import _
 from .node import (
 bin,
 )
-
+from .thirdparty.zope import (
+interface as zi,
+)
 from . import (
 bundle2,
 changegroup as changegroupmod,
@@ -177,14 +180,104 @@
 
 return ';'.join(cmds)
 
+@zi.implementer(repository.ipeercommandexecutor)
+class peerexecutor(object):
+def __init__(self, peer):
+self._peer = peer
+self._sent = False
+self._closed = False
+self._calls = []
+
+def __enter__(self):
+return self
+
+def __exit__(self, exctype, excvalee, exctb):
+self.close()
+
+def callcommand(self, command, args):
+if self._sent:
+raise error.ProgrammingError('callcommand() cannot be used '
+ 'after commands are sent')
+
+if self._closed:
+raise error.ProgrammingError('callcommand() cannot be used '
+ 'after close()')
+
+# Commands are dispatched through methods on the peer.
+fn = getattr(self._peer, pycompat.sysstr(command), None)
+
+if not fn:
+raise error.ProgrammingError(
+'cannot call command %s: method of same name not available '
+'on peer' % command)
+
+# Commands are either batchable or they aren't. If a command
+# isn't batchable, we send it immediately because the executor
+# can no longer accept new commands after a non-batchable command.
+# If a command is batchable, we queue it for later.
+
+if getattr(fn, 'batchable', False):
+pass
+else:
+if self._calls:
+raise error.ProgrammingError(
+'%s is not batchable and cannot be called on a command '
+'executor along with other commands' % command)
+
+# We don't support batching yet. So resolve it immediately.
+f = pycompat.futures.Future()
+self._calls.append((command, args, fn, f))
+self.sendcommands()
+return f
+
+def sendcommands(self):
+if self._sent:
+return
+
+if not self._calls:
+return
+
+self._sent = True
+
+calls = self._calls
+# Mainly to destroy references to futures.
+self._calls = None
+
+if len(calls) == 1:
+command, args, fn, f = calls[0]
+
+# Future was cancelled. Ignore it.
+if not f.set_running_or_notify_cancel():
+return
+
+try:
+result = fn(**pycompat.strkwargs(args))
+except Exception:
+f.set_exception_info(*sys.exc_info()[1:])
+else:
+f.set_result(result)
+
+return
+
+raise error.ProgrammingError('support for multiple commands not '
+ 'yet implemented')
+
+def close(self):
+self.sendcommands()
+
+self._closed = True
+
 class wirepeer(repository.legacypeer):
 """Client-side interface for communicating with a peer repository.
 
 Methods commonly call wire protocol commands of the same name.
 
 See also httppeer.py and sshpeer.py for protocol-specific
 implementations of this interface.
 """
+def commandexecutor(self):
+return peerexecutor(self)
+
 # Begin of ipeercommands interface.
 
 def iterbatch(self):
diff --gi

D3268: wireproto: implement command executor interface for version 1 peers

2018-04-11 Thread indygreg (Gregory Szorc)
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  Now that we've defined our new interface for issuing commands,
  let's implement it.
  
  We add the interface to the base peer interface. This means all
  peer types must implement it.
  
  The only peer types that we have are the local peer in localrepo
  and a shared wire peer for version 1 of the wire protocol.
  
  The local peer implementation is pretty straightforward. We
  don't do anything fancy and just return a resolved future with
  the result of a method call. This is similar to what
  localiterbatcher does.
  
  The wire protocol version 1 implementation is a bit more complicated
  and more robust.
  
  The wire executor queues commands by default. And because the new
  executor interface always allows multiple commands but not all version
  1 commands are @batchable, it has to check that the requested commands
  are batchable if multiple commands are being requested.
  
  The wire executor currently only supports executing a single command.
  This is for simplicity reasons. Support for multiple commands will
  be added in a separate commit.
  
  To prove the new interface works, a call to the "known" command
  during discovery has been updated to use the new API.
  
  It's worth noting that both implementations require a method having
  the command name to exist on the peer. There is at least one caller
  in core that doesn't have such a method and calls peer._call()
  directly. We may need to shore up the requirements later...

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3268

AFFECTED FILES
  mercurial/localrepo.py
  mercurial/repository.py
  mercurial/setdiscovery.py
  mercurial/wireprotov1peer.py
  tests/test-check-interfaces.py

CHANGE DETAILS

diff --git a/tests/test-check-interfaces.py b/tests/test-check-interfaces.py
--- a/tests/test-check-interfaces.py
+++ b/tests/test-check-interfaces.py
@@ -23,6 +23,7 @@
 vfs as vfsmod,
 wireprotoserver,
 wireprototypes,
+wireprotov1peer,
 wireprotov2server,
 )
 
@@ -102,6 +103,14 @@
  localrepo.localpeer)
 checkzobject(localrepo.localpeer(dummyrepo()))
 
+ziverify.verifyClass(repository.ipeercommandexecutor,
+ localrepo.localcommandexecutor)
+checkzobject(localrepo.localcommandexecutor(None))
+
+ziverify.verifyClass(repository.ipeercommandexecutor,
+ wireprotov1peer.peerexecutor)
+checkzobject(wireprotov1peer.peerexecutor(None))
+
 ziverify.verifyClass(repository.ipeerbaselegacycommands,
  sshpeer.sshv1peer)
 checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, 
dummypipe(),
diff --git a/mercurial/wireprotov1peer.py b/mercurial/wireprotov1peer.py
--- a/mercurial/wireprotov1peer.py
+++ b/mercurial/wireprotov1peer.py
@@ -7,13 +7,17 @@
 
 from __future__ import absolute_import
 
+import contextlib
 import hashlib
+import sys
 
 from .i18n import _
 from .node import (
 bin,
 )
-
+from .thirdparty.zope import (
+interface as zi,
+)
 from . import (
 bundle2,
 changegroup as changegroupmod,
@@ -177,14 +181,96 @@
 
 return ';'.join(cmds)
 
+@zi.implementer(repository.ipeercommandexecutor)
+class peerexecutor(object):
+def __init__(self, peer):
+self._peer = peer
+self._sent = False
+self._closed = False
+self._calls = []
+
+def callcommand(self, command, args):
+if self._sent:
+raise error.ProgrammingError('callcommand() cannot be used '
+ 'after sendcommands()')
+
+if self._closed:
+raise error.ProgrammingError('callcommand() cannot be used '
+ 'after close()')
+
+# Commands typically have methods on the peer
+fn = getattr(self._peer, pycompat.sysstr(command), None)
+
+if fn:
+# Not all commands are batchable. So verify we don't attempt
+# to batch non-batchable commands.
+isbatchable = getattr(fn, 'batchable', False)
+
+if not isbatchable and self._calls:
+raise error.ProgrammingError(
+'%s is not batchable and cannot be called on a command '
+'executor along with other commands' % command)
+
+if self._calls and not self._calls[-1][2]:
+raise error.ProgrammingError(
+'%s cannot be called on a command executor after a '
+'non-batchable command')
+
+f = pycompat.futures.Future()
+
+self._calls.append((command, args, fn, f))
+
+return f
+
+def sendcommands(self):
+if self._sent:
+return
+
+if not self._calls:
+return
+
+self._sent = True
+
+if len(self._calls) == 1:
+