Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5234#discussion_r27449044
  
    --- Diff: 
network/common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java
 ---
    @@ -72,9 +80,84 @@ public void encode(ChannelHandlerContext ctx, Message 
in, List<Object> out) {
         in.encode(header);
         assert header.writableBytes() == 0;
     
    -    out.add(header);
         if (body != null && bodyLength > 0) {
    -      out.add(body);
    +      out.add(new MessageWithHeader(header, headerLength, body, 
bodyLength));
    +    } else {
    +      out.add(header);
         }
       }
    +
    +  /**
    +   * A wrapper message that holds two separate pieces (a header and a 
body) to avoid
    +   * copying the body's content.
    +   */
    +  private static class MessageWithHeader extends AbstractReferenceCounted 
implements FileRegion {
    +
    +    private final ByteBuf header;
    +    private final int headerLength;
    +    private final Object body;
    +    private final long bodyLength;
    +    private int bytesTransferred;
    +
    +    MessageWithHeader(ByteBuf header, int headerLength, Object body, long 
bodyLength) {
    +      this.header = header;
    +      this.headerLength = headerLength;
    +      this.body = body;
    +      this.bodyLength = bodyLength;
    +    }
    +
    +    @Override
    +    public long count() {
    +      return headerLength + bodyLength;
    +    }
    +
    +    @Override
    +    public long position() {
    +      return 0;
    +    }
    +
    +    @Override
    +    public long transfered() {
    +      long total = bytesTransferred;
    +      if (body instanceof FileRegion) {
    +        total += ((FileRegion)body).transfered();
    +      }
    +      return total;
    +    }
    +
    +    @Override
    +    public long transferTo(WritableByteChannel target, long position) 
throws IOException {
    +      Preconditions.checkArgument(position >= 0 && position < count(), 
"Invalid position.");
    +
    +      if (position < headerLength && position >= 0) {
    +        header.skipBytes(Ints.checkedCast(position));
    +        int remaining = header.readableBytes();
    +        target.write(header.nioBuffer());
    +        bytesTransferred += remaining;
    +      }
    +
    +      long bodyPos = position > headerLength ? position - headerLength : 0;
    --- End diff --
    
    Similarly, since the header may not be fully written, I believe that we 
should return early if `bytesTransferred < headerLength` and can compute 
bodyPos as simply `position - headerLength` unconditionally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to