# HG changeset patch
# User Gregory Szorc <gregory.sz...@gmail.com>
# Date 1476641421 25200
#      Sun Oct 16 11:10:21 2016 -0700
# Node ID 23457f8910d43bd637b945ced2f963ab71255844
# Parent  3f18f7464e651128a5f8d9c9312805adbc22f547
wireproto: compress data from a generator

Currently, the "getbundle" wire protocol command obtains a generator of
data, converts it to a util.chunkbuffer, then converts it back to a
generator via the protocol's groupchunks() implementation. For the SSH
protocol, groupchunks() simply reads 4kb chunks then write()s the
data to a file descriptor. For the HTTP protocol, groupchunks() reads
32kb chunks, feeds those into a zlib compressor, emits compressed data
as it is available, and that is sent to the WSGI layer, where it is
likely turned into HTTP chunked transfer chunks as is or further
buffered and turned into a larger chunk.

For both the SSH and HTTP protocols, there is inefficiency from using

For SSH, emitting consistent 4kb chunks sounds nice. However, the file
descriptor it is writing to is almost certainly buffered. That means
that a Python .write() probably doesn't translate into exactly what is
written to the I/O layer.

For HTTP, we're going through an intermediate layer to zlib compress
data. So all util.chunkbuffer is doing is ensuring that the chunks we
feed into the zlib compressor are of uniform size. This means more CPU
time in Python buffering and emitting chunks in util.chunkbuffer but
fewer function calls to zlib.

This patch introduces and implements a new wire protocol abstract
method: compresschunks(). It is like groupchunks() except it operates
on a generator instead of something with a .read(). The SSH
implementation simply proxies chunks. The HTTP implementation uses
zlib compression.

To avoid duplicate code, the HTTP groupchunks() has been reimplemented
in terms of compresschunks().

To prove this all works, the "getbundle" wire protocol command has been
switched to compresschunks(). This removes the util.chunkbuffer from
that command. Now, data essentially streams straight from the
changegroup emitter to the wire, possibly through a zlib compressor.
Generators all the way, baby.

There were slim to no performance changes on the server as measured
with the mozilla-central repository. This is likely because CPU
time is dominated by reading revlogs, producing the changegroup, and
zlib compressing the output stream. Still, this brings us a little
closer to our ideal of using generators everywhere.

diff --git a/mercurial/hgweb/protocol.py b/mercurial/hgweb/protocol.py
--- a/mercurial/hgweb/protocol.py
+++ b/mercurial/hgweb/protocol.py
@@ -68,31 +68,40 @@ class webproto(wireproto.abstractserverp
     def redirect(self):
         self.oldio = self.ui.fout, self.ui.ferr
         self.ui.ferr = self.ui.fout = stringio()
     def restore(self):
         val = self.ui.fout.getvalue()
         self.ui.ferr, self.ui.fout = self.oldio
         return val
     def groupchunks(self, fh):
+        def getchunks():
+            while True:
+                chunk = fh.read(32768)
+                if not chunk:
+                    break
+                yield chunk
+        return self.compresschunks(getchunks())
+    def compresschunks(self, chunks):
         # Don't allow untrusted settings because disabling compression or
         # setting a very high compression level could lead to flooding
         # the server's network or CPU.
         z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
-        while True:
-            chunk = fh.read(32768)
-            if not chunk:
-                break
+        for chunk in chunks:
             data = z.compress(chunk)
             # Not all calls to compress() emit data. It is cheaper to inspect
             # that here than to send it via the generator.
             if data:
                 yield data
         yield z.flush()
     def _client(self):
         return 'remote:%s:%s:%s' % (
             self.req.env.get('wsgi.url_scheme') or 'http',
             urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
             urlreq.quote(self.req.env.get('REMOTE_USER', '')))
 def iscmd(cmd):
     return cmd in wireproto.commands
diff --git a/mercurial/sshserver.py b/mercurial/sshserver.py
--- a/mercurial/sshserver.py
+++ b/mercurial/sshserver.py
@@ -66,16 +66,20 @@ class sshserver(wireproto.abstractserver
             count = int(self.fin.readline())
     def redirect(self):
     def groupchunks(self, fh):
         return iter(lambda: fh.read(4096), '')
+    def compresschunks(self, chunks):
+        for chunk in chunks:
+            yield chunk
     def sendresponse(self, v):
         self.fout.write("%d\n" % len(v))
     def sendstream(self, source):
         write = self.fout.write
         for chunk in source.gen:
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -80,16 +80,24 @@ class abstractserverproto(object):
     def groupchunks(self, fh):
         """Generator of chunks to send to the client.
         Some protocols may have compressed the contents.
         raise NotImplementedError()
+    def compresschunks(self, chunks):
+        """Generator of possible compressed chunks to send to the client.
+        This is like ``groupchunks()`` except it accepts a generator as
+        its argument.
+        """
+        raise NotImplementedError()
 class remotebatch(peer.batcher):
     '''batches the queued calls; uses as few roundtrips as possible'''
     def __init__(self, remote):
         '''remote must support _submitbatch(encbatch) and
         _submitone(op, encargs)'''
         self.remote = remote
     def submit(self):
@@ -768,19 +776,17 @@ def getbundle(repo, proto, others):
             raise KeyError('unknown getbundle option type %s'
                            % keytype)
     if not bundle1allowed(repo, 'pull'):
         if not exchange.bundle2requested(opts.get('bundlecaps')):
             return ooberror(bundle2required)
     chunks = exchange.getbundlechunks(repo, 'serve', **opts)
-    # TODO avoid util.chunkbuffer() here since it is adding overhead to
-    # what is fundamentally a generator proxying operation.
-    return streamres(proto.groupchunks(util.chunkbuffer(chunks)))
+    return streamres(proto.compresschunks(chunks))
 def heads(repo, proto):
     h = repo.heads()
     return encodelist(h) + "\n"
 def hello(repo, proto):
Mercurial-devel mailing list

Reply via email to