indygreg updated this revision to Diff 8121.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3268?vs=8034&id=8121

REVISION DETAIL
  https://phab.mercurial-scm.org/D3268

AFFECTED FILES
  mercurial/localrepo.py
  mercurial/repository.py
  mercurial/setdiscovery.py
  mercurial/wireprotov1peer.py
  tests/test-check-interfaces.py

CHANGE DETAILS

diff --git a/tests/test-check-interfaces.py b/tests/test-check-interfaces.py
--- a/tests/test-check-interfaces.py
+++ b/tests/test-check-interfaces.py
@@ -23,6 +23,7 @@
     vfs as vfsmod,
     wireprotoserver,
     wireprototypes,
+    wireprotov1peer,
     wireprotov2server,
 )
 
@@ -102,6 +103,14 @@
                          localrepo.localpeer)
     checkzobject(localrepo.localpeer(dummyrepo()))
 
+    ziverify.verifyClass(repository.ipeercommandexecutor,
+                         localrepo.localcommandexecutor)
+    checkzobject(localrepo.localcommandexecutor(None))
+
+    ziverify.verifyClass(repository.ipeercommandexecutor,
+                         wireprotov1peer.peerexecutor)
+    checkzobject(wireprotov1peer.peerexecutor(None))
+
     ziverify.verifyClass(repository.ipeerbaselegacycommands,
                          sshpeer.sshv1peer)
     checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, 
dummypipe(),
diff --git a/mercurial/wireprotov1peer.py b/mercurial/wireprotov1peer.py
--- a/mercurial/wireprotov1peer.py
+++ b/mercurial/wireprotov1peer.py
@@ -8,12 +8,15 @@
 from __future__ import absolute_import
 
 import hashlib
+import sys
 
 from .i18n import _
 from .node import (
     bin,
 )
-
+from .thirdparty.zope import (
+    interface as zi,
+)
 from . import (
     bundle2,
     changegroup as changegroupmod,
@@ -177,14 +180,104 @@
 
     return ';'.join(cmds)
 
+@zi.implementer(repository.ipeercommandexecutor)
+class peerexecutor(object):
+    def __init__(self, peer):
+        self._peer = peer
+        self._sent = False
+        self._closed = False
+        self._calls = []
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excvalee, exctb):
+        self.close()
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used '
+                                         'after commands are sent')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used '
+                                         'after close()')
+
+        # Commands are dispatched through methods on the peer.
+        fn = getattr(self._peer, pycompat.sysstr(command), None)
+
+        if not fn:
+            raise error.ProgrammingError(
+                'cannot call command %s: method of same name not available '
+                'on peer' % command)
+
+        # Commands are either batchable or they aren't. If a command
+        # isn't batchable, we send it immediately because the executor
+        # can no longer accept new commands after a non-batchable command.
+        # If a command is batchable, we queue it for later.
+
+        if getattr(fn, 'batchable', False):
+            pass
+        else:
+            if self._calls:
+                raise error.ProgrammingError(
+                    '%s is not batchable and cannot be called on a command '
+                    'executor along with other commands' % command)
+
+        # We don't support batching yet. So resolve it immediately.
+        f = pycompat.futures.Future()
+        self._calls.append((command, args, fn, f))
+        self.sendcommands()
+        return f
+
+    def sendcommands(self):
+        if self._sent:
+            return
+
+        if not self._calls:
+            return
+
+        self._sent = True
+
+        calls = self._calls
+        # Mainly to destroy references to futures.
+        self._calls = None
+
+        if len(calls) == 1:
+            command, args, fn, f = calls[0]
+
+            # Future was cancelled. Ignore it.
+            if not f.set_running_or_notify_cancel():
+                return
+
+            try:
+                result = fn(**pycompat.strkwargs(args))
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+            else:
+                f.set_result(result)
+
+            return
+
+        raise error.ProgrammingError('support for multiple commands not '
+                                     'yet implemented')
+
+    def close(self):
+        self.sendcommands()
+
+        self._closed = True
+
 class wirepeer(repository.legacypeer):
     """Client-side interface for communicating with a peer repository.
 
     Methods commonly call wire protocol commands of the same name.
 
     See also httppeer.py and sshpeer.py for protocol-specific
     implementations of this interface.
     """
+    def commandexecutor(self):
+        return peerexecutor(self)
+
     # Begin of ipeercommands interface.
 
     def iterbatch(self):
diff --git a/mercurial/setdiscovery.py b/mercurial/setdiscovery.py
--- a/mercurial/setdiscovery.py
+++ b/mercurial/setdiscovery.py
@@ -228,7 +228,12 @@
                  % (roundtrips, len(undecided), len(sample)))
         # indices between sample and externalized version must match
         sample = list(sample)
-        yesno = remote.known(dag.externalizeall(sample))
+
+        with remote.commandexecutor() as e:
+            yesno = e.callcommand('known', {
+                'nodes': dag.externalizeall(sample),
+            }).result()
+
         full = True
 
         if sample:
diff --git a/mercurial/repository.py b/mercurial/repository.py
--- a/mercurial/repository.py
+++ b/mercurial/repository.py
@@ -278,7 +278,8 @@
         being issued.
         """
 
-class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands):
+class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands,
+                ipeerrequests):
     """Unified interface for peer repositories.
 
     All peer instances must conform to this interface.
diff --git a/mercurial/localrepo.py b/mercurial/localrepo.py
--- a/mercurial/localrepo.py
+++ b/mercurial/localrepo.py
@@ -11,6 +11,7 @@
 import hashlib
 import os
 import random
+import sys
 import time
 import weakref
 
@@ -167,6 +168,49 @@
             resref.set(getattr(self.local, name)(*args, **opts))
             yield resref.value
 
+@zi.implementer(repository.ipeercommandexecutor)
+class localcommandexecutor(object):
+    def __init__(self, peer):
+        self._peer = peer
+        self._sent = False
+        self._closed = False
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excvalue, exctb):
+        self.close()
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'sendcommands()')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'close()')
+
+        # We don't need to support anything fancy. Just call the named
+        # method on the peer and return a resolved future.
+        fn = getattr(self._peer, pycompat.sysstr(command))
+
+        f = pycompat.futures.Future()
+
+        try:
+            result = fn(**args)
+        except Exception:
+            f.set_exception_info(*sys.exc_info()[1:])
+        else:
+            f.set_result(result)
+
+        return f
+
+    def sendcommands(self):
+        self._sent = True
+
+    def close(self):
+        self._closed = True
+
 class localpeer(repository.peer):
     '''peer for a local repo; reflects only the most recent API'''
 
@@ -286,6 +330,9 @@
 
     # Begin of peer interface.
 
+    def commandexecutor(self):
+        return localcommandexecutor(self)
+
     def iterbatch(self):
         return localiterbatcher(self)
 



To: indygreg, #hg-reviewers
Cc: mercurial-devel
_______________________________________________
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel

Reply via email to