Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r198746910
  
    --- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ---
    @@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) 
throws IOException {
         // included in the shuffle write time.
         writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
     
    +    long numOfRecords = 0;
         while (records.hasNext()) {
           final Product2<K, V> record = records.next();
           final K key = record._1();
           partitionWriters[partitioner.getPartition(key)].write(key, 
record._2());
    +      numOfRecords += 1;
    --- End diff --
    
    To verify it, I ran a test locally. Looks like 
`writeMetrics_recordsWritten` is well updated in `SortShuffleWriter` path too. 
So I will replace `numOfRecords` with `writeMetrics_recordsWritten`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to