otterc commented on code in PR #39725:
URL: https://github.com/apache/spark/pull/39725#discussion_r1087470017
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1356,6 +1362,15 @@ private boolean isTooLate(
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
}
+ /**
+ * Update ignoredBlockBytes in pushMergeMetrics.
+ */
+ private void updateIgnoredBytes(long numBytes) {
+ if (numBytes > 0) {
+ mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(numBytes);
+ }
+ }
+
Review Comment:
```suggestion
private long totalBytesReceived = 0;
/**
* Update ignoredBlockBytes in pushMergeMetrics.
*/
private void receivedBytesIgnored() {
if (totalBytesReceived > 0) {
mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(totalBytesReceived);
totalBytesReceived = 0;
}
}
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1298,24 +1298,30 @@ private void writeDeferredBufs() throws IOException {
deferredBufs = null;
}
- private void freeDeferredBufs() {
- if (deferredBufs != null && !deferredBufs.isEmpty()) {
- long totalSize = 0;
- for (ByteBuffer deferredBuf : deferredBufs) {
- totalSize += deferredBuf.limit();
- mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
- }
- mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
+ /**
+ * @return total number of deferred bytes
+ */
+ private long freeDeferredBufs() {
+ if (deferredBufs == null || deferredBufs.isEmpty()) {
+ deferredBufs = null;
+ return 0;
+ }
+ long totalSize = 0;
+ for (ByteBuffer deferredBuf : deferredBufs) {
+ totalSize += deferredBuf.limit();
+ mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
}
+ mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
deferredBufs = null;
+ return totalSize;
Review Comment:
revert this change
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1356,6 +1362,15 @@ private boolean isTooLate(
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
}
+ /**
+ * Update ignoredBlockBytes in pushMergeMetrics.
+ */
+ private void updateIgnoredBytes(long numBytes) {
+ if (numBytes > 0) {
+ mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(numBytes);
+ }
+ }
+
@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
Review Comment:
```suggestion
public void onData(String streamId, ByteBuffer buf) throws IOException {
totalBytesReceived += buf.remaining();
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1298,24 +1298,30 @@ private void writeDeferredBufs() throws IOException {
deferredBufs = null;
}
- private void freeDeferredBufs() {
- if (deferredBufs != null && !deferredBufs.isEmpty()) {
- long totalSize = 0;
- for (ByteBuffer deferredBuf : deferredBufs) {
- totalSize += deferredBuf.limit();
- mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
- }
- mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
+ /**
+ * @return total number of deferred bytes
+ */
+ private long freeDeferredBufs() {
+ if (deferredBufs == null || deferredBufs.isEmpty()) {
+ deferredBufs = null;
+ return 0;
+ }
+ long totalSize = 0;
+ for (ByteBuffer deferredBuf : deferredBufs) {
+ totalSize += deferredBuf.limit();
+ mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
}
+ mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
deferredBufs = null;
+ return totalSize;
}
/**
* @throws IllegalStateException if the number of IOExceptions have
exceeded threshold.
*/
private void abortIfNecessary() {
if
(partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) {
- freeDeferredBufs();
+ updateIgnoredBytes(freeDeferredBufs());
Review Comment:
Revert
```suggestion
freeDeferredBufs();
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1410,6 +1425,10 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
}
writeBuf(buf);
} catch (IOException ioe) {
+ // When server failed to write, capture the number of failed bytes to ignoredBlockBytes
+ // of pushMergeMetrics. The number of deferred bytes are already captured in
+ // incrementIOExceptionsAndAbortIfNecessary().
+
mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(buf.remaining());
Review Comment:
Revert this change
```suggestion
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1461,14 +1480,14 @@ public void onComplete(String streamId) throws IOException {
AppShuffleMergePartitionsInfo info =
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
if (isTooLate(info, partitionInfo.reduceId)) {
- freeDeferredBufs();
+ updateIgnoredBytes(freeDeferredBufs());
Review Comment:
Revert
```suggestion
freeDeferredBufs();
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1520,7 +1539,7 @@ public void onComplete(String streamId) throws IOException {
partitionInfo.resetChunkTracker();
}
} else {
- freeDeferredBufs();
+ updateIgnoredBytes(freeDeferredBufs());
Review Comment:
Revert
```suggestion
freeDeferredBufs();
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1554,6 +1573,7 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
}
}
isWriting = false;
+ updateIgnoredBytes(freeDeferredBufs());
Review Comment:
```suggestion
receivedBytesIgnored();
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1392,7 +1407,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
// Identify duplicate block generated by speculative tasks. We respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
- freeDeferredBufs();
+ updateIgnoredBytes(freeDeferredBufs() + buf.remaining());
Review Comment:
Revert
```suggestion
freeDeferredBufs();
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1379,7 +1394,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
isStale(info,
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId);
boolean isTooLateBlockPush = isTooLate(info, partitionInfo.reduceId);
if (isStaleBlockPush || isTooLateBlockPush) {
- freeDeferredBufs();
+ updateIgnoredBytes(freeDeferredBufs() + buf.remaining());
Review Comment:
Revert
```suggestion
freeDeferredBufs();
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1461,14 +1480,14 @@ public void onComplete(String streamId) throws IOException {
AppShuffleMergePartitionsInfo info =
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
if (isTooLate(info, partitionInfo.reduceId)) {
- freeDeferredBufs();
+ updateIgnoredBytes(freeDeferredBufs());
mergeManager.pushMergeMetrics.lateBlockPushes.mark();
throw new BlockPushNonFatalFailure(
new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
streamId).toByteBuffer(),
BlockPushNonFatalFailure.getErrorMsg(streamId,
ReturnCode.TOO_LATE_BLOCK_PUSH));
}
if (isStale(info,
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId)) {
- freeDeferredBufs();
+ updateIgnoredBytes(freeDeferredBufs());
Review Comment:
Revert
```suggestion
freeDeferredBufs();
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1480,7 +1499,7 @@ public void onComplete(String streamId) throws IOException {
// Identify duplicate block generated by speculative tasks. We respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
- freeDeferredBufs();
+ updateIgnoredBytes(freeDeferredBufs());
Review Comment:
```suggestion
freeDeferredBufs();
// Since we don't throw an exception and just return, but the received bytes are ignored,
// we need to update the ignored bytes metric.
receivedBytesIgnored();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]