Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22594#discussion_r222884751
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
    @@ -104,12 +104,14 @@ class FileScanRDD(
             val nextElement = currentIterator.next()
             // TODO: we should have a better separation of row based and batch based scan, so that we
             // don't need to run this `if` for every record.
    +        val preNumRecordsRead = inputMetrics.recordsRead
             if (nextElement.isInstanceOf[ColumnarBatch]) {
               inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
             } else {
               inputMetrics.incRecordsRead(1)
             }
    -        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
    --- End diff ---
    
    I think the issue is that at line 108, `recordsRead` can be incremented by
    more than 1: a ColumnarBatch adds its whole row count at once. The count
    can therefore skip over the exact multiples of
    `UPDATE_INPUT_METRICS_INTERVAL_RECORDS` that the old `%` check looks for,
    and if that code path is common, the metrics might rarely ever get
    updated. The change instead checks whether the increment crosses a
    multiple of `UPDATE_INPUT_METRICS_INTERVAL_RECORDS`, which sounds more
    correct. But yeah, this needs a description and ideally a little test.
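    
    For what it's worth, here's a minimal standalone sketch of the skip and of
    a crossing-style check. The names `UpdateInterval`, `oldCheck`, and
    `newCheck` are made up for illustration; the actual patch may express the
    check differently:
    
        object MetricsIntervalSketch {
          // Stand-in for SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS
          val UpdateInterval = 1000L
    
          // Old check: fires only when the count lands exactly on a multiple.
          def oldCheck(recordsRead: Long): Boolean =
            recordsRead % UpdateInterval == 0
    
          // Crossing check: fires whenever an increment lands on or jumps past
          // a multiple, even if the exact multiple is never hit.
          def newCheck(preRecordsRead: Long, recordsRead: Long): Boolean =
            preRecordsRead / UpdateInterval != recordsRead / UpdateInterval
    
          def main(args: Array[String]): Unit = {
            // A 999-row ColumnarBatch moves the count from 999 to 1998,
            // skipping the exact multiple 1000.
            val pre = 999L
            val post = pre + 999L
            println(oldCheck(post))       // false: 1998 % 1000 != 0
            println(newCheck(pre, post))  // true: the increment crossed 1000
          }
        }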

