GitHub user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/16677#discussion_r197613554
--- Diff:
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
---
@@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     // included in the shuffle write time.
     writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
+    long numOfRecords = 0;
     while (records.hasNext()) {
       final Product2<K, V> record = records.next();
       final K key = record._1();
       partitionWriters[partitioner.getPartition(key)].write(key, record._2());
+      numOfRecords += 1;
--- End diff --
Hmm, I think it is fine. However, maybe I missed it, but I can't find where
`SortShuffleWriter` updates `writeMetrics_recordsWritten`.
---