Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/22324#discussion_r215315904
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala ---
@@ -473,6 +476,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("SPARK-25237 compute correct input metrics in FileScanRDD") {
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      spark.range(1000).repartition(1).write.csv(path)
+      val bytesReads = new mutable.ArrayBuffer[Long]()
+      val bytesReadListener = new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+          bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+        }
+      }
+      sparkContext.addSparkListener(bytesReadListener)
+      try {
+        spark.read.csv(path).limit(1).collect()
+        sparkContext.listenerBus.waitUntilEmpty(1000L)
+        assert(bytesReads.sum === 7860)
--- End diff --
In this test, Spark runs with `local[2]` and each scan thread reads the same CSV file. Since each thread obtains the file size through the Hadoop APIs, the total `bytesRead` becomes 2 * the file size, IIUC.
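
If that is the cause, the assertion could avoid the hard-coded 7860 by deriving the expected value from the on-disk size of the written file. A minimal sketch of what I mean (not part of the patch; it assumes the written part files can be located by their `.csv` suffix):

```scala
// Hypothetical replacement for the hard-coded assertion: compute the
// size of the CSV data actually written, then check that the reported
// total is an exact multiple of it (one multiple per scan thread).
val fileSize = new java.io.File(path)
  .listFiles()
  .filter(_.getName.endsWith(".csv"))
  .map(_.length())
  .sum
assert(fileSize > 0)
assert(bytesReads.sum % fileSize == 0)
```

That way the test would not break if the number of local threads, or the exact byte count of the generated data, changes.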
---