mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1057878241


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1220,6 +1260,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
           try {
             if (deferredBufs != null && !deferredBufs.isEmpty()) {
               writeDeferredBufs();
+              mergeManager.pushMergeMetrics.deferredBlocks.mark();

Review Comment:
   This is where the deferred block is getting written out.
   Instead, we should increment it when the deferred block is getting added.
   Move it to where we increment `deferredBlockBytes` below?
   This will keep the metrics in sync - otherwise we can have a large 
`deferredBlockBytes`, but a very low `deferredBlocks` count.
   
   An example of this is in the test case 
[here](https://github.com/apache/spark/pull/37638/files#diff-955b4eb8f495555d5e188ed93a4acf5393e1d9c7855dffddedb5e9201eebdef2R193)
 - the deferred blocks count should be `1`, but is `0`
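   A minimal sketch of the suggested placement (simplified stand-in counters, not the actual `RemoteBlockPushResolver` / Dropwizard `Meter` API): mark `deferredBlocks` at the moment a buffer is deferred, together with `deferredBlockBytes`, rather than when the deferred buffers are later flushed.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

public class DeferredMetricsSketch {
  // Hypothetical stand-ins for the pushMergeMetrics meters.
  static final AtomicLong deferredBlocks = new AtomicLong();
  static final AtomicLong deferredBlockBytes = new AtomicLong();
  static final Deque<ByteBuffer> deferredBufs = new ArrayDeque<>();

  // Called when a block cannot be written immediately.
  static void deferBuf(ByteBuffer buf) {
    deferredBufs.add(buf);
    // Increment both metrics together at deferral time so they stay in sync.
    deferredBlocks.incrementAndGet();
    deferredBlockBytes.addAndGet(buf.remaining());
  }

  // Called later when deferred buffers are written out; the metrics were
  // already updated at deferral time, so nothing is marked here.
  static void writeDeferredBufs() {
    while (!deferredBufs.isEmpty()) {
      deferredBufs.poll(); // the real code writes to the merged shuffle file
    }
  }

  public static void main(String[] args) {
    deferBuf(ByteBuffer.wrap(new byte[3]));
    writeDeferredBufs();
    System.out.println(deferredBlocks.get() + " " + deferredBlockBytes.get());
  }
}
```

   With this placement, one 3-byte deferred block yields `deferredBlocks == 1` and `deferredBlockBytes == 3` even before the buffers are flushed.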



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -184,6 +190,10 @@ public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
+    verifyMetrics(2, 0, 0, 3, 0, 0);

Review Comment:
   I commented on this earlier in `RemoteBlockPushResolver` - this 
will change when that is addressed.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -568,6 +580,12 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     // getting killed. When this happens, we need to distinguish the duplicate blocks as they
     // arrive. More details on this are explained in later comments.
 
+    // Track whether the block is received after shuffle merge finalization. A block is
+    // considered too late if it arrives after the shuffle merge has been finalized, so
+    // mark it as a late block push in the pushMergeMetrics.
+    if (partitionInfoBeforeCheck == null) {
+      pushMergeMetrics.lateBlockPushes.mark();
+    }

Review Comment:
   Even if `partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex)`, the 
push would be considered late.
   Move `pushMergeMetrics.lateBlockPushes.mark();` to the `else` block below 
(before `new StreamCallbackWithID`)?
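   A hedged sketch of the combined condition (hypothetical simplified types, not the real Spark classes): a push is "late" both when the partition is already finalized (no partition info) and when the map index was already merged, so a single mark on the rejection path covers both cases.

```java
import java.util.BitSet;

public class LatePushSketch {
  // Hypothetical stand-in for pushMergeMetrics.lateBlockPushes.
  static long lateBlockPushes = 0;

  // Returns true when the block should be accepted for merging.
  // mapTrackerOrNull == null models a finalized partition
  // (partitionInfoBeforeCheck == null in the real code).
  static boolean acceptPush(BitSet mapTrackerOrNull, int mapIndex) {
    if (mapTrackerOrNull != null && !mapTrackerOrNull.get(mapIndex)) {
      return true; // partition still open and this map not yet merged
    }
    // Finalized partition OR already-merged map index: both are late pushes,
    // so marking once here keeps the metric consistent.
    lateBlockPushes++;
    return false;
  }

  public static void main(String[] args) {
    BitSet tracker = new BitSet();
    tracker.set(2);                      // map 2 already merged
    acceptPush(null, 1);                 // finalized -> late
    acceptPush(tracker, 2);              // duplicate map index -> late
    acceptPush(tracker, 3);              // accepted
    System.out.println(lateBlockPushes); // prints 2
  }
}
```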



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

