mengxr commented on a change in pull request #24387: [SPARK-27473][SQL] Support 
filter push down for status fields in binary file data source
URL: https://github.com/apache/spark/pull/24387#discussion_r276512013
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
 ##########
 @@ -51,52 +59,76 @@ class BinaryFileFormatSuite extends QueryTest with 
SharedSQLContext with SQLTest
     val year2015Dir = new File(testDir, "year=2015")
     year2015Dir.mkdir()
 
+    val file1 = new File(year2014Dir, "data.txt")
     Files.write(
-      new File(year2014Dir, "data.txt").toPath,
-      Seq("2014-test").asJava,
+      file1.toPath,
+      Seq("2014-test").asJava, // file length = 10
       StandardOpenOption.CREATE, StandardOpenOption.WRITE
     )
+    file1Status = fs.getFileStatus(new Path(file1.getAbsolutePath))
+
+    val file2 = new File(year2014Dir, "data2.bin")
     Files.write(
-      new File(year2014Dir, "data2.bin").toPath,
-      "2014-test-bin".getBytes,
+      file2.toPath,
+      "2014-test-bin".getBytes, // file length = 13
       StandardOpenOption.CREATE, StandardOpenOption.WRITE
     )
+    file2Status = fs.getFileStatus(new Path(file2.getAbsolutePath))
 
+    // Sleep 1s so that the generated files have different modificationTime values,
+    // which the unit tests for filter push down on the modificationTime column rely on.
+    Thread.sleep(1000)
+
+    val file3 = new File(year2015Dir, "bool.csv")
     Files.write(
-      new File(year2015Dir, "bool.csv").toPath,
-      Seq("bool", "True", "False", "true").asJava,
+      file3.toPath,
+      Seq("bool", "True", "False", "true").asJava, // file length = 21
       StandardOpenOption.CREATE, StandardOpenOption.WRITE
     )
+    file3Status = fs.getFileStatus(new Path(file3.getAbsolutePath))
+
+    val file4 = new File(year2015Dir, "data.bin")
     Files.write(
-      new File(year2015Dir, "data.txt").toPath,
-      "2015-test".getBytes,
+      file4.toPath,
+      "2015-test".getBytes, // file length = 9
       StandardOpenOption.CREATE, StandardOpenOption.WRITE
     )
+    file4Status = fs.getFileStatus(new Path(file4.getAbsolutePath))
+
+    fileStatusSet = Set(file1Status, file2Status, file3Status, file4Status)
   }
 
-  def testBinaryFileDataSource(pathGlobFilter: String): Unit = {
-    val resultDF = spark.read.format("binaryFile")
-      .option("pathGlobFilter", pathGlobFilter)
-      .load(testDir)
-      .select(
-        col("status.path"),
-        col("status.modificationTime"),
-        col("status.length"),
+  def testBinaryFileDataSource(
+      pathGlobFilter: String,
+      sqlFilter: Column,
+      expectedFilter: FileStatus => Boolean): Unit = {
+    val dfReader = spark.read.format("binaryFile")
+    if (pathGlobFilter != null) {
+      dfReader.option("pathGlobFilter", pathGlobFilter)
+    }
+    var resultDF = dfReader.load(testDir).select(
+        col("path"),
+        col("modificationTime"),
+        col("length"),
         col("content"),
         col("year") // this is a partition column
       )
+    if (sqlFilter != null) {
+      resultDF = resultDF.filter(sqlFilter)
 
 Review comment:
  The test doesn't really verify filter push down, because it still passes even when push down is disabled. I think we should unit test the buildReader function on its own.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to