vanzin commented on a change in pull request #23602: 
[SPARK-26674][CORE] Consolidate CompositeByteBuf when reading large frame
URL: https://github.com/apache/spark/pull/23602#discussion_r258223162
 
 

 ##########
 File path: 
common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
 ##########
 @@ -47,6 +51,67 @@ public void testFrameDecoding() throws Exception {
     verifyAndCloseDecoder(decoder, ctx, data);
   }
 
+  @Test
+  public void testConsolidationPerf() throws Exception {
+    long[] testingConsolidateThresholds = new long[] {
+        ByteUnit.MiB.toBytes(1),
+        ByteUnit.MiB.toBytes(5),
+        ByteUnit.MiB.toBytes(10),
+        ByteUnit.MiB.toBytes(20),
+        ByteUnit.MiB.toBytes(30),
+        ByteUnit.MiB.toBytes(50),
+        ByteUnit.MiB.toBytes(80),
+        ByteUnit.MiB.toBytes(100),
+        ByteUnit.MiB.toBytes(300),
+        ByteUnit.MiB.toBytes(500),
+        Long.MAX_VALUE };
+    for (long threshold : testingConsolidateThresholds) {
+      TransportFrameDecoder decoder = new TransportFrameDecoder(threshold);
+      ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+      List<ByteBuf> retained = new ArrayList<>();
+      when(ctx.fireChannelRead(any())).thenAnswer(in -> {
+        ByteBuf buf = (ByteBuf) in.getArguments()[0];
+        retained.add(buf);
+        return null;
+      });
+
+      // Testing multiple messages
+      int numMessages = 3;
+      long targetBytes = ByteUnit.GiB.toBytes(1);
+      int pieceBytes = (int) ByteUnit.KiB.toBytes(32);
+      for (int i = 0; i < numMessages; i++) {
+        try {
+          long start = System.currentTimeMillis();
+          long writtenBytes = 0;
+          ByteBuf buf = Unpooled.buffer(8);
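+          // The frame length prefix counts its own 8 bytes, hence the "8 +".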
+          buf.writeLong(8 + ByteUnit.GiB.toBytes(1));
+          decoder.channelRead(ctx, buf);
+          while (writtenBytes < targetBytes) {
+            buf = Unpooled.buffer(pieceBytes * 2);
+            ByteBuf writtenBuf = Unpooled.buffer(pieceBytes).writerIndex(pieceBytes);
+            buf.writeBytes(writtenBuf);
+            writtenBuf.release();
+            decoder.channelRead(ctx, buf);
+            writtenBytes += pieceBytes;
+          }
+          long elapsedTime = System.currentTimeMillis() - start;
+          logger.info("Writing 1GiB frame buf with consolidation threshold " + threshold
+              + " took " + elapsedTime + " millis");
+        } finally {
+          for (ByteBuf buf : retained) {
+            release(buf);
+          }
+        }
+      }
+      long totalBytesGot = 0;
+      for (ByteBuf buf : retained) {
+        totalBytesGot += buf.capacity();
+      }
+      assertEquals(numMessages, retained.size());
+      assertEquals(targetBytes * numMessages, totalBytesGot);
 
 Review comment:
   Does this mean this test now requires 3 GiB of memory (three messages of 1 GiB each) just to store the data it's checking?
   
   That seems wasteful. Either change the test to do its checks after each separate message is written (see the sketch below), or lower the size of the messages.
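
A minimal sketch of the first option, assuming the harness above (`decoder`, `ctx`, `retained`, `targetBytes`, `pieceBytes`, and the suite's `release` helper). This restructured loop body is an editor's illustration, not code from the patch:

    for (int i = 0; i < numMessages; i++) {
      // Write the 8-byte length header; the encoded frame length counts
      // the length field itself, hence the leading 8.
      ByteBuf header = Unpooled.buffer(8);
      header.writeLong(8 + targetBytes);
      decoder.channelRead(ctx, header);

      long writtenBytes = 0;
      while (writtenBytes < targetBytes) {
        // Each piece is pieceBytes of zeroed, readable data.
        ByteBuf piece = Unpooled.buffer(pieceBytes).writerIndex(pieceBytes);
        decoder.channelRead(ctx, piece);
        writtenBytes += pieceBytes;
      }

      // Verify and free this message's frame before producing the next one,
      // so the test never retains more than one ~1 GiB frame at a time.
      assertEquals(1, retained.size());
      assertEquals(targetBytes, retained.get(0).capacity());
      release(retained.get(0));
      retained.clear();
    }

Timing and logging are omitted for brevity; the point is that the per-message assertion bounds peak retention at a single frame instead of numMessages of them.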
