Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/22324#discussion_r215111327
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala ---
@@ -473,6 +476,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("SPARK-25237 compute correct input metrics in FileScanRDD") {
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      spark.range(1000).repartition(1).write.csv(path)
+      val bytesReads = new mutable.ArrayBuffer[Long]()
+      val bytesReadListener = new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+          bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+        }
+      }
+      sparkContext.addSparkListener(bytesReadListener)
+      try {
+        spark.read.csv(path).limit(1).collect()
+        sparkContext.listenerBus.waitUntilEmpty(1000L)
+        assert(bytesReads.sum === 7860)
--- End diff --
So the sum *should* be 10*2 + 90*3 + 900*4 = 3890: each of the values 0 to
999 is written as its digits plus a newline. That's also the size of the CSV
file that gets written when I try it locally. When I run this code without
the change here, I get 7820 + 7820 = 15640. So this is better! But I wonder
why it still ends up thinking it reads about twice the bytes (7860 vs. 3890)?
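
For reference, the 3890 figure can be reproduced without Spark. This is only a
minimal sketch, assuming each value from `spark.range(1000)` ends up on its own
line as decimal digits followed by a single `\n`, with no header row;
`ExpectedCsvSize` and `lineBytes` are names made up for this illustration and
are not part of the PR:

```scala
object ExpectedCsvSize {
  // Assumption: one value per line, ASCII digits plus a single newline byte.
  def lineBytes(n: Long): Long = n.toString.length + 1L

  // spark.range(1000) produces the values 0 to 999 inclusive.
  val expectedBytes: Long = (0L until 1000L).map(lineBytes).sum

  def main(args: Array[String]): Unit = {
    // 10 one-digit, 90 two-digit and 900 three-digit values
    println(expectedBytes) // prints 3890
  }
}
```

If the real file used `\r\n` line endings or included a header, the total would
differ, so this only matches the setup described above.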
---