mridulm commented on code in PR #37638:
URL: https://github.com/apache/spark/pull/37638#discussion_r1057878241
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1220,6 +1260,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
try {
if (deferredBufs != null && !deferredBufs.isEmpty()) {
writeDeferredBufs();
+ mergeManager.pushMergeMetrics.deferredBlocks.mark();
Review Comment:
This is where the deferred block is written out. Instead, we should increment the meter when the deferred block is added.
Move it to where we increment `deferredBlockBytes` below?
That keeps the two metrics in sync - otherwise we can end up with a large `deferredBlockBytes` but a very low `deferredBlocks`.
An example of this is in the test case
[here](https://github.com/apache/spark/pull/37638/files#diff-955b4eb8f495555d5e188ed93a4acf5393e1d9c7855dffddedb5e9201eebdef2R193)
- the deferred block count should be `1`, but it is `0`.
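To make the suggestion concrete, here is a minimal sketch (not the PR code) of marking the meter where the block is deferred rather than where it is flushed. It assumes `deferredBlocks` is a Dropwizard `Meter` (as the `.mark()` call suggests) and treats `deferredBlockBytes` as a `Counter`; the deque, method names, and surrounding structure are made up for illustration:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;

// Simplified stand-in for the deferral path in RemoteBlockPushResolver's
// push stream callback; only the two metric names come from the PR.
class DeferredBlockSketch {
  private final Meter deferredBlocks = new Meter();
  private final Counter deferredBlockBytes = new Counter();
  private final Deque<ByteBuffer> deferredBufs = new ArrayDeque<>();

  // Called when a block cannot be written yet and has to be buffered.
  void deferBlock(ByteBuffer buf) {
    deferredBufs.addLast(buf);
    deferredBlockBytes.inc(buf.remaining());
    // Suggested placement: count the block at the moment it is deferred,
    // so deferredBlocks and deferredBlockBytes move together.
    deferredBlocks.mark();
  }

  // Called later when the buffered data can finally be written out.
  void writeDeferredBufs() {
    while (!deferredBufs.isEmpty()) {
      ByteBuffer buf = deferredBufs.pollFirst();
      deferredBlockBytes.dec(buf.remaining());
      // No deferredBlocks.mark() here: flushing is not the event being counted.
    }
  }
}
```

With that placement, a rise in `deferredBlockBytes` is always accompanied by a corresponding rise in `deferredBlocks`, which is the "in sync" property described above.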
##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -184,6 +190,10 @@ public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 1, 0, 0));
// This should be deferred
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
+ verifyMetrics(2, 0, 0, 3, 0, 0);
Review Comment:
I commented on this earlier in `RemoteBlockPushResolver` - this expectation
will change once that comment is addressed.
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -568,6 +580,12 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
// getting killed. When this happens, we need to distinguish the duplicate
blocks as they
// arrive. More details on this is explained in later comments.
+ // Track whether the block is received after shuffle merge finalize. If it is received after
+ // shuffle merge finalize, it is considered too late, and hence we mark it as a late block
+ // push in the pushMergeMetrics
+ if (partitionInfoBeforeCheck == null) {
+ pushMergeMetrics.lateBlockPushes.mark();
+ }
Review Comment:
Even when `partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex)`, the
push would be considered late.
Move `pushMergeMetrics.lateBlockPushes.mark();` to the `else` block below
(before `new StreamCallbackWithID`)?
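As a rough sketch of what that would look like (only `lateBlockPushes` and the `partitionInfoBeforeCheck == null` condition come from the diff; the callback interface, parameters, and surrounding structure are simplified stand-ins, not the real `receiveBlockDataAsStream`):

```java
import com.codahale.metrics.Meter;

// Simplified stand-in for the late/duplicate handling in
// receiveBlockDataAsStream; only lateBlockPushes comes from the PR.
class LateBlockPushSketch {

  // Hypothetical stand-in for Spark's StreamCallbackWithID.
  interface StreamCallbackWithID {
    String getID();
  }

  private final Meter lateBlockPushes = new Meter();

  StreamCallbackWithID receiveBlockDataAsStream(String streamId,
                                                Object partitionInfoBeforeCheck,
                                                boolean mapIndexAlreadyMerged) {
    if (partitionInfoBeforeCheck != null && !mapIndexAlreadyMerged) {
      // Normal path: return the real merging callback (omitted in this sketch).
      return () -> streamId;
    }
    // Suggested placement: mark the meter once here, just before returning the
    // no-op callback, so both the "after finalize" case (no partition info) and
    // the "map index already merged" case are counted as late block pushes.
    lateBlockPushes.mark();
    return () -> streamId;
  }
}
```

Counting in the single branch that hands back the no-op callback avoids having to keep two separate `mark()` call sites consistent.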
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]