D4474: wireprotov2peer: stream decoded responses

2018-09-12 Thread indygreg (Gregory Szorc)
This revision was automatically updated to reflect the committed changes.
Closed by commit rHGd06834e0f48e: wireprotov2peer: stream decoded responses 
(authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D4474?vs=10792&id=10935

REVISION DETAIL
  https://phab.mercurial-scm.org/D4474

AFFECTED FILES
  mercurial/debugcommands.py
  mercurial/wireprotov2peer.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-command-capabilities.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t
--- a/tests/test-wireproto-command-capabilities.t
+++ b/tests/test-wireproto-command-capabilities.t
@@ -349,10 +349,7 @@
   s> 0\r\n
   s> \r\n
   received frame(size=0; request=1; stream=2; streamflags=; 
type=command-response; flags=eos)
-  response: [
-{
-  b'status': b'ok'
-},
+  response: gen[
 {
   b'commands': {
 b'branchmap': {
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -225,10 +225,7 @@
   s> 0\r\n
   s> \r\n
   received frame(size=0; request=1; stream=2; streamflags=; 
type=command-response; flags=eos)
-  response: [
-{
-  b'status': b'ok'
-},
+  response: gen[
 b'customreadonly bytes response'
   ]
 
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
--- a/mercurial/wireprotov2peer.py
+++ b/mercurial/wireprotov2peer.py
@@ -7,11 +7,12 @@
 
 from __future__ import absolute_import
 
+import threading
+
 from .i18n import _
 from . import (
 encoding,
 error,
-util,
 wireprotoframing,
 )
 from .utils import (
@@ -34,20 +35,101 @@
 return b''.join(chunks)
 
 class commandresponse(object):
-"""Represents the response to a command request."""
+"""Represents the response to a command request.
+
+Instances track the state of the command and hold its results.
+
+An external entity is required to update the state of the object when
+events occur.
+"""
 
 def __init__(self, requestid, command):
 self.requestid = requestid
 self.command = command
 
-self.b = util.bytesio()
+# Whether all remote input related to this command has been
+# received.
+self._inputcomplete = False
+
+# We have a lock that is acquired when important object state is
+# mutated. This is to prevent race conditions between 1 thread
+# sending us new data and another consuming it.
+self._lock = threading.RLock()
+
+# An event is set when state of the object changes. This event
+# is waited on by the generator emitting objects.
+self._serviceable = threading.Event()
+
+self._pendingevents = []
+self._decoder = cborutil.bufferingdecoder()
+self._seeninitial = False
+
+def _oninputcomplete(self):
+with self._lock:
+self._inputcomplete = True
+self._serviceable.set()
+
+def _onresponsedata(self, data):
+available, readcount, wanted = self._decoder.decode(data)
+
+if not available:
+return
+
+with self._lock:
+for o in self._decoder.getavailable():
+if not self._seeninitial:
+self._handleinitial(o)
+continue
+
+self._pendingevents.append(o)
+
+self._serviceable.set()
 
-def cborobjects(self):
-"""Obtain decoded CBOR objects from this response."""
-self.b.seek(0)
+def _handleinitial(self, o):
+self._seeninitial = True
+if o[b'status'] == 'ok':
+return
+
+atoms = [{'msg': o[b'error'][b'message']}]
+if b'args' in o[b'error']:
+atoms[0]['args'] = o[b'error'][b'args']
+
+raise error.RepoError(formatrichmessage(atoms))
+
+def objects(self):
+"""Obtained decoded objects from this response.
+
+This is a generator of data structures that were decoded from the
+command response.
+
+Obtaining the next member of the generator may block due to waiting
+on external data to become available.
 
-for v in cborutil.decodeall(self.b.getvalue()):
-yield v
+If the server encountered an error in the middle of serving the data
+or if another error occurred, an exception may be raised when
+advancing the generator.
+"""
+while True:
+# TODO this can infinite loop if self._inputcomplete is never
+# set. We likely want to tie the lifetime of this object/state
+# to that of the background thread receiving frames and updating
+# our state.
+self._serviceable.wait(1.0)
+
+with self._lock:
+self._serviceable.clear()
+
+# Make copies because 

D4474: wireprotov2peer: stream decoded responses

2018-09-05 Thread indygreg (Gregory Szorc)
indygreg updated this revision to Diff 10792.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D4474?vs=10771&id=10792

REVISION DETAIL
  https://phab.mercurial-scm.org/D4474

AFFECTED FILES
  mercurial/debugcommands.py
  mercurial/wireprotov2peer.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-command-capabilities.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t
--- a/tests/test-wireproto-command-capabilities.t
+++ b/tests/test-wireproto-command-capabilities.t
@@ -349,10 +349,7 @@
   s> 0\r\n
   s> \r\n
   received frame(size=0; request=1; stream=2; streamflags=; 
type=command-response; flags=eos)
-  response: [
-{
-  b'status': b'ok'
-},
+  response: gen[
 {
   b'commands': {
 b'branchmap': {
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -225,10 +225,7 @@
   s> 0\r\n
   s> \r\n
   received frame(size=0; request=1; stream=2; streamflags=; 
type=command-response; flags=eos)
-  response: [
-{
-  b'status': b'ok'
-},
+  response: gen[
 b'customreadonly bytes response'
   ]
 
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
--- a/mercurial/wireprotov2peer.py
+++ b/mercurial/wireprotov2peer.py
@@ -7,11 +7,12 @@
 
 from __future__ import absolute_import
 
+import threading
+
 from .i18n import _
 from . import (
 encoding,
 error,
-util,
 wireprotoframing,
 )
 from .utils import (
@@ -34,20 +35,101 @@
 return b''.join(chunks)
 
 class commandresponse(object):
-"""Represents the response to a command request."""
+"""Represents the response to a command request.
+
+Instances track the state of the command and hold its results.
+
+An external entity is required to update the state of the object when
+events occur.
+"""
 
 def __init__(self, requestid, command):
 self.requestid = requestid
 self.command = command
 
-self.b = util.bytesio()
+# Whether all remote input related to this command has been
+# received.
+self._inputcomplete = False
+
+# We have a lock that is acquired when important object state is
+# mutated. This is to prevent race conditions between 1 thread
+# sending us new data and another consuming it.
+self._lock = threading.RLock()
+
+# An event is set when state of the object changes. This event
+# is waited on by the generator emitting objects.
+self._serviceable = threading.Event()
+
+self._pendingevents = []
+self._decoder = cborutil.bufferingdecoder()
+self._seeninitial = False
+
+def _oninputcomplete(self):
+with self._lock:
+self._inputcomplete = True
+self._serviceable.set()
+
+def _onresponsedata(self, data):
+available, readcount, wanted = self._decoder.decode(data)
+
+if not available:
+return
+
+with self._lock:
+for o in self._decoder.getavailable():
+if not self._seeninitial:
+self._handleinitial(o)
+continue
+
+self._pendingevents.append(o)
+
+self._serviceable.set()
 
-def cborobjects(self):
-"""Obtain decoded CBOR objects from this response."""
-self.b.seek(0)
+def _handleinitial(self, o):
+self._seeninitial = True
+if o[b'status'] == 'ok':
+return
+
+atoms = [{'msg': o[b'error'][b'message']}]
+if b'args' in o[b'error']:
+atoms[0]['args'] = o[b'error'][b'args']
+
+raise error.RepoError(formatrichmessage(atoms))
+
+def objects(self):
+"""Obtained decoded objects from this response.
+
+This is a generator of data structures that were decoded from the
+command response.
+
+Obtaining the next member of the generator may block due to waiting
+on external data to become available.
 
-for v in cborutil.decodeall(self.b.getvalue()):
-yield v
+If the server encountered an error in the middle of serving the data
+or if another error occurred, an exception may be raised when
+advancing the generator.
+"""
+while True:
+# TODO this can infinite loop if self._inputcomplete is never
+# set. We likely want to tie the lifetime of this object/state
+# to that of the background thread receiving frames and updating
+# our state.
+self._serviceable.wait(1.0)
+
+with self._lock:
+self._serviceable.clear()
+
+# Make copies because objects could be mutated during
+# iteration.
+stop = self._inputcomplete
+pending = 

D4474: wireprotov2peer: stream decoded responses

2018-09-04 Thread indygreg (Gregory Szorc)
indygreg created this revision.
Herald added subscribers: mercurial-devel, mjpieters.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  Previously, wire protocol version 2 would buffer all response data.
  Only once all data was received did we CBOR decode it and resolve
  the future associated with the command. This was obviously not
  desirable. In future commits that introduce large response payloads,
  this would cause significant memory bloat and slow down client
  operations due to waiting on the server.
  
  This commit refactors the response handling code so that response
  data can be streamed.
  
  Command response objects now contain a buffered CBOR decoder. As
  new data arrives, it is fed into the decoder. Decoded objects are
  made available to the generator as they are decoded.
  
  Because there is a separate thread processing incoming frames and
  feeding data into the response object, there is the potential for
  race conditions when mutating response objects. So a lock has been
  added to guard access to critical state variables.
  
  Because the generator emitting decoded objects needs to wait on
  those objects to become available, we've added an Event for the
  generator to wait on so it doesn't busy loop. This does mean
  there is the potential for deadlocks. And I'm pretty sure they can
  occur in some scenarios. We already have a handful of TODOs around
  this. But I've added some more. Fixing this will likely require
  moving the background thread receiving frames into clienthandler.
  We likely would have done this anyway when implementing the client
  bits for the SSH transport.
  
  Test output changes because the initial CBOR map holding the overall
  response state is now always handled internally by the response
  object.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D4474

AFFECTED FILES
  mercurial/debugcommands.py
  mercurial/wireprotov2peer.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-command-capabilities.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t
--- a/tests/test-wireproto-command-capabilities.t
+++ b/tests/test-wireproto-command-capabilities.t
@@ -349,10 +349,7 @@
   s> 0\r\n
   s> \r\n
   received frame(size=0; request=1; stream=2; streamflags=; 
type=command-response; flags=eos)
-  response: [
-{
-  b'status': b'ok'
-},
+  response: gen[
 {
   b'commands': {
 b'branchmap': {
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -225,10 +225,7 @@
   s> 0\r\n
   s> \r\n
   received frame(size=0; request=1; stream=2; streamflags=; 
type=command-response; flags=eos)
-  response: [
-{
-  b'status': b'ok'
-},
+  response: gen[
 b'customreadonly bytes response'
   ]
 
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
--- a/mercurial/wireprotov2peer.py
+++ b/mercurial/wireprotov2peer.py
@@ -7,11 +7,12 @@
 
 from __future__ import absolute_import
 
+import threading
+
 from .i18n import _
 from . import (
 encoding,
 error,
-util,
 wireprotoframing,
 )
 from .utils import (
@@ -34,20 +35,101 @@
 return b''.join(chunks)
 
 class commandresponse(object):
-"""Represents the response to a command request."""
+"""Represents the response to a command request.
+
+Instances track the state of the command and hold its results.
+
+An external entity is required to update the state of the object when
+events occur.
+"""
 
 def __init__(self, requestid, command):
 self.requestid = requestid
 self.command = command
 
-self.b = util.bytesio()
+# Whether all remote input related to this command has been
+# received.
+self._inputcomplete = False
+
+# We have a lock that is acquired when important object state is
+# mutated. This is to prevent race conditions between 1 thread
+# sending us new data and another consuming it.
+self._lock = threading.RLock()
+
+# An event is set when state of the object changes. This event
+# is waited on by the generator emitting objects.
+self._serviceable = threading.Event()
+
+self._pendingevents = []
+self._decoder = cborutil.bufferingdecoder()
+self._seeninitial = False
+
+def _oninputcomplete(self):
+with self._lock:
+self._inputcomplete = True
+self._serviceable.set()
+
+def _onresponsedata(self, data):
+available, readcount, wanted = self._decoder.decode(data)
+
+if not available:
+return
+
+with self._lock:
+for o in self._decoder.getavailable():
+if not self._seeninitial:
+self._handleinitial(o)
+continue
+
+