D4473: wireprotoframing: buffer emitted data to reduce frame count
This revision was automatically updated to reflect the committed changes. Closed by commit rHG84bf6ded9317: wireprotoframing: buffer emitted data to reduce frame count (authored by indygreg, committed by ). REPOSITORY rHG Mercurial CHANGES SINCE LAST UPDATE https://phab.mercurial-scm.org/D4473?vs=10770&id=10934 REVISION DETAIL https://phab.mercurial-scm.org/D4473 AFFECTED FILES mercurial/wireprotoframing.py CHANGE DETAILS diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -511,6 +511,98 @@ flags=0, payload=payload) +class bufferingcommandresponseemitter(object): +"""Helper object to emit command response frames intelligently. + +Raw command response data is likely emitted in chunks much smaller +than what can fit in a single frame. This class exists to buffer +chunks until enough data is available to fit in a single frame. + +TODO we'll need something like this when compression is supported. +So it might make sense to implement this functionality at the stream +level. +""" +def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE): +self._stream = stream +self._requestid = requestid +self._maxsize = maxframesize +self._chunks = [] +self._chunkssize = 0 + +def send(self, data): +"""Send new data for emission. + +Is a generator of new frames that were derived from the new input. + +If the special input ``None`` is received, flushes all buffered +data to frames. +""" + +if data is None: +for frame in self._flush(): +yield frame +return + +# There is a ton of potential to do more complicated things here. +# Our immediate goal is to coalesce small chunks into big frames, +# not achieve the fewest number of frames possible. So we go with +# a simple implementation: +# +# * If a chunk is too large for a frame, we flush and emit frames +# for the new chunk. +# * If a chunk can be buffered without total buffered size limits +# being exceeded, we do that. 
+# * If a chunk causes us to go over our buffering limit, we flush +# and then buffer the new chunk. + +if len(data) > self._maxsize: +for frame in self._flush(): +yield frame + +# Now emit frames for the big chunk. +offset = 0 +while True: +chunk = data[offset:offset + self._maxsize] +offset += len(chunk) + +yield self._stream.makeframe( +self._requestid, +typeid=FRAME_TYPE_COMMAND_RESPONSE, +flags=FLAG_COMMAND_RESPONSE_CONTINUATION, +payload=chunk) + +if offset == len(data): +return + +# If we don't have enough to constitute a full frame, buffer and +# return. +if len(data) + self._chunkssize < self._maxsize: +self._chunks.append(data) +self._chunkssize += len(data) +return + +# Else flush what we have and buffer the new chunk. We could do +# something more intelligent here, like break the chunk. Let's +# keep things simple for now. +for frame in self._flush(): +yield frame + +self._chunks.append(data) +self._chunkssize = len(data) + +def _flush(self): +payload = b''.join(self._chunks) +assert len(payload) <= self._maxsize + +self._chunks[:] = [] +self._chunkssize = 0 + +yield self._stream.makeframe( +self._requestid, +typeid=FRAME_TYPE_COMMAND_RESPONSE, +flags=FLAG_COMMAND_RESPONSE_CONTINUATION, +payload=payload) + class stream(object): """Represents a logical unidirectional series of frames.""" @@ -716,10 +808,14 @@ def sendframes(): emitted = False +emitter = bufferingcommandresponseemitter(stream, requestid) while True: try: o = next(objs) except StopIteration: +for frame in emitter.send(None): +yield frame + if emitted: yield createcommandresponseeosframe(stream, requestid) break @@ -743,11 +839,10 @@ yield createcommandresponseokframe(stream, requestid) emitted = True -# TODO buffer chunks so emitted frame payloads can be -# larger. -for frame in createbytesresponseframesfromgen( -stream, requestid, cborutil.streamencode(o)):
D4473: wireprotoframing: buffer emitted data to reduce frame count
indygreg created this revision. Herald added a subscriber: mercurial-devel. Herald added a reviewer: hg-reviewers. REVISION SUMMARY An upcoming commit introduces a wire protocol command that can emit hundreds of thousands of small objects. Without a buffering layer, we would emit a single, small frame for every object. Performance profiling revealed this to be a source of significant overhead for both client and server. This commit introduces a very crude buffering layer so that we emit fewer, bigger frames in such a scenario. This code will likely get rewritten in the future to be part of the streams API, as we'll need a similar strategy for compressing data. I don't want to think about it too much at the moment though. server before: user 32.500+0.000 sys 1.160+0.000 after: user 20.230+0.010 sys 0.180+0.000 client before: user 133.400+0.000 sys 93.120+0.000 after: user 68.370+0.000 sys 32.950+0.000 This appears to indicate we have significant overhead in the frame processing code on both client and server. It might be worth profiling that at some point... REPOSITORY rHG Mercurial REVISION DETAIL https://phab.mercurial-scm.org/D4473 AFFECTED FILES mercurial/wireprotoframing.py CHANGE DETAILS diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -511,6 +511,98 @@ flags=0, payload=payload) +class bufferingcommandresponseemitter(object): +"""Helper object to emit command response frames intelligently. + +Raw command response data is likely emitted in chunks much smaller +than what can fit in a single frame. This class exists to buffer +chunks until enough data is available to fit in a single frame. + +TODO we'll need something like this when compression is supported. +So it might make sense to implement this functionality at the stream +level. 
+""" +def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE): +self._stream = stream +self._requestid = requestid +self._maxsize = maxframesize +self._chunks = [] +self._chunkssize = 0 + +def send(self, data): +"""Send new data for emission. + +Is a generator of new frames that were derived from the new input. + +If the special input ``None`` is received, flushes all buffered +data to frames. +""" + +if data is None: +for frame in self._flush(): +yield frame +return + +# There is a ton of potential to do more complicated things here. +# Our immediate goal is to coalesce small chunks into big frames, +# not achieve the fewest number of frames possible. So we go with +# a simple implementation: +# +# * If a chunk is too large for a frame, we flush and emit frames +# for the new chunk. +# * If a chunk can be buffered without total buffered size limits +# being exceeded, we do that. +# * If a chunk causes us to go over our buffering limit, we flush +# and then buffer the new chunk. + +if len(data) > self._maxsize: +for frame in self._flush(): +yield frame + +# Now emit frames for the big chunk. +offset = 0 +while True: +chunk = data[offset:offset + self._maxsize] +offset += len(chunk) + +yield self._stream.makeframe( +self._requestid, +typeid=FRAME_TYPE_COMMAND_RESPONSE, +flags=FLAG_COMMAND_RESPONSE_CONTINUATION, +payload=chunk) + +if offset == len(data): +return + +# If we don't have enough to constitute a full frame, buffer and +# return. +if len(data) + self._chunkssize < self._maxsize: +self._chunks.append(data) +self._chunkssize += len(data) +return + +# Else flush what we have and buffer the new chunk. We could do +# something more intelligent here, like break the chunk. Let's +# keep things simple for now. 
+for frame in self._flush(): +yield frame + +self._chunks.append(data) +self._chunkssize = len(data) + +def _flush(self): +payload = b''.join(self._chunks) +assert len(payload) <= self._maxsize + +self._chunks[:] = [] +self._chunkssize = 0 + +yield self._stream.makeframe( +self._requestid, +typeid=FRAME_TYPE_COMMAND_RESPONSE, +flags=FLAG_COMMAND_RESPONSE_CONTINUATION, +payload=payload) + class stream(object): """Represents a logical unidirectional series of frames.""" @@ -716,10 +808,14 @@ def sendframes():