GitHub user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/16677#discussion_r198360170
--- Diff:
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
---
@@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       // included in the shuffle write time.
       writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
+    long numOfRecords = 0;
     while (records.hasNext()) {
       final Product2<K, V> record = records.next();
       final K key = record._1();
       partitionWriters[partitioner.getPartition(key)].write(key, record._2());
+      numOfRecords += 1;
--- End diff --
We are introducing numOfRecords because, as you say, some code paths do
not update the metric.
Rather than working around the bug and cluttering the code, it would be
better to fix the metric update itself, unless a design issue or some
other complication prevents that.
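To illustrate what fixing it at the source could look like, here is a minimal, self-contained sketch. It is not Spark's code: `WriteMetricsSketch`, `PartitionWriterSketch`, and `ShuffleWriteSketch` are hypothetical stand-ins, and the assumption is that the records-written metric can be incremented inside the per-partition writer, so that every write path updates it and the caller needs no local counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a shuffle write metrics holder (not Spark's class).
final class WriteMetricsSketch {
    private long recordsWritten = 0;

    void incRecordsWritten(long n) {
        recordsWritten += n;
    }

    long recordsWritten() {
        return recordsWritten;
    }
}

// Hypothetical per-partition writer. The metric is updated inside write(),
// so no caller can write a record without it being counted.
final class PartitionWriterSketch {
    private final WriteMetricsSketch metrics;
    private final List<String> buffer = new ArrayList<>();

    PartitionWriterSketch(WriteMetricsSketch metrics) {
        this.metrics = metrics;
    }

    void write(Object key, Object value) {
        buffer.add(key + "=" + value);
        metrics.incRecordsWritten(1); // single place where the count is kept
    }
}

// Hypothetical caller: the write loop carries no counter of its own.
final class ShuffleWriteSketch {
    static long writeAll(String[][] records, int numPartitions) {
        WriteMetricsSketch metrics = new WriteMetricsSketch();
        PartitionWriterSketch[] writers = new PartitionWriterSketch[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            writers[i] = new PartitionWriterSketch(metrics);
        }
        for (String[] record : records) {
            // Simple hash partitioning, used here only for illustration.
            int partition = Math.floorMod(record[0].hashCode(), numPartitions);
            writers[partition].write(record[0], record[1]);
        }
        return metrics.recordsWritten();
    }
}
```

With this shape the caller reads the count from the metrics object after the loop instead of maintaining numOfRecords alongside it. Whether the real writer can be changed this way depends on the code paths in question.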
---