D4473: wireprotoframing: buffer emitted data to reduce frame count

2018-09-12 Thread indygreg (Gregory Szorc)
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG84bf6ded9317: wireprotoframing: buffer emitted data to 
reduce frame count (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D4473?vs=10770&id=10934

REVISION DETAIL
  https://phab.mercurial-scm.org/D4473

AFFECTED FILES
  mercurial/wireprotoframing.py

CHANGE DETAILS

diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -511,6 +511,98 @@
flags=0,
payload=payload)
 
+class bufferingcommandresponseemitter(object):
+"""Helper object to emit command response frames intelligently.
+
+Raw command response data is likely emitted in chunks much smaller
+than what can fit in a single frame. This class exists to buffer
+chunks until enough data is available to fit in a single frame.
+
+TODO we'll need something like this when compression is supported.
+So it might make sense to implement this functionality at the stream
+level.
+"""
+def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
+self._stream = stream
+self._requestid = requestid
+self._maxsize = maxframesize
+self._chunks = []
+self._chunkssize = 0
+
+def send(self, data):
+"""Send new data for emission.
+
+Is a generator of new frames that were derived from the new input.
+
+If the special input ``None`` is received, flushes all buffered
+data to frames.
+"""
+
+if data is None:
+for frame in self._flush():
+yield frame
+return
+
+# There is a ton of potential to do more complicated things here.
+# Our immediate goal is to coalesce small chunks into big frames,
+# not achieve the fewest number of frames possible. So we go with
+# a simple implementation:
+#
+# * If a chunk is too large for a frame, we flush and emit frames
+#   for the new chunk.
+# * If a chunk can be buffered without total buffered size limits
+#   being exceeded, we do that.
+# * If a chunk causes us to go over our buffering limit, we flush
+#   and then buffer the new chunk.
+
+if len(data) > self._maxsize:
+for frame in self._flush():
+yield frame
+
+# Now emit frames for the big chunk.
+offset = 0
+while True:
+chunk = data[offset:offset + self._maxsize]
+offset += len(chunk)
+
+yield self._stream.makeframe(
+self._requestid,
+typeid=FRAME_TYPE_COMMAND_RESPONSE,
+flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+payload=chunk)
+
+if offset == len(data):
+return
+
+# If we don't have enough to constitute a full frame, buffer and
+# return.
+if len(data) + self._chunkssize < self._maxsize:
+self._chunks.append(data)
+self._chunkssize += len(data)
+return
+
+# Else flush what we have and buffer the new chunk. We could do
+# something more intelligent here, like break the chunk. Let's
+# keep things simple for now.
+for frame in self._flush():
+yield frame
+
+self._chunks.append(data)
+self._chunkssize = len(data)
+
+def _flush(self):
+payload = b''.join(self._chunks)
+assert len(payload) <= self._maxsize
+
+self._chunks[:] = []
+self._chunkssize = 0
+
+yield self._stream.makeframe(
+self._requestid,
+typeid=FRAME_TYPE_COMMAND_RESPONSE,
+flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+payload=payload)
+
 class stream(object):
 """Represents a logical unidirectional series of frames."""
 
@@ -716,10 +808,14 @@
 
 def sendframes():
 emitted = False
+emitter = bufferingcommandresponseemitter(stream, requestid)
 while True:
 try:
 o = next(objs)
 except StopIteration:
+for frame in emitter.send(None):
+yield frame
+
 if emitted:
 yield createcommandresponseeosframe(stream, requestid)
 break
@@ -743,11 +839,10 @@
 yield createcommandresponseokframe(stream, requestid)
 emitted = True
 
-# TODO buffer chunks so emitted frame payloads can be
-# larger.
-for frame in createbytesresponseframesfromgen(
-stream, requestid, cborutil.streamencode(o)):

D4473: wireprotoframing: buffer emitted data to reduce frame count

2018-09-04 Thread indygreg (Gregory Szorc)
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  An upcoming commit introduces a wire protocol command that can emit
  hundreds of thousands of small objects. Without a buffering layer,
  we would emit a single, small frame for every object. Performance
  profiling revealed this to be a source of significant overhead for
  both client and server.
  
  This commit introduces a very crude buffering layer so that we emit
  fewer, bigger frames in such a scenario. This code will likely get
  rewritten in the future to be part of the streams API, as we'll
  need a similar strategy for compressing data. I don't want to think
  about it too much at the moment though.
  
  server
  before: user 32.500+0.000 sys 1.160+0.000
  after:  user 20.230+0.010 sys 0.180+0.000
  
  client
  before: user 133.400+0.000 sys 93.120+0.000
  after:  user  68.370+0.000 sys 32.950+0.000
  
  This appears to indicate we have significant overhead in the frame
  processing code on both client and server. It might be worth profiling
  that at some point...

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D4473

AFFECTED FILES
  mercurial/wireprotoframing.py

CHANGE DETAILS

diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -511,6 +511,98 @@
flags=0,
payload=payload)
 
+class bufferingcommandresponseemitter(object):
+"""Helper object to emit command response frames intelligently.
+
+Raw command response data is likely emitted in chunks much smaller
+than what can fit in a single frame. This class exists to buffer
+chunks until enough data is available to fit in a single frame.
+
+TODO we'll need something like this when compression is supported.
+So it might make sense to implement this functionality at the stream
+level.
+"""
+def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
+self._stream = stream
+self._requestid = requestid
+self._maxsize = maxframesize
+self._chunks = []
+self._chunkssize = 0
+
+def send(self, data):
+"""Send new data for emission.
+
+Is a generator of new frames that were derived from the new input.
+
+If the special input ``None`` is received, flushes all buffered
+data to frames.
+"""
+
+if data is None:
+for frame in self._flush():
+yield frame
+return
+
+# There is a ton of potential to do more complicated things here.
+# Our immediate goal is to coalesce small chunks into big frames,
+# not achieve the fewest number of frames possible. So we go with
+# a simple implementation:
+#
+# * If a chunk is too large for a frame, we flush and emit frames
+#   for the new chunk.
+# * If a chunk can be buffered without total buffered size limits
+#   being exceeded, we do that.
+# * If a chunk causes us to go over our buffering limit, we flush
+#   and then buffer the new chunk.
+
+if len(data) > self._maxsize:
+for frame in self._flush():
+yield frame
+
+# Now emit frames for the big chunk.
+offset = 0
+while True:
+chunk = data[offset:offset + self._maxsize]
+offset += len(chunk)
+
+yield self._stream.makeframe(
+self._requestid,
+typeid=FRAME_TYPE_COMMAND_RESPONSE,
+flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+payload=chunk)
+
+if offset == len(data):
+return
+
+# If we don't have enough to constitute a full frame, buffer and
+# return.
+if len(data) + self._chunkssize < self._maxsize:
+self._chunks.append(data)
+self._chunkssize += len(data)
+return
+
+# Else flush what we have and buffer the new chunk. We could do
+# something more intelligent here, like break the chunk. Let's
+# keep things simple for now.
+for frame in self._flush():
+yield frame
+
+self._chunks.append(data)
+self._chunkssize = len(data)
+
+def _flush(self):
+payload = b''.join(self._chunks)
+assert len(payload) <= self._maxsize
+
+self._chunks[:] = []
+self._chunkssize = 0
+
+yield self._stream.makeframe(
+self._requestid,
+typeid=FRAME_TYPE_COMMAND_RESPONSE,
+flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+payload=payload)
+
 class stream(object):
 """Represents a logical unidirectional series of frames."""
 
@@ -716,10 +808,14 @@
 
 def sendframes():