D3388: wireprotov2: add support for more response types

2018-04-16 Thread indygreg (Gregory Szorc)
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG564a3eec6e63: wireprotov2: add support for more response 
types (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3388?vs=8304&id=8331

REVISION DETAIL
  https://phab.mercurial-scm.org/D3388

AFFECTED FILES
  mercurial/wireprotoframing.py
  mercurial/wireprototypes.py
  mercurial/wireprotov2server.py

CHANGE DETAILS

diff --git a/mercurial/wireprotov2server.py b/mercurial/wireprotov2server.py
--- a/mercurial/wireprotov2server.py
+++ b/mercurial/wireprotov2server.py
@@ -306,6 +306,15 @@
 action, meta = reactor.oncommandresponseready(outstream,
   command['requestid'],
   encoded)
+elif isinstance(rsp, wireprototypes.v2streamingresponse):
+action, meta = reactor.oncommandresponsereadygen(outstream,
+ command['requestid'],
+ rsp.gen)
+elif isinstance(rsp, wireprototypes.v2errorresponse):
+action, meta = reactor.oncommanderror(outstream,
+  command['requestid'],
+  rsp.message,
+  rsp.args)
 else:
 action, meta = reactor.onservererror(
 _('unhandled response type from wire proto command'))
diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -106,6 +106,22 @@
 def __init__(self, v):
 self.value = v
 
+class v2errorresponse(object):
+"""Represents a command error for version 2 transports."""
+def __init__(self, message, args=None):
+self.message = message
+self.args = args
+
+class v2streamingresponse(object):
+"""A response whose data is supplied by a generator.
+
+The generator can either consist of data structures to CBOR
+encode or a stream of already-encoded bytes.
+"""
+def __init__(self, gen, compressible=True):
+self.gen = gen
+self.compressible = compressible
+
 # list of nodes encoding / decoding
 def decodelist(l, sep=' '):
 if l:
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -386,6 +386,56 @@
 if done:
 break
 
+def createbytesresponseframesfromgen(stream, requestid, gen,
+ maxframesize=DEFAULT_MAX_FRAME_SIZE):
+overall = cbor.dumps({b'status': b'ok'}, canonical=True)
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+   payload=overall)
+
+cb = util.chunkbuffer(gen)
+
+flags = 0
+
+while True:
+chunk = cb.read(maxframesize)
+if not chunk:
+break
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=flags,
+   payload=chunk)
+
+flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
+
+flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
+flags |= FLAG_COMMAND_RESPONSE_EOS
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=flags,
+   payload=b'')
+
+def createcommanderrorresponse(stream, requestid, message, args=None):
+m = {
+b'status': b'error',
+b'error': {
+b'message': message,
+}
+}
+
+if args:
+m[b'error'][b'args'] = args
+
+overall = cbor.dumps(m, canonical=True)
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=FLAG_COMMAND_RESPONSE_EOS,
+   payload=overall)
+
 def createerrorframe(stream, requestid, msg, errtype):
 # TODO properly handle frame size limits.
 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +684,19 @@
 'framegen': result,
 }
 
+def oncommandresponsereadygen(self, stream, requestid, gen):
+"""Signal that a bytes response is ready, with data as a generator."""
+ensureserverstream(stream)
+
+def sendframes():
+for frame in createbytesresponseframesfromgen(stream, requestid,
+  gen):
+yield frame
+
+self._activecommands.remove(requestid)
+
+return self._handlesendframes(sendframes())
+
 def 

D3388: wireprotov2: add support for more response types

2018-04-15 Thread indygreg (Gregory Szorc)
indygreg updated this revision to Diff 8304.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3388?vs=8300&id=8304

REVISION DETAIL
  https://phab.mercurial-scm.org/D3388

AFFECTED FILES
  mercurial/wireprotoframing.py
  mercurial/wireprototypes.py
  mercurial/wireprotov2server.py

CHANGE DETAILS

diff --git a/mercurial/wireprotov2server.py b/mercurial/wireprotov2server.py
--- a/mercurial/wireprotov2server.py
+++ b/mercurial/wireprotov2server.py
@@ -306,6 +306,15 @@
 action, meta = reactor.oncommandresponseready(outstream,
   command['requestid'],
   encoded)
+elif isinstance(rsp, wireprototypes.v2streamingresponse):
+action, meta = reactor.oncommandresponsereadygen(outstream,
+ command['requestid'],
+ rsp.gen)
+elif isinstance(rsp, wireprototypes.v2errorresponse):
+action, meta = reactor.oncommanderror(outstream,
+  command['requestid'],
+  rsp.message,
+  rsp.args)
 else:
 action, meta = reactor.onservererror(
 _('unhandled response type from wire proto command'))
diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -106,6 +106,22 @@
 def __init__(self, v):
 self.value = v
 
+class v2errorresponse(object):
+"""Represents a command error for version 2 transports."""
+def __init__(self, message, args=None):
+self.message = message
+self.args = args
+
+class v2streamingresponse(object):
+"""A response whose data is supplied by a generator.
+
+The generator can either consist of data structures to CBOR
+encode or a stream of already-encoded bytes.
+"""
+def __init__(self, gen, compressible=True):
+self.gen = gen
+self.compressible = compressible
+
 # list of nodes encoding / decoding
 def decodelist(l, sep=' '):
 if l:
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -386,6 +386,56 @@
 if done:
 break
 
+def createbytesresponseframesfromgen(stream, requestid, gen,
+ maxframesize=DEFAULT_MAX_FRAME_SIZE):
+overall = cbor.dumps({b'status': b'ok'}, canonical=True)
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+   payload=overall)
+
+cb = util.chunkbuffer(gen)
+
+flags = 0
+
+while True:
+chunk = cb.read(maxframesize)
+if not chunk:
+break
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=flags,
+   payload=chunk)
+
+flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
+
+flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
+flags |= FLAG_COMMAND_RESPONSE_EOS
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=flags,
+   payload=b'')
+
+def createcommanderrorresponse(stream, requestid, message, args=None):
+m = {
+b'status': b'error',
+b'error': {
+b'message': message,
+}
+}
+
+if args:
+m[b'error'][b'args'] = args
+
+overall = cbor.dumps(m, canonical=True)
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=FLAG_COMMAND_RESPONSE_EOS,
+   payload=overall)
+
 def createerrorframe(stream, requestid, msg, errtype):
 # TODO properly handle frame size limits.
 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +684,19 @@
 'framegen': result,
 }
 
+def oncommandresponsereadygen(self, stream, requestid, gen):
+"""Signal that a bytes response is ready, with data as a generator."""
+ensureserverstream(stream)
+
+def sendframes():
+for frame in createbytesresponseframesfromgen(stream, requestid,
+  gen):
+yield frame
+
+self._activecommands.remove(requestid)
+
+return self._handlesendframes(sendframes())
+
 def oninputeof(self):
 """Signals that end of input has been received.
 
@@ -655,13 +718,39 @@
 'framegen': makegen(),
 }
 
+def 

D3388: wireprotov2: add support for more response types

2018-04-14 Thread indygreg (Gregory Szorc)
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This adds types to represent error and generator responses from
  server commands.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3388

AFFECTED FILES
  mercurial/wireprotoframing.py
  mercurial/wireprototypes.py
  mercurial/wireprotov2server.py

CHANGE DETAILS

diff --git a/mercurial/wireprotov2server.py b/mercurial/wireprotov2server.py
--- a/mercurial/wireprotov2server.py
+++ b/mercurial/wireprotov2server.py
@@ -306,6 +306,15 @@
 action, meta = reactor.oncommandresponseready(outstream,
   command['requestid'],
   encoded)
+elif isinstance(rsp, wireprototypes.v2streamingresponse):
+action, meta = reactor.oncommandresponsereadygen(outstream,
+ command['requestid'],
+ rsp.gen)
+elif isinstance(rsp, wireprototypes.v2errorresponse):
+action, meta = reactor.oncommanderror(outstream,
+  command['requestid'],
+  rsp.message,
+  rsp.args)
 else:
 action, meta = reactor.onservererror(
 _('unhandled response type from wire proto command'))
diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -106,6 +106,23 @@
 def __init__(self, v):
 self.value = v
 
+class v2errorresponse(object):
+"""Represents a command error for version 2 transports."""
+def __init__(self, message, args=None):
+self.message = message
+self.args = args
+
+class v2streamingresponse(object):
+"""A response whose data is supplied by a generator.
+
+The generator can either consist of data structures to CBOR
+encode or a stream of already-encoded bytes.
+"""
+def __init__(self, gen, compressible=True, iscbor=False):
+self.gen = gen
+self.compressible = compressible
+self.iscbor = iscbor
+
 # list of nodes encoding / decoding
 def decodelist(l, sep=' '):
 if l:
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -386,6 +386,56 @@
 if done:
 break
 
+def createbytesresponseframesfromgen(stream, requestid, gen,
+ maxframesize=DEFAULT_MAX_FRAME_SIZE):
+overall = cbor.dumps({b'status': b'ok'}, canonical=True)
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+   payload=overall)
+
+cb = util.chunkbuffer(gen)
+
+flags = 0
+
+while True:
+chunk = cb.read(maxframesize)
+if not chunk:
+break
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=flags,
+   payload=chunk)
+
+flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
+
+flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
+flags |= FLAG_COMMAND_RESPONSE_EOS
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=flags,
+   payload=b'')
+
+def createcommanderrorresponse(stream, requestid, message, args=None):
+m = {
+b'status': b'error',
+b'error': {
+b'message': message,
+}
+}
+
+if args:
+m[b'error'][b'args'] = args
+
+overall = cbor.dumps(m, canonical=True)
+
+yield stream.makeframe(requestid=requestid,
+   typeid=FRAME_TYPE_COMMAND_RESPONSE,
+   flags=FLAG_COMMAND_RESPONSE_EOS,
+   payload=overall)
+
 def createerrorframe(stream, requestid, msg, errtype):
 # TODO properly handle frame size limits.
 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +684,19 @@
 'framegen': result,
 }
 
+def oncommandresponsereadygen(self, stream, requestid, gen):
+"""Signal that a bytes response is ready, with data as a generator."""
+ensureserverstream(stream)
+
+def sendframes():
+for frame in createbytesresponseframesfromgen(stream, requestid,
+  gen):
+yield frame
+
+self._activecommands.remove(requestid)
+
+return self._handlesendframes(sendframes())
+
 def oninputeof(self):