D3388: wireprotov2: add support for more response types
This revision was automatically updated to reflect the committed changes. Closed by commit rHG564a3eec6e63: wireprotov2: add support for more response types (authored by indygreg, committed by ). REPOSITORY rHG Mercurial CHANGES SINCE LAST UPDATE https://phab.mercurial-scm.org/D3388?vs=8304&id=8331 REVISION DETAIL https://phab.mercurial-scm.org/D3388 AFFECTED FILES mercurial/wireprotoframing.py mercurial/wireprototypes.py mercurial/wireprotov2server.py CHANGE DETAILS diff --git a/mercurial/wireprotov2server.py b/mercurial/wireprotov2server.py --- a/mercurial/wireprotov2server.py +++ b/mercurial/wireprotov2server.py @@ -306,6 +306,15 @@ action, meta = reactor.oncommandresponseready(outstream, command['requestid'], encoded) +elif isinstance(rsp, wireprototypes.v2streamingresponse): +action, meta = reactor.oncommandresponsereadygen(outstream, + command['requestid'], + rsp.gen) +elif isinstance(rsp, wireprototypes.v2errorresponse): +action, meta = reactor.oncommanderror(outstream, + command['requestid'], + rsp.message, + rsp.args) else: action, meta = reactor.onservererror( _('unhandled response type from wire proto command')) diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py --- a/mercurial/wireprototypes.py +++ b/mercurial/wireprototypes.py @@ -106,6 +106,22 @@ def __init__(self, v): self.value = v +class v2errorresponse(object): +"""Represents a command error for version 2 transports.""" +def __init__(self, message, args=None): +self.message = message +self.args = args + +class v2streamingresponse(object): +"""A response whose data is supplied by a generator. + +The generator can either consist of data structures to CBOR +encode or a stream of already-encoded bytes. 
+""" +def __init__(self, gen, compressible=True): +self.gen = gen +self.compressible = compressible + # list of nodes encoding / decoding def decodelist(l, sep=' '): if l: diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -386,6 +386,56 @@ if done: break +def createbytesresponseframesfromgen(stream, requestid, gen, + maxframesize=DEFAULT_MAX_FRAME_SIZE): +overall = cbor.dumps({b'status': b'ok'}, canonical=True) + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_CONTINUATION, + payload=overall) + +cb = util.chunkbuffer(gen) + +flags = 0 + +while True: +chunk = cb.read(maxframesize) +if not chunk: +break + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=flags, + payload=chunk) + +flags |= FLAG_COMMAND_RESPONSE_CONTINUATION + +flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION +flags |= FLAG_COMMAND_RESPONSE_EOS +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=flags, + payload=b'') + +def createcommanderrorresponse(stream, requestid, message, args=None): +m = { +b'status': b'error', +b'error': { +b'message': message, +} +} + +if args: +m[b'error'][b'args'] = args + +overall = cbor.dumps(m, canonical=True) + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_EOS, + payload=overall) + def createerrorframe(stream, requestid, msg, errtype): # TODO properly handle frame size limits. 
assert len(msg) <= DEFAULT_MAX_FRAME_SIZE @@ -634,6 +684,19 @@ 'framegen': result, } +def oncommandresponsereadygen(self, stream, requestid, gen): +"""Signal that a bytes response is ready, with data as a generator.""" +ensureserverstream(stream) + +def sendframes(): +for frame in createbytesresponseframesfromgen(stream, requestid, + gen): +yield frame + +self._activecommands.remove(requestid) + +return self._handlesendframes(sendframes()) + def
D3388: wireprotov2: add support for more response types
indygreg updated this revision to Diff 8304. REPOSITORY rHG Mercurial CHANGES SINCE LAST UPDATE https://phab.mercurial-scm.org/D3388?vs=8300&id=8304 REVISION DETAIL https://phab.mercurial-scm.org/D3388 AFFECTED FILES mercurial/wireprotoframing.py mercurial/wireprototypes.py mercurial/wireprotov2server.py CHANGE DETAILS diff --git a/mercurial/wireprotov2server.py b/mercurial/wireprotov2server.py --- a/mercurial/wireprotov2server.py +++ b/mercurial/wireprotov2server.py @@ -306,6 +306,15 @@ action, meta = reactor.oncommandresponseready(outstream, command['requestid'], encoded) +elif isinstance(rsp, wireprototypes.v2streamingresponse): +action, meta = reactor.oncommandresponsereadygen(outstream, + command['requestid'], + rsp.gen) +elif isinstance(rsp, wireprototypes.v2errorresponse): +action, meta = reactor.oncommanderror(outstream, + command['requestid'], + rsp.message, + rsp.args) else: action, meta = reactor.onservererror( _('unhandled response type from wire proto command')) diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py --- a/mercurial/wireprototypes.py +++ b/mercurial/wireprototypes.py @@ -106,6 +106,22 @@ def __init__(self, v): self.value = v +class v2errorresponse(object): +"""Represents a command error for version 2 transports.""" +def __init__(self, message, args=None): +self.message = message +self.args = args + +class v2streamingresponse(object): +"""A response whose data is supplied by a generator. + +The generator can either consist of data structures to CBOR +encode or a stream of already-encoded bytes. 
+""" +def __init__(self, gen, compressible=True): +self.gen = gen +self.compressible = compressible + # list of nodes encoding / decoding def decodelist(l, sep=' '): if l: diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -386,6 +386,56 @@ if done: break +def createbytesresponseframesfromgen(stream, requestid, gen, + maxframesize=DEFAULT_MAX_FRAME_SIZE): +overall = cbor.dumps({b'status': b'ok'}, canonical=True) + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_CONTINUATION, + payload=overall) + +cb = util.chunkbuffer(gen) + +flags = 0 + +while True: +chunk = cb.read(maxframesize) +if not chunk: +break + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=flags, + payload=chunk) + +flags |= FLAG_COMMAND_RESPONSE_CONTINUATION + +flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION +flags |= FLAG_COMMAND_RESPONSE_EOS +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=flags, + payload=b'') + +def createcommanderrorresponse(stream, requestid, message, args=None): +m = { +b'status': b'error', +b'error': { +b'message': message, +} +} + +if args: +m[b'error'][b'args'] = args + +overall = cbor.dumps(m, canonical=True) + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_EOS, + payload=overall) + def createerrorframe(stream, requestid, msg, errtype): # TODO properly handle frame size limits. 
assert len(msg) <= DEFAULT_MAX_FRAME_SIZE @@ -634,6 +684,19 @@ 'framegen': result, } +def oncommandresponsereadygen(self, stream, requestid, gen): +"""Signal that a bytes response is ready, with data as a generator.""" +ensureserverstream(stream) + +def sendframes(): +for frame in createbytesresponseframesfromgen(stream, requestid, + gen): +yield frame + +self._activecommands.remove(requestid) + +return self._handlesendframes(sendframes()) + def oninputeof(self): """Signals that end of input has been received. @@ -655,13 +718,39 @@ 'framegen': makegen(), } +def
D3388: wireprotov2: add support for more response types
indygreg created this revision. Herald added a subscriber: mercurial-devel. Herald added a reviewer: hg-reviewers. REVISION SUMMARY This adds types to represent error and generator responses from server commands. REPOSITORY rHG Mercurial REVISION DETAIL https://phab.mercurial-scm.org/D3388 AFFECTED FILES mercurial/wireprotoframing.py mercurial/wireprototypes.py mercurial/wireprotov2server.py CHANGE DETAILS diff --git a/mercurial/wireprotov2server.py b/mercurial/wireprotov2server.py --- a/mercurial/wireprotov2server.py +++ b/mercurial/wireprotov2server.py @@ -306,6 +306,15 @@ action, meta = reactor.oncommandresponseready(outstream, command['requestid'], encoded) +elif isinstance(rsp, wireprototypes.v2streamingresponse): +action, meta = reactor.oncommandresponsereadygen(outstream, + command['requestid'], + rsp.gen) +elif isinstance(rsp, wireprototypes.v2errorresponse): +action, meta = reactor.oncommanderror(outstream, + command['requestid'], + rsp.message, + rsp.args) else: action, meta = reactor.onservererror( _('unhandled response type from wire proto command')) diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py --- a/mercurial/wireprototypes.py +++ b/mercurial/wireprototypes.py @@ -106,6 +106,23 @@ def __init__(self, v): self.value = v +class v2errorresponse(object): +"""Represents a command error for version 2 transports.""" +def __init__(self, message, args=None): +self.message = message +self.args = args + +class v2streamingresponse(object): +"""A response whose data is supplied by a generator. + +The generator can either consist of data structures to CBOR +encode or a stream of already-encoded bytes. 
+""" +def __init__(self, gen, compressible=True, iscbor=False): +self.gen = gen +self.compressible = compressible +self.iscbor = iscbor + # list of nodes encoding / decoding def decodelist(l, sep=' '): if l: diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -386,6 +386,56 @@ if done: break +def createbytesresponseframesfromgen(stream, requestid, gen, + maxframesize=DEFAULT_MAX_FRAME_SIZE): +overall = cbor.dumps({b'status': b'ok'}, canonical=True) + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_CONTINUATION, + payload=overall) + +cb = util.chunkbuffer(gen) + +flags = 0 + +while True: +chunk = cb.read(maxframesize) +if not chunk: +break + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=flags, + payload=chunk) + +flags |= FLAG_COMMAND_RESPONSE_CONTINUATION + +flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION +flags |= FLAG_COMMAND_RESPONSE_EOS +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=flags, + payload=b'') + +def createcommanderrorresponse(stream, requestid, message, args=None): +m = { +b'status': b'error', +b'error': { +b'message': message, +} +} + +if args: +m[b'error'][b'args'] = args + +overall = cbor.dumps(m, canonical=True) + +yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_EOS, + payload=overall) + def createerrorframe(stream, requestid, msg, errtype): # TODO properly handle frame size limits. 
assert len(msg) <= DEFAULT_MAX_FRAME_SIZE @@ -634,6 +684,19 @@ 'framegen': result, } +def oncommandresponsereadygen(self, stream, requestid, gen): +"""Signal that a bytes response is ready, with data as a generator.""" +ensureserverstream(stream) + +def sendframes(): +for frame in createbytesresponseframesfromgen(stream, requestid, + gen): +yield frame + +self._activecommands.remove(requestid) + +return self._handlesendframes(sendframes()) + def oninputeof(self):