vanzin commented on a change in pull request #23602: 
[SPARK-26674][CORE]Consolidate CompositeByteBuf when reading large frame
URL: https://github.com/apache/spark/pull/23602#discussion_r255276524
 
 

 ##########
 File path: 
common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
 ##########
 @@ -47,6 +47,76 @@ public void testFrameDecoding() throws Exception {
     verifyAndCloseDecoder(decoder, ctx, data);
   }
 
+  @Test
+  public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
+    TransportFrameDecoder decoder = new TransportFrameDecoder();
+    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+    List<ByteBuf> retained = new ArrayList<>();
+    when(ctx.fireChannelRead(any())).thenAnswer(in -> {
+      ByteBuf buf = (ByteBuf) in.getArguments()[0];
+      retained.add(buf);
+      return null;
+    });
+    ByteBuf data1 = Unpooled.buffer(1024 * 1024);
+    data1.writeLong(1024 * 1024 + 8);
+    data1.writeByte(127);
+    ByteBuf data2 = Unpooled.buffer(1024 * 1024);
+    for (int i = 0; i < 1024 * 1024 - 1; i++) {
+      data2.writeByte(128);
+    }
+    int orignalCapacity = data1.capacity() + data2.capacity();
+    try {
+      decoder.channelRead(ctx, data1);
+      decoder.channelRead(ctx, data2);
+      assertEquals(1, retained.size());
+      assert(retained.get(0).capacity() < orignalCapacity);
+    } catch (Exception e) {
+      release(data1);
+      release(data2);
+    }
+  }
+
+  @Test
+  public void testConsolidationPerf() throws Exception {
+    long[] testingConsolidateThresholds = new long[] {
+        1024 * 1024, 5 * 1024 * 1024, 10 * 1024 * 1024, 20 * 1024 * 1024,
+        30 * 1024 * 1024, 50 * 1024 * 1024, 80 * 1024 * 1024, 100 * 1024 * 
1024,
+        300 * 1024 * 1024, 500 * 1024 * 1024, Long.MAX_VALUE };
+    for (long threshold : testingConsolidateThresholds) {
+      TransportFrameDecoder decoder = new TransportFrameDecoder(threshold);
+      ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+      List<ByteBuf> retained = new ArrayList<>();
+      when(ctx.fireChannelRead(any())).thenAnswer(in -> {
+        ByteBuf buf = (ByteBuf) in.getArguments()[0];
+        retained.add(buf);
+        return null;
+      });
+
+      try {
+        long start = System.currentTimeMillis();
+        ByteBuf buf = Unpooled.buffer(8);
+        buf.writeLong(8 + 1024 * 1024 * 1000);
+        decoder.channelRead(ctx, buf);
+        for (int i = 0; i < 1000; i++) {
+          buf = Unpooled.buffer(1024 * 1024 * 2);
+          ByteBuf writtenBuf = Unpooled.buffer(1024 * 1024).writerIndex(1024 * 
1024);
+          buf.writeBytes(writtenBuf);
+          writtenBuf.release();
+          decoder.channelRead(ctx, buf);
+        }
+        assertEquals(1, retained.size());
+        assertEquals(1024 * 1024 * 1000, retained.get(0).capacity());
+        long costTime = System.currentTimeMillis() - start;
+        System.out.println("Build frame buf with consolidation threshold " + 
threshold
 
 Review comment:
   A shorter message would read better: "Writing blah with consolidation of 
blah took blah."
   
   I'm a little torn on keeping this, though, since very few people will see 
it, and for those who do, it will be more annoying than useful. Perhaps write 
to the log instead of stdout.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to