liupc commented on a change in pull request #23602:
[SPARK-26674][CORE]Consolidate CompositeByteBuf when reading large frame
URL: https://github.com/apache/spark/pull/23602#discussion_r255282612
##########
File path: common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
##########
@@ -123,31 +139,54 @@ private long decodeFrameSize() {
private ByteBuf decodeNext() {
long frameSize = decodeFrameSize();
- if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
+ if (frameSize == UNKNOWN_FRAME_SIZE) {
return null;
}
- // Reset size for next frame.
- nextFrameSize = UNKNOWN_FRAME_SIZE;
-
- Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
- Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
-
- // If the first buffer holds the entire frame, return it.
- int remaining = (int) frameSize;
- if (buffers.getFirst().readableBytes() >= remaining) {
- return nextBufferForFrame(remaining);
+ if (frameBuf == null) {
+ Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE,
+ "Too large frame: %s", frameSize);
+ Preconditions.checkArgument(frameSize > 0,
+ "Frame length should be positive: %s", frameSize);
+ frameRemainingBytes = (int) frameSize;
+
+ // If buffers is empty, then return immediately for more input data. Otherwise, if the
+ // first buffer holds the entire frame, we attempt to build frame with it and return.
+ // Other cases, create a composite buffer to manage all the buffers.
+ if (buffers.isEmpty()) {
+ return null;
+ } else if (buffers.getFirst().readableBytes() >= frameRemainingBytes) {
+ // Reset buf and size for next frame.
+ frameBuf = null;
+ nextFrameSize = UNKNOWN_FRAME_SIZE;
+ return nextBufferForFrame(frameRemainingBytes);
+ } else {
+ frameBuf = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
+ }
}
- // Otherwise, create a composite buffer.
- CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
- while (remaining > 0) {
- ByteBuf next = nextBufferForFrame(remaining);
- remaining -= next.readableBytes();
- frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
+ while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
+ ByteBuf next = nextBufferForFrame(frameRemainingBytes);
+ frameRemainingBytes -= next.readableBytes();
+ frameBuf.addComponent(next).writerIndex(frameBuf.writerIndex() + next.readableBytes());
}
- assert remaining == 0;
- return frame;
+ // If the delta size of frameBuf exceeds the threshold, then we do consolidation
+ // to reduce memory consumption.
+ if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateFrameBufsDeltaThreshold) {
+ int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
+ frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
Review comment:
The no-argument `consolidate()` merges all components into one, so every call re-copies data that earlier calls had already consolidated (after a full consolidation there is always exactly one component, and the next call copies it again together with the new ones). That is slow and wastes memory. By contrast, `consolidate(cIndex, numComponents)` consolidates only the specified new components.
For instance, say we add 10 components and do the first consolidation, which leaves one consolidated component. If we use `consolidate(cIndex, numComponents)` here, then the next time we consolidate, after another 10 components have been added, the already-consolidated component is left alone (no extra memory allocation or copying).
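To make the cost difference concrete, here is a small, self-contained model in plain Java. It is only a sketch and does not use Netty: `Composite` is a hypothetical stand-in for `CompositeByteBuf` whose components are byte arrays, and `consolidateAll()` plays the role of the no-argument `consolidate()`. The threshold check mirrors the `frameBuf.capacity() - consolidatedFrameBufSize > consolidateFrameBufsDeltaThreshold` condition in the diff. The chunk size, chunk count, and threshold are arbitrary values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Netty): compares "consolidate everything" with
// "consolidate only the components added since the last consolidation".
final class ConsolidationModel {

    // Hypothetical stand-in for CompositeByteBuf: each component is a byte array.
    static final class Composite {
        final List<byte[]> components = new ArrayList<>();
        long bytesCopied = 0; // total bytes copied by all consolidations so far

        void add(byte[] component) {
            components.add(component);
        }

        // Sum of component lengths, analogous to the composite's capacity().
        int capacity() {
            int total = 0;
            for (byte[] c : components) {
                total += c.length;
            }
            return total;
        }

        // Merge components [cIndex, cIndex + num) into one new array by copying
        // their bytes; models the idea of consolidate(cIndex, numComponents).
        void consolidate(int cIndex, int num) {
            if (num <= 1) {
                return; // nothing to merge
            }
            List<byte[]> range = components.subList(cIndex, cIndex + num);
            int size = 0;
            for (byte[] c : range) {
                size += c.length;
            }
            byte[] merged = new byte[size];
            int offset = 0;
            for (byte[] c : range) {
                System.arraycopy(c, 0, merged, offset, c.length);
                offset += c.length;
            }
            bytesCopied += size;
            range.clear();                  // remove the merged components
            components.add(cIndex, merged); // and put the merged one in their place
        }

        // Models the no-argument consolidate(): merge every component.
        void consolidateAll() {
            consolidate(0, components.size());
        }
    }

    // Feeds numChunks chunks of chunkSize bytes and returns the total bytes copied.
    static long simulate(int numChunks, int chunkSize, int threshold, boolean incremental) {
        Composite frame = new Composite();
        int consolidatedSize = 0;
        int consolidatedNumComponents = 0;
        for (int i = 0; i < numChunks; i++) {
            frame.add(new byte[chunkSize]);
            if (frame.capacity() - consolidatedSize > threshold) {
                if (incremental) {
                    int newNum = frame.components.size() - consolidatedNumComponents;
                    frame.consolidate(consolidatedNumComponents, newNum);
                } else {
                    frame.consolidateAll();
                }
                consolidatedSize = frame.capacity();
                consolidatedNumComponents = frame.components.size();
            }
        }
        return frame.bytesCopied;
    }

    public static void main(String[] args) {
        System.out.println("incremental copied " + simulate(40, 1024, 8 * 1024, true) + " bytes");
        System.out.println("full copied " + simulate(40, 1024, 8 * 1024, false) + " bytes");
    }
}
```

With 40 chunks of 1 KiB and an 8 KiB threshold, consolidation fires four times (after chunks 9, 18, 27 and 36). The incremental strategy copies each consolidated byte once, 36864 bytes in total, while re-consolidating everything copies the accumulated prefix again each time: 9216 + 18432 + 27648 + 36864 = 92160 bytes. In this model the full approach grows quadratically with the number of consolidations and the incremental one linearly.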