Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5234#discussion_r27449119
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java ---
    @@ -72,9 +80,84 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) {
         in.encode(header);
         assert header.writableBytes() == 0;
     
    -    out.add(header);
         if (body != null && bodyLength > 0) {
    -      out.add(body);
    +      out.add(new MessageWithHeader(header, headerLength, body, bodyLength));
    +    } else {
    +      out.add(header);
         }
       }
    +
    +  /**
    +   * A wrapper message that holds two separate pieces (a header and a body) to avoid
    +   * copying the body's content.
    +   */
    +  private static class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
    +
    +    private final ByteBuf header;
    +    private final int headerLength;
    +    private final Object body;
    +    private final long bodyLength;
    +    private int bytesTransferred;
    +
    +    MessageWithHeader(ByteBuf header, int headerLength, Object body, long bodyLength) {
    +      this.header = header;
    +      this.headerLength = headerLength;
    +      this.body = body;
    +      this.bodyLength = bodyLength;
    +    }
    +
    +    @Override
    +    public long count() {
    +      return headerLength + bodyLength;
    +    }
    +
    +    @Override
    +    public long position() {
    +      return 0;
    +    }
    +
    +    @Override
    +    public long transfered() {
    +      long total = bytesTransferred;
    +      if (body instanceof FileRegion) {
    +        total += ((FileRegion)body).transfered();
    +      }
    +      return total;
    +    }
    +
    +    @Override
    +    public long transferTo(WritableByteChannel target, long position) throws IOException {
    +      Preconditions.checkArgument(position >= 0 && position < count(), "Invalid position.");
    +
    +      if (position < headerLength && position >= 0) {
    +        header.skipBytes(Ints.checkedCast(position));
    +        int remaining = header.readableBytes();
    +        target.write(header.nioBuffer());
    +        bytesTransferred += remaining;
    +      }
    +
    +      long bodyPos = position > headerLength ? position - headerLength : 0;
    +      if (body instanceof FileRegion) {
    +        ((FileRegion)body).transferTo(target, bodyPos);
    --- End diff --
    
    For this transferTo() and the target.write() below, we will similarly have
    to record the number of bytes actually transferred, since either call may
    write fewer bytes than requested. Additionally, the return value should be
    the bytes transferred by this call rather than the running total.
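
    The point above can be illustrated with a minimal, Netty-free sketch. It is
    not the code from this pull request: HeaderAndBody and ThrottledChannel are
    hypothetical names, both pieces are plain java.nio ByteBuffers rather than a
    ByteBuf and a FileRegion, and it assumes the caller always resumes from the
    value reported by transferred(). It only shows the bookkeeping: use what
    write() actually returned, and return the per-call amount.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical stand-in for MessageWithHeader, using plain NIO buffers. */
class HeaderAndBody {
  private final ByteBuffer header;
  private final ByteBuffer body;
  private final long count;
  private long totalTransferred;  // running total across all calls

  HeaderAndBody(ByteBuffer header, ByteBuffer body) {
    this.header = header;
    this.body = body;
    this.count = (long) header.remaining() + body.remaining();
  }

  long count() { return count; }

  long transferred() { return totalTransferred; }

  /** Returns the bytes written by THIS call, not the running total. */
  long transferTo(WritableByteChannel target, long position) throws IOException {
    // Assumption of this sketch: callers resume exactly where they left off.
    if (position != totalTransferred) {
      throw new IllegalArgumentException("Invalid position.");
    }
    long written = 0;
    if (header.hasRemaining()) {
      // write() may accept fewer bytes than offered; trust its return value.
      written += target.write(header);
    }
    // Only start on the body once the header has been fully flushed.
    if (!header.hasRemaining() && body.hasRemaining()) {
      written += target.write(body);
    }
    totalTransferred += written;
    return written;
  }
}

/** Hypothetical channel that accepts at most maxPerWrite bytes per write(). */
class ThrottledChannel implements WritableByteChannel {
  private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
  private final int maxPerWrite;

  ThrottledChannel(int maxPerWrite) { this.maxPerWrite = maxPerWrite; }

  @Override
  public int write(ByteBuffer src) {
    int n = Math.min(maxPerWrite, src.remaining());
    for (int i = 0; i < n; i++) {
      sink.write(src.get());
    }
    return n;
  }

  @Override
  public boolean isOpen() { return true; }

  @Override
  public void close() { }

  byte[] bytes() { return sink.toByteArray(); }
}
```

    With a channel that takes only a couple of bytes per write, a caller would
    loop on transferTo(target, msg.transferred()) until transferred() reaches
    count(); each call reports only what it wrote itself.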

