GitHub user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/16677#discussion_r198746910
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
@@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     // included in the shuffle write time.
     writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
+    long numOfRecords = 0;
     while (records.hasNext()) {
       final Product2<K, V> record = records.next();
       final K key = record._1();
       partitionWriters[partitioner.getPartition(key)].write(key, record._2());
+      numOfRecords += 1;
--- End diff --
To verify this, I ran a test locally. It looks like `writeMetrics_recordsWritten`
is also updated correctly in the `SortShuffleWriter` path, so I will replace
`numOfRecords` with `writeMetrics_recordsWritten`.
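A minimal Java sketch of the idea in this comment, under a simplified model: `SimpleWriteMetrics`, `PartitionWriter`, and `RecordCountSketch` are hypothetical names, not Spark's API. Each per-partition writer bumps a shared records-written metric on every record, so the write loop does not need its own `numOfRecords`-style counter. The total is read back from the metric instead.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for shuffle write metrics; only tracks records written.
class SimpleWriteMetrics {
  private long recordsWritten = 0;

  void incRecordsWritten(long n) {
    recordsWritten += n;
  }

  long recordsWritten() {
    return recordsWritten;
  }
}

// Hypothetical per-partition writer: it updates the shared metric on every record,
// so the caller does not need a counter of its own.
class PartitionWriter {
  private final SimpleWriteMetrics metrics;
  private final List<String> buffered = new ArrayList<>();

  PartitionWriter(SimpleWriteMetrics metrics) {
    this.metrics = metrics;
  }

  void write(Object key, Object value) {
    buffered.add(key + "=" + value);  // stands in for serializing the record to disk
    metrics.incRecordsWritten(1);
  }
}

class RecordCountSketch {
  // Routes each record to a partition by key hash, then reads the total count
  // from the shared metric rather than from a separately maintained local counter.
  static long writeAll(Iterator<Map.Entry<String, Integer>> records, int numPartitions) {
    SimpleWriteMetrics metrics = new SimpleWriteMetrics();
    PartitionWriter[] writers = new PartitionWriter[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      writers[i] = new PartitionWriter(metrics);
    }
    while (records.hasNext()) {
      Map.Entry<String, Integer> record = records.next();
      String key = record.getKey();
      int partition = Math.floorMod(key.hashCode(), numPartitions);
      writers[partition].write(key, record.getValue());
    }
    return metrics.recordsWritten();
  }
}
```

Calling `RecordCountSketch.writeAll` on an iterator of three entries returns 3 however the keys are spread across partitions. Keeping a single source of truth avoids a local counter and the metric drifting apart.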