otterc commented on code in PR #39725:
URL: https://github.com/apache/spark/pull/39725#discussion_r1087470017


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1356,6 +1362,15 @@ private boolean isTooLate(
         
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
     }
 
+    /**
+     * Update ignoredBlockBytes in pushMergeMetrics.
+     */
+    private void updateIgnoredBytes(long numBytes) {
+      if (numBytes > 0) {
+        mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(numBytes);
+      }
+    }
+

Review Comment:
   ```suggestion
       private long totalBytesReceived = 0;
   
       /**
        * Update ignoredBlockBytes in pushMergeMetrics.
        */
       private void receivedBytesIgnored() {
         if (totalBytesReceived > 0) {
           mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(totalBytesReceived);
           totalBytesReceived = 0;
         }
       }
   
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1298,24 +1298,30 @@ private void writeDeferredBufs() throws IOException {
       deferredBufs = null;
     }
 
-    private void freeDeferredBufs() {
-      if (deferredBufs != null && !deferredBufs.isEmpty()) {
-        long totalSize = 0;
-        for (ByteBuffer deferredBuf : deferredBufs) {
-          totalSize += deferredBuf.limit();
-          mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
-        }
-        mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
+    /**
+     * @return total number of deferred bytes
+     */
+    private long freeDeferredBufs() {
+      if (deferredBufs == null || deferredBufs.isEmpty()) {
+        deferredBufs = null;
+        return 0;
+      }
+      long totalSize = 0;
+      for (ByteBuffer deferredBuf : deferredBufs) {
+        totalSize += deferredBuf.limit();
+        mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
       }
+      mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
       deferredBufs = null;
+      return totalSize;

Review Comment:
   revert this change



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1356,6 +1362,15 @@ private boolean isTooLate(
         
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
     }
 
+    /**
+     * Update ignoredBlockBytes in pushMergeMetrics.
+     */
+    private void updateIgnoredBytes(long numBytes) {
+      if (numBytes > 0) {
+        mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(numBytes);
+      }
+    }
+
     @Override
     public void onData(String streamId, ByteBuffer buf) throws IOException {

Review Comment:
   ```suggestion
       public void onData(String streamId, ByteBuffer buf) throws IOException {
         totalBytesReceived += buf.remaining();
   
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1298,24 +1298,30 @@ private void writeDeferredBufs() throws IOException {
       deferredBufs = null;
     }
 
-    private void freeDeferredBufs() {
-      if (deferredBufs != null && !deferredBufs.isEmpty()) {
-        long totalSize = 0;
-        for (ByteBuffer deferredBuf : deferredBufs) {
-          totalSize += deferredBuf.limit();
-          mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
-        }
-        mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
+    /**
+     * @return total number of deferred bytes
+     */
+    private long freeDeferredBufs() {
+      if (deferredBufs == null || deferredBufs.isEmpty()) {
+        deferredBufs = null;
+        return 0;
+      }
+      long totalSize = 0;
+      for (ByteBuffer deferredBuf : deferredBufs) {
+        totalSize += deferredBuf.limit();
+        mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
       }
+      mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
       deferredBufs = null;
+      return totalSize;
     }
 
     /**
      * @throws IllegalStateException if the number of IOExceptions have 
exceeded threshold.
      */
     private void abortIfNecessary() {
       if 
(partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) {
-        freeDeferredBufs();
+        updateIgnoredBytes(freeDeferredBufs());

Review Comment:
   Revert
   ```suggestion
           freeDeferredBufs();
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1410,6 +1425,10 @@ public void onData(String streamId, ByteBuffer buf) 
throws IOException {
             }
             writeBuf(buf);
           } catch (IOException ioe) {
+            // When server failed to write, capture the number of failed bytes 
to ignoreBlckBytes
+            // of pushMergeMetrics. The number of deferred bytes are already 
captured in
+            // incrementIOExceptionsAndAbortIfNecessary().
+            
mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(buf.remaining());

Review Comment:
   Revert this change
   ```suggestion
   
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1461,14 +1480,14 @@ public void onComplete(String streamId) throws 
IOException {
         AppShuffleMergePartitionsInfo info =
             
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isTooLate(info, partitionInfo.reduceId)) {
-          freeDeferredBufs();
+          updateIgnoredBytes(freeDeferredBufs());

Review Comment:
   Revert
   ```suggestion
             freeDeferredBufs();
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1520,7 +1539,7 @@ public void onComplete(String streamId) throws 
IOException {
             partitionInfo.resetChunkTracker();
           }
         } else {
-          freeDeferredBufs();
+          updateIgnoredBytes(freeDeferredBufs());

Review Comment:
   Revert 
   ```suggestion
             freeDeferredBufs();
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1554,6 +1573,7 @@ public void onFailure(String streamId, Throwable 
throwable) throws IOException {
         }
       }
       isWriting = false;
+      updateIgnoredBytes(freeDeferredBufs());

Review Comment:
   ```suggestion
         receivedBytesIgnored();
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1392,7 +1407,7 @@ public void onData(String streamId, ByteBuffer buf) 
throws IOException {
           // Identify duplicate block generated by speculative tasks. We 
respond success to
           // the client in cases of duplicate even though no data is written.
           if (isDuplicateBlock()) {
-            freeDeferredBufs();
+            updateIgnoredBytes(freeDeferredBufs() + buf.remaining());

Review Comment:
   Revert
   ```suggestion
               freeDeferredBufs();
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1379,7 +1394,7 @@ public void onData(String streamId, ByteBuffer buf) 
throws IOException {
             isStale(info, 
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId);
         boolean isTooLateBlockPush = isTooLate(info, partitionInfo.reduceId);
         if (isStaleBlockPush || isTooLateBlockPush) {
-          freeDeferredBufs();
+          updateIgnoredBytes(freeDeferredBufs() + buf.remaining());

Review Comment:
   Revert
   ```suggestion
             freeDeferredBufs();
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1461,14 +1480,14 @@ public void onComplete(String streamId) throws 
IOException {
         AppShuffleMergePartitionsInfo info =
             
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isTooLate(info, partitionInfo.reduceId)) {
-          freeDeferredBufs();
+          updateIgnoredBytes(freeDeferredBufs());
           mergeManager.pushMergeMetrics.lateBlockPushes.mark();
           throw new BlockPushNonFatalFailure(
             new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), 
streamId).toByteBuffer(),
             BlockPushNonFatalFailure.getErrorMsg(streamId, 
ReturnCode.TOO_LATE_BLOCK_PUSH));
         }
         if (isStale(info, 
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId)) {
-          freeDeferredBufs();
+          updateIgnoredBytes(freeDeferredBufs());

Review Comment:
   Revert 
   ```suggestion
             freeDeferredBufs();
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1480,7 +1499,7 @@ public void onComplete(String streamId) throws 
IOException {
           // Identify duplicate block generated by speculative tasks. We 
respond success to
           // the client in cases of duplicate even though no data is written.
           if (isDuplicateBlock()) {
-            freeDeferredBufs();
+            updateIgnoredBytes(freeDeferredBufs());

Review Comment:
   ```suggestion
               freeDeferredBufs();
               // Since we just return without throwing an exception, the received bytes are
               // ignored, so we need to update the ignored bytes metric.
               receivedBytesIgnored();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to