D4474: wireprotov2peer: stream decoded responses
This revision was automatically updated to reflect the committed changes. Closed by commit rHGd06834e0f48e: wireprotov2peer: stream decoded responses (authored by indygreg, committed by ). REPOSITORY rHG Mercurial CHANGES SINCE LAST UPDATE https://phab.mercurial-scm.org/D4474?vs=10792=10935 REVISION DETAIL https://phab.mercurial-scm.org/D4474 AFFECTED FILES mercurial/debugcommands.py mercurial/wireprotov2peer.py tests/test-http-api-httpv2.t tests/test-wireproto-command-capabilities.t CHANGE DETAILS diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t --- a/tests/test-wireproto-command-capabilities.t +++ b/tests/test-wireproto-command-capabilities.t @@ -349,10 +349,7 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - response: [ -{ - b'status': b'ok' -}, + response: gen[ { b'commands': { b'branchmap': { diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t --- a/tests/test-http-api-httpv2.t +++ b/tests/test-http-api-httpv2.t @@ -225,10 +225,7 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - response: [ -{ - b'status': b'ok' -}, + response: gen[ b'customreadonly bytes response' ] diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py --- a/mercurial/wireprotov2peer.py +++ b/mercurial/wireprotov2peer.py @@ -7,11 +7,12 @@ from __future__ import absolute_import +import threading + from .i18n import _ from . import ( encoding, error, -util, wireprotoframing, ) from .utils import ( @@ -34,20 +35,101 @@ return b''.join(chunks) class commandresponse(object): -"""Represents the response to a command request.""" +"""Represents the response to a command request. + +Instances track the state of the command and hold its results. + +An external entity is required to update the state of the object when +events occur. 
+""" def __init__(self, requestid, command): self.requestid = requestid self.command = command -self.b = util.bytesio() +# Whether all remote input related to this command has been +# received. +self._inputcomplete = False + +# We have a lock that is acquired when important object state is +# mutated. This is to prevent race conditions between 1 thread +# sending us new data and another consuming it. +self._lock = threading.RLock() + +# An event is set when state of the object changes. This event +# is waited on by the generator emitting objects. +self._serviceable = threading.Event() + +self._pendingevents = [] +self._decoder = cborutil.bufferingdecoder() +self._seeninitial = False + +def _oninputcomplete(self): +with self._lock: +self._inputcomplete = True +self._serviceable.set() + +def _onresponsedata(self, data): +available, readcount, wanted = self._decoder.decode(data) + +if not available: +return + +with self._lock: +for o in self._decoder.getavailable(): +if not self._seeninitial: +self._handleinitial(o) +continue + +self._pendingevents.append(o) + +self._serviceable.set() -def cborobjects(self): -"""Obtain decoded CBOR objects from this response.""" -self.b.seek(0) +def _handleinitial(self, o): +self._seeninitial = True +if o[b'status'] == 'ok': +return + +atoms = [{'msg': o[b'error'][b'message']}] +if b'args' in o[b'error']: +atoms[0]['args'] = o[b'error'][b'args'] + +raise error.RepoError(formatrichmessage(atoms)) + +def objects(self): +"""Obtained decoded objects from this response. + +This is a generator of data structures that were decoded from the +command response. + +Obtaining the next member of the generator may block due to waiting +on external data to become available. -for v in cborutil.decodeall(self.b.getvalue()): -yield v +If the server encountered an error in the middle of serving the data +or if another error occurred, an exception may be raised when +advancing the generator. 
+""" +while True: +# TODO this can infinite loop if self._inputcomplete is never +# set. We likely want to tie the lifetime of this object/state +# to that of the background thread receiving frames and updating +# our state. +self._serviceable.wait(1.0) + +with self._lock: +self._serviceable.clear() + +# Make copies because
D4474: wireprotov2peer: stream decoded responses
indygreg updated this revision to Diff 10792. REPOSITORY rHG Mercurial CHANGES SINCE LAST UPDATE https://phab.mercurial-scm.org/D4474?vs=10771=10792 REVISION DETAIL https://phab.mercurial-scm.org/D4474 AFFECTED FILES mercurial/debugcommands.py mercurial/wireprotov2peer.py tests/test-http-api-httpv2.t tests/test-wireproto-command-capabilities.t CHANGE DETAILS diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t --- a/tests/test-wireproto-command-capabilities.t +++ b/tests/test-wireproto-command-capabilities.t @@ -349,10 +349,7 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - response: [ -{ - b'status': b'ok' -}, + response: gen[ { b'commands': { b'branchmap': { diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t --- a/tests/test-http-api-httpv2.t +++ b/tests/test-http-api-httpv2.t @@ -225,10 +225,7 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - response: [ -{ - b'status': b'ok' -}, + response: gen[ b'customreadonly bytes response' ] diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py --- a/mercurial/wireprotov2peer.py +++ b/mercurial/wireprotov2peer.py @@ -7,11 +7,12 @@ from __future__ import absolute_import +import threading + from .i18n import _ from . import ( encoding, error, -util, wireprotoframing, ) from .utils import ( @@ -34,20 +35,101 @@ return b''.join(chunks) class commandresponse(object): -"""Represents the response to a command request.""" +"""Represents the response to a command request. + +Instances track the state of the command and hold its results. + +An external entity is required to update the state of the object when +events occur. +""" def __init__(self, requestid, command): self.requestid = requestid self.command = command -self.b = util.bytesio() +# Whether all remote input related to this command has been +# received. 
+self._inputcomplete = False + +# We have a lock that is acquired when important object state is +# mutated. This is to prevent race conditions between 1 thread +# sending us new data and another consuming it. +self._lock = threading.RLock() + +# An event is set when state of the object changes. This event +# is waited on by the generator emitting objects. +self._serviceable = threading.Event() + +self._pendingevents = [] +self._decoder = cborutil.bufferingdecoder() +self._seeninitial = False + +def _oninputcomplete(self): +with self._lock: +self._inputcomplete = True +self._serviceable.set() + +def _onresponsedata(self, data): +available, readcount, wanted = self._decoder.decode(data) + +if not available: +return + +with self._lock: +for o in self._decoder.getavailable(): +if not self._seeninitial: +self._handleinitial(o) +continue + +self._pendingevents.append(o) + +self._serviceable.set() -def cborobjects(self): -"""Obtain decoded CBOR objects from this response.""" -self.b.seek(0) +def _handleinitial(self, o): +self._seeninitial = True +if o[b'status'] == 'ok': +return + +atoms = [{'msg': o[b'error'][b'message']}] +if b'args' in o[b'error']: +atoms[0]['args'] = o[b'error'][b'args'] + +raise error.RepoError(formatrichmessage(atoms)) + +def objects(self): +"""Obtained decoded objects from this response. + +This is a generator of data structures that were decoded from the +command response. + +Obtaining the next member of the generator may block due to waiting +on external data to become available. -for v in cborutil.decodeall(self.b.getvalue()): -yield v +If the server encountered an error in the middle of serving the data +or if another error occurred, an exception may be raised when +advancing the generator. +""" +while True: +# TODO this can infinite loop if self._inputcomplete is never +# set. We likely want to tie the lifetime of this object/state +# to that of the background thread receiving frames and updating +# our state. 
+self._serviceable.wait(1.0) + +with self._lock: +self._serviceable.clear() + +# Make copies because objects could be mutated during +# iteration. +stop = self._inputcomplete +pending =
D4474: wireprotov2peer: stream decoded responses
indygreg created this revision. Herald added subscribers: mercurial-devel, mjpieters. Herald added a reviewer: hg-reviewers. REVISION SUMMARY Previously, wire protocol version 2 would buffer all response data. Only once all data was received did we CBOR decode it and resolve the future associated with the command. This was obviously not desirable: with the large response payloads introduced in future commits, it would have caused significant memory bloat and slowed down client operations due to waiting on the server. This commit refactors the response handling code so that response data can be streamed. Command response objects now contain a buffered CBOR decoder. As new data arrives, it is fed into the decoder. Decoded objects are made available to the generator as they are decoded. Because there is a separate thread processing incoming frames and feeding data into the response object, there is the potential for race conditions when mutating response objects, so a lock has been added to guard access to critical state variables. Because the generator emitting decoded objects needs to wait on those objects to become available, we've added an Event for the generator to wait on so it doesn't busy-loop. This does mean there is the potential for deadlocks, and I'm pretty sure they can occur in some scenarios. We already have a handful of TODOs around this, and I've added some more. Fixing this will likely require moving the background thread receiving frames into clienthandler. We likely would have done this anyway when implementing the client bits for the SSH transport. Test output changes because the initial CBOR map holding the overall response state is now always handled internally by the response object.
REPOSITORY rHG Mercurial REVISION DETAIL https://phab.mercurial-scm.org/D4474 AFFECTED FILES mercurial/debugcommands.py mercurial/wireprotov2peer.py tests/test-http-api-httpv2.t tests/test-wireproto-command-capabilities.t CHANGE DETAILS diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t --- a/tests/test-wireproto-command-capabilities.t +++ b/tests/test-wireproto-command-capabilities.t @@ -349,10 +349,7 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - response: [ -{ - b'status': b'ok' -}, + response: gen[ { b'commands': { b'branchmap': { diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t --- a/tests/test-http-api-httpv2.t +++ b/tests/test-http-api-httpv2.t @@ -225,10 +225,7 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - response: [ -{ - b'status': b'ok' -}, + response: gen[ b'customreadonly bytes response' ] diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py --- a/mercurial/wireprotov2peer.py +++ b/mercurial/wireprotov2peer.py @@ -7,11 +7,12 @@ from __future__ import absolute_import +import threading + from .i18n import _ from . import ( encoding, error, -util, wireprotoframing, ) from .utils import ( @@ -34,20 +35,101 @@ return b''.join(chunks) class commandresponse(object): -"""Represents the response to a command request.""" +"""Represents the response to a command request. + +Instances track the state of the command and hold its results. + +An external entity is required to update the state of the object when +events occur. +""" def __init__(self, requestid, command): self.requestid = requestid self.command = command -self.b = util.bytesio() +# Whether all remote input related to this command has been +# received. +self._inputcomplete = False + +# We have a lock that is acquired when important object state is +# mutated. 
This is to prevent race conditions between 1 thread +# sending us new data and another consuming it. +self._lock = threading.RLock() + +# An event is set when state of the object changes. This event +# is waited on by the generator emitting objects. +self._serviceable = threading.Event() + +self._pendingevents = [] +self._decoder = cborutil.bufferingdecoder() +self._seeninitial = False + +def _oninputcomplete(self): +with self._lock: +self._inputcomplete = True +self._serviceable.set() + +def _onresponsedata(self, data): +available, readcount, wanted = self._decoder.decode(data) + +if not available: +return + +with self._lock: +for o in self._decoder.getavailable(): +if not self._seeninitial: +self._handleinitial(o) +continue + +