otterc commented on code in PR #39725:
URL: https://github.com/apache/spark/pull/39725#discussion_r1086246214
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1356,6 +1357,18 @@ private boolean isTooLate(
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
}
+ /**
+ * For the block bytes in the deferred buffers that are ignored, capture
them
+ * and update pushMergeMetrics's ignoredBlockBytes.
+ */
+ private void updateIgnoredBytesWithDeferredBufs() {
+ if (deferredBufs != null && !deferredBufs.isEmpty()) {
+ for (ByteBuffer buf : deferredBufs) {
+
mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(buf.remaining());
+ }
+ }
+ }
+
Review Comment:
```suggestion
/**
 * Adds the given byte count to pushMergeMetrics's ignoredBlockBytes.
 * Non-positive counts are skipped so the metric is only marked when
 * there is something to report.
 *
 * @param numBytes number of block bytes that were ignored
 */
private void updateIgnoredBytes(long numBytes) {
  if (numBytes <= 0) {
    // Nothing was ignored; leave the metric untouched.
    return;
  }
  mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(numBytes);
}
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1315,6 +1315,7 @@ private void freeDeferredBufs() {
*/
private void abortIfNecessary() {
if
(partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) {
+ updateIgnoredBytesWithDeferredBufs();
freeDeferredBufs();
Review Comment:
```suggestion
updateIgnoredBytes(freeDeferredBufs());
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1461,6 +1476,7 @@ public void onComplete(String streamId) throws
IOException {
AppShuffleMergePartitionsInfo info =
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
if (isTooLate(info, partitionInfo.reduceId)) {
+ updateIgnoredBytesWithDeferredBufs();
freeDeferredBufs();
Review Comment:
```suggestion
updateIgnoredBytes(freeDeferredBufs());
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1393,6 +1407,7 @@ public void onData(String streamId, ByteBuffer buf)
throws IOException {
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
freeDeferredBufs();
+ mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(buf.limit());
Review Comment:
```suggestion
updateIgnoredBytes(freeDeferredBufs() + buf.remaining());
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1520,6 +1537,7 @@ public void onComplete(String streamId) throws
IOException {
partitionInfo.resetChunkTracker();
}
} else {
+ updateIgnoredBytesWithDeferredBufs();
freeDeferredBufs();
Review Comment:
```suggestion
updateIgnoredBytes(freeDeferredBufs());
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1480,6 +1496,7 @@ public void onComplete(String streamId) throws
IOException {
// Identify duplicate block generated by speculative tasks. We
respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
+ updateIgnoredBytesWithDeferredBufs();
freeDeferredBufs();
Review Comment:
```suggestion
updateIgnoredBytes(freeDeferredBufs());
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1382,6 +1395,7 @@ public void onData(String streamId, ByteBuffer buf)
throws IOException {
freeDeferredBufs();
if (isTooLateBlockPush) {
mergeManager.pushMergeMetrics.lateBlockPushes.mark();
+ mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(buf.limit());
Review Comment:
```suggestion
long deferredBytes = freeDeferredBufs();
if (isTooLateBlockPush) {
mergeManager.pushMergeMetrics.lateBlockPushes.mark();
updateIgnoredBytes(deferredBytes + buf.remaining());
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1315,6 +1315,7 @@ private void freeDeferredBufs() {
*/
Review Comment:
I wasn't able to comment on the `freeDeferredBufs()` lines directly since they
are not changed in this PR. We can change the method to the version below so that my other suggestions apply.
```
/**
 * Drops all currently deferred buffers and adjusts the deferred-block
 * metrics to match: deferredBlocks is marked with -1 once per dropped
 * buffer, and deferredBlockBytes is decremented by the summed limit()
 * of those buffers. deferredBufs is always reset to null afterwards.
 *
 * @return total number of deferred bytes, or 0 when there were no
 *         deferred buffers
 */
private long freeDeferredBufs() {
  long freedBytes = 0;
  if (deferredBufs != null && !deferredBufs.isEmpty()) {
    for (ByteBuffer pending : deferredBufs) {
      freedBytes += pending.limit();
      mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
    }
    // Only touch the byte counter when at least one buffer was dropped.
    mergeManager.pushMergeMetrics.deferredBlockBytes.dec(freedBytes);
  }
  deferredBufs = null;
  return freedBytes;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]