HeartSaVioR commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r494268346



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -467,6 +467,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with
+   * modification times occurring before the specified Time. The provided timestamp
+   * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+   * <li>`modifiedAfter`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: python/pyspark/sql/readwriter.py
##########
@@ -184,7 +196,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
              multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
              dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None,
-             recursiveFileLookup=None, allowNonNumericNumbers=None):
+             recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,

Review comment:
       Probably better not to change the order. With such a huge number of parameters I think end users will use named parameters almost every time, but just to be sure.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -752,6 +764,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with
+   * modification times occurring before the specified Time. The provided timestamp
+   * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+   * <li>`modifiedAfter`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -785,6 +803,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -785,6 +803,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with
+   * modification times occurring before the specified Time. The provided timestamp
+   * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+   * <li>`modifiedAfter`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.hadoop.fs.{FileStatus, GlobFilter}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
+
+trait PathFilterStrategy extends Serializable {
+  def accept(fileStatus: FileStatus): Boolean
+}
+
+trait StrategyBuilder {
+  def strategy: String

Review comment:
       It would be simpler if we just passed the parameters and got a PathFilterStrategy instance back when applicable (`Option[PathFilterStrategy]` as the return type of `create`), instead of letting the caller check it with `strategy` directly.
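   
   A minimal sketch of that shape (it reuses the names from this file; the per-builder option key inside `create` is my assumption):
   
   ```
   trait StrategyBuilder {
     // Return Some(filter) only when the corresponding option is present,
     // so the caller never needs to consult `strategy` itself.
     def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy]
   }
   
   object PathGlobFilter extends StrategyBuilder {
     override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] =
       parameters.get("pathglobfilter").map(new PathGlobFilter(_))
   }
   ```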

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -840,6 +864,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with
+   * modification times occurring before the specified Time. The provided timestamp
+   * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+   * <li>`modifiedAfter`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {
+    val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
+    stringToFile(file, "text")
+  }
+
+  def setFileTime(time: LocalDateTime, file: File): Boolean = {
+    val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def formatTime(time: LocalDateTime): String = {
+    time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now(ZoneOffset.UTC)
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", formattedTime)
+          .option("timeZone", "UTC")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("timeZone", "UTC")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with earlier file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setMinusFileTime(time, file, 3)
+
+      val formattedTime = formatTime(time)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with later file last modified time.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val time = LocalDateTime.now()
+      val formattedTime = formatTime(time)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      file.setLastModified(DateTimeUtils.currentTimestamp())
+      val df = spark.read
+        .option("modifiedAfter", "2019-05-10T01:11:00")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val afterTime = LocalDateTime.now().plusDays(25)
+      val formattedTime = formatTime(afterTime)
+      val df = spark.read
+        .option("modifiedBefore", formattedTime)
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test("SPARK-31962: with modifiedBefore option provided using a past date") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      file.setLastModified(DateTimeUtils.currentTimestamp())
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", "1984-05-01T01:00:00")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, one valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(DateTimeUtils.currentTimestamp())
+      file2.setLastModified(0)
+
+      val df = spark.read
+        .option("modifiedAfter", "2019-05-10T01:11:00")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, both valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(DateTimeUtils.currentTimestamp())
+      file2.setLastModified(DateTimeUtils.currentTimestamp())
+      val df = spark.read
+        .option("modifiedAfter", "2019-05-10T01:11:00")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 2)
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, none valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(0)
+      file2.setLastModified(0)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", "1984-05-01T01:00:00")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified with a future date, " +
+      "multiple files, both valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(0)
+      file2.setLastModified(0)
+
+      val time = LocalDateTime.now().plusDays(3)
+      val formattedTime = formatTime(time)
+
+      val df = spark.read
+        .option("modifiedBefore", formattedTime)
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 2)
+    }
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date, 
multiple files, one valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(0)
+      val time = LocalDateTime.now()
+      setPlusFileTime(time, file2, 3)
+
+      val formattedTime = formatTime(time)
+      val df = spark.read
+        .option("modifiedBefore", formattedTime)
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified with a future date, " +
+      "multiple files, none valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+
+      val time = LocalDateTime
+        .now()
+        .minusDays(1)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      file1.setLastModified(DateTimeUtils.currentTimestamp())
+      file2.setLastModified(DateTimeUtils.currentTimestamp())
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", time)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified with a past date and " +
+      "pathGlobalFilter returning results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val df = spark.read
+        .option("modifiedAfter", "1984-05-10T01:11:00")
+        .option("pathGlobFilter", "*.csv")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified with past date " +
+      "and pathGlobFilter filtering results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", "1984-05-01T01:00:00")
+          .option("pathGlobFilter", "*.txt")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified with future date and " +
+      "pathGlobFilter returning results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val time = LocalDateTime
+        .now()
+        .plusDays(10)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", time)
+          .option("pathGlobFilter", "*.csv")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified with future date and " +
+      "pathGlobFilter filtering results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val time = LocalDateTime
+        .now()
+        .plusDays(10)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", time)
+          .option("pathGlobFilter", "*.txt")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter are specified out of 
range and " +
+      "pathGlobFilter returning results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val time = LocalDateTime.now().plusDays(10)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .option("pathGlobFilter", "*.csv")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range 
and " +
+      "pathGlobFilter returning results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val beforeTime = LocalDateTime
+        .now()
+        .minusDays(25)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+      val afterTime = LocalDateTime
+        .now()
+        .plusDays(25)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      val df = spark.read
+        .option("modifiedAfter", beforeTime)
+        .option("modifiedBefore", afterTime)
+        .option("pathGlobFilter", "*.csv")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range 
and " +
+      "pathGlobFilter filtering results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val beforeTime = LocalDateTime
+        .now()
+        .minusDays(25)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+      val afterTime = LocalDateTime
+        .now()
+        .plusDays(25)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", beforeTime)
+          .option("modifiedBefore", afterTime)
+          .option("pathGlobFilter", "*.txt")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter is specified with an invalid date") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", "2024-05+1 01:00:00")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      Seq("The timestamp provided", "modifiedafter", "2024-05+1 
01:00:00").foreach {
+        expectedMsg =>
+          assert(msg.contains(expectedMsg))
+      }
+    }
+  }
+
+  test("SPARK-31962: modifiedBefore - empty option") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", "")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(
+        msg.contains("The timestamp provided for")
+          && msg.contains("modifiedbefore"))
+    }
+  }
+
+  test("SPARK-31962: modifiedAfter - empty option") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", "")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      Seq("The timestamp provided", "modifiedafter").foreach { expectedMsg =>
+        assert(msg.contains(expectedMsg))
+      }
+    }
+  }
+
+  test(
+    "SPARK-31962: modifiedAfter filter takes into account local timezone " +
+      "when specified as an option. After UTC.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val timeZone = DateTimeUtils.getTimeZone("UTC")
+      val strategyTime =
+        ModifiedDateFilter.toThreshold(
+          LocalDateTime.now(timeZone.toZoneId).toString,
+          "HST",
+          "modifiedafter")
+
+      assert(
+        strategyTime - DateTimeUtils
+          .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) > 0)
+    }
+  }
+
+  test(
+    "SPARK-31962: modifiedAfter filter takes into account local timezone " +
+      "when specified as an option. Before UTC.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val timeZone = DateTimeUtils.getTimeZone("UTC")
+      val strategyTime =
+        ModifiedDateFilter.toThreshold(
+          LocalDateTime.now(timeZone.toZoneId).toString,
+          "HST",
+          "modifiedafter")
+      assert(
+        DateTimeUtils
+          .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0)
+    }
+  }
+
+  test(
+    "SPARK-31962: modifiedBefore filter takes into account local timezone " +
+      "when specified as an option. After UTC.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val timeZone = DateTimeUtils.getTimeZone("UTC")
+      val strategyTime =
+        ModifiedDateFilter.toThreshold(
+          LocalDateTime.now(timeZone.toZoneId).toString,
+          "CET",
+          "modifiedbefore")
+      assert(
+        DateTimeUtils
+          .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0)
+    }
+  }
+
+  test(
+    "SPARK-31962: modifiedBefore filter takes into account local timezone " +
+      "when specified as an option. Before UTC.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val timeZone = DateTimeUtils.getTimeZone("UTC")
+      val strategyTime =
+        ModifiedDateFilter.toThreshold(
+          LocalDateTime.now(timeZone.toZoneId).toString,
+          "HST",
+          "modifiedbefore")
+      assert(
+        strategyTime - DateTimeUtils.fromUTCTime(DateTimeUtils.currentTimestamp(), "UTC") > 0)
+    }
+  }
+
+  test("Option pathGlobFilter: filter files correctly") {
+    withTempPath { path =>
+      val dataDir = path.getCanonicalPath
+      Seq("foo").toDS().write.text(dataDir)
+      Seq("bar").toDS().write.mode("append").orc(dataDir)
+      val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir)
+      checkAnswer(df, Row("foo"))
+
+      // Both glob pattern in option and path should be effective to filter files.
+      val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc")
+      checkAnswer(df2, Seq.empty)
+
+      val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt")
+      checkAnswer(df3, Row("foo"))
+    }
+  }
+
+  test("Option pathGlobFilter: simple extension filtering should contains 
partition info") {

Review comment:
       Just for other reviewers: I see this test is simply moved here, not newly added.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -467,6 +467,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with

Review comment:
       nit: Let's add closing `</li>` as well.
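   
   e.g. a sketch of the fixed item (same wording as the diff, just with the closing tag):
   
   ```
   * <li>`modifiedBefore`: an optional timestamp to only include files with
   * modification times occurring before the specified Time. The provided timestamp
   * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
   ```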

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -721,6 +727,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -752,6 +764,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -840,6 +864,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -721,6 +727,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
    * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
    * It does not change the behavior of partition discovery.</li>
+   * <li>`modifiedBefore`: an optional timestamp to only include files with
+   * modification times occurring before the specified Time. The provided timestamp
+   * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+   * <li>`modifiedAfter`: an optional timestamp to only include files with

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.hadoop.fs.{FileStatus, GlobFilter}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
+
+trait PathFilterStrategy extends Serializable {
+  def accept(fileStatus: FileStatus): Boolean
+}
+
+trait StrategyBuilder {
+  def strategy: String
+  def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy
+}
+
+class PathGlobFilter(filePatten: String) extends PathFilterStrategy {
+
+  private val globFilter = new GlobFilter(filePatten)
+
+  override def accept(fileStatus: FileStatus): Boolean =
+    globFilter.accept(fileStatus.getPath)
+}
+
+object PathGlobFilter extends StrategyBuilder {
+
+  override def strategy: String = "pathglobfilter"
+
+  override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = {
+    new PathGlobFilter(parameters(strategy))
+  }
+}
+
+/**
+ * Provide modifiedAfter and modifiedBefore options when
+ * filtering from a batch-based file data source.
+ *
+ * Example Usages
+ * Load all CSV files modified after date:
+ * {{{
+ *   spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * Load all CSV files modified before date:
+ * {{{
+ *   spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * Load all CSV files modified between two dates:
+ * {{{
+ *   spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00")
+ *     .option("modifiedBefore","2020-06-15T05:00:00").load()
+ * }}}
+ */
+abstract class ModifiedDateFilter extends PathFilterStrategy {
+
+  def timeZoneId: String
+
+  protected def localTime(micros: Long): Long =
+    DateTimeUtils.fromUTCTime(micros, timeZoneId)
+}
+
+object ModifiedDateFilter {
+
+  def getTimeZoneId(options: CaseInsensitiveMap[String]): String = {
+    options.getOrElse(
+      DateTimeUtils.TIMEZONE_OPTION.toLowerCase(Locale.ROOT),
+      SQLConf.get.sessionLocalTimeZone)
+  }
+
+  def toThreshold(timeString: String, timeZoneId: String, strategy: String): Long = {
+    val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId)
+    val ts = UTF8String.fromString(timeString)
+    DateTimeUtils.stringToTimestamp(ts, timeZone.toZoneId).getOrElse {
+      throw new AnalysisException(
+        s"The timestamp provided for the '$strategy' option is invalid. The 
expected format " +
+          s"is 'YYYY-MM-DDTHH:mm:ss', but the provided timestamp: $timeString")
+    }
+  }
+}
+
+/**
+ * Filter used to determine whether file was modified before the provided timestamp.
+ */
+class ModifiedBeforeFilter(thresholdTime: Long, val timeZoneId: String)
+    extends ModifiedDateFilter {
+
+  override def accept(fileStatus: FileStatus): Boolean =
+    // We standardize on microseconds wherever possible
+    // getModificationTime returns in milliseconds
+    thresholdTime - localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) > 0
+}
+
+object ModifiedBeforeFilter extends StrategyBuilder {
+  import ModifiedDateFilter._
+
+  override val strategy: String = "modifiedbefore"
+
+  override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = {
+    val timeZoneId = getTimeZoneId(parameters)
+    val thresholdTime = toThreshold(parameters(strategy), timeZoneId, strategy)
+    new ModifiedBeforeFilter(thresholdTime, timeZoneId)
+  }
+}
+
+/**
+ * Filter used to determine whether file was modified after the provided timestamp.
+ */
+class ModifiedAfterFilter(thresholdTime: Long, val timeZoneId: String)
+    extends ModifiedDateFilter {
+
+  override def accept(fileStatus: FileStatus): Boolean =
+    // getModificationTime returns in milliseconds
+    // We standardize on microseconds wherever possible
+    localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) - thresholdTime > 0
+}
+
+object ModifiedAfterFilter extends StrategyBuilder {
+  import ModifiedDateFilter._
+
+  override val strategy: String = "modifiedafter"
+
+  override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = {
+    val timeZoneId = getTimeZoneId(parameters)
+    val thresholdTime = toThreshold(parameters(strategy), timeZoneId, strategy)
+    new ModifiedAfterFilter(thresholdTime, timeZoneId)
+  }
+}
+
+object PathFilterFactory {
+
+  private val strategies =
+    Seq(PathGlobFilter, ModifiedBeforeFilter, ModifiedAfterFilter)
+
+  def create(parameters: CaseInsensitiveMap[String]): Seq[PathFilterStrategy] = {
+    strategies.flatMap { s =>
+      parameters.get(s.strategy).map { _ =>

Review comment:
       After that change, `s.create(parameters)` would be sufficient here.
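   
   i.e. roughly (a sketch, assuming the `Option[PathFilterStrategy]`-returning `create` suggested above):
   
   ```
   def create(parameters: CaseInsensitiveMap[String]): Seq[PathFilterStrategy] = {
     // Each builder decides for itself whether its option key is present.
     strategies.flatMap(_.create(parameters))
   }
   ```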

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {
+    val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")

Review comment:
       nit: why not use 1000000 instead? :)

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {
+    val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
+    stringToFile(file, "text")
+  }
+
+  def setFileTime(time: LocalDateTime, file: File): Boolean = {
+    val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def formatTime(time: LocalDateTime): String = {
+    time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+  }
+
+  test(

Review comment:
       The new tests mostly share the same structure, which could be extracted out.
   
   ```
     // Note: this needs `import org.apache.spark.sql.DataFrame`, and Scala lets
     // only one of two overloads define default arguments, so the defaults live
     // on the single-date variant that most tests would call.
     private def executeTest(
         dir: File,
         fileDate: LocalDateTime,
         modifiedBefore: Option[String] = None,
         modifiedAfter: Option[String] = None,
         pathGlobFilter: Option[String] = None): DataFrame = {
       executeTest(dir, Seq(fileDate), modifiedBefore, modifiedAfter, pathGlobFilter)
     }
   
     private def executeTest(
         dir: File,
         fileDates: Seq[LocalDateTime],
         modifiedBefore: Option[String],
         modifiedAfter: Option[String],
         pathGlobFilter: Option[String]): DataFrame = {
       fileDates.foreach { fileDate =>
         val file = createSingleFile(dir)
         setFileTime(fileDate, file)
       }
   
       var dfReader = spark.read.format("csv").option("timeZone", "UTC")
       modifiedBefore.foreach { opt => dfReader = dfReader.option("modifiedBefore", opt) }
       modifiedAfter.foreach { opt => dfReader = dfReader.option("modifiedAfter", opt) }
       pathGlobFilter.foreach { opt => dfReader = dfReader.option("pathGlobFilter", opt) }
   
       dfReader.load(dir.getCanonicalPath)
     }
   ```
   
   After adding these methods, this test could be changed as below:
   
   ```
     test(
       "SPARK-31962: when modifiedBefore specified" +
         " and sharing same timestamp with file last modified time.") {
       withTempDir { dir =>
         val time = LocalDateTime.now(ZoneOffset.UTC)
         val exc = intercept[AnalysisException] {
           executeTest(dir, time, modifiedBefore = Some(formatTime(time)))
         }
         assert(exc.getMessage.contains("Unable to infer schema for CSV"))
       }
     }
   ```

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {

Review comment:
       Probably good to make the helper methods private. My preference is to move private methods to the bottom, but that's only me, so not a big deal.
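   
   e.g. (sketch):
   
   ```
   private def createSingleFile(dir: File): File = {
     val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
     stringToFile(file, "text")
   }
   ```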

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {
+    val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
+    stringToFile(file, "text")
+  }
+
+  def setFileTime(time: LocalDateTime, file: File): Boolean = {
+    val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def formatTime(time: LocalDateTime): String = {
+    time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now(ZoneOffset.UTC)
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", formattedTime)
+          .option("timeZone", "UTC")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("timeZone", "UTC")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with earlier file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setMinusFileTime(time, file, 3)
+
+      val formattedTime = formatTime(time)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with later file last modified time.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val time = LocalDateTime.now()
+      val formattedTime = formatTime(time)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      file.setLastModified(DateTimeUtils.currentTimestamp())

Review comment:
       Shall we do the same as the test below (compute the timestamp relative to now) instead of using a specific date/format?
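   
   e.g. something like the following (a sketch reusing the suite's own helpers; the 25-day offset is arbitrary):
   
   ```
   test("SPARK-31962: when modifiedAfter specified with a past date") {
     withTempDir { dir =>
       val file = createSingleFile(dir)
       file.setLastModified(DateTimeUtils.currentTimestamp())
       // relative to now, like the modifiedBefore tests, instead of the
       // hard-coded "2019-05-10T01:11:00"
       val pastTime = formatTime(LocalDateTime.now().minusDays(25))
       val df = spark.read
         .option("modifiedAfter", pastTime)
         .format("csv")
         .load(dir.getCanonicalPath)
       assert(df.count() == 1)
     }
   }
   ```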

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {
+    val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
+    stringToFile(file, "text")
+  }
+
+  def setFileTime(time: LocalDateTime, file: File): Boolean = {
+    val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def formatTime(time: LocalDateTime): String = {
+    time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now(ZoneOffset.UTC)
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", formattedTime)
+          .option("timeZone", "UTC")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("timeZone", "UTC")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with earlier file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setMinusFileTime(time, file, 3)
+
+      val formattedTime = formatTime(time)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with later file last modified time.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val time = LocalDateTime.now()
+      val formattedTime = formatTime(time)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      file.setLastModified(DateTimeUtils.currentTimestamp())
+      val df = spark.read
+        .option("modifiedAfter", "2019-05-10T01:11:00")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val afterTime = LocalDateTime.now().plusDays(25)
+      val formattedTime = formatTime(afterTime)
+      val df = spark.read
+        .option("modifiedBefore", formattedTime)
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test("SPARK-31962: with modifiedBefore option provided using a past date") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      file.setLastModified(DateTimeUtils.currentTimestamp())
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", "1984-05-01T01:00:00")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, one valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(DateTimeUtils.currentTimestamp())
+      file2.setLastModified(0)
+
+      val df = spark.read
+        .option("modifiedAfter", "2019-05-10T01:11:00")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, both valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(DateTimeUtils.currentTimestamp())
+      file2.setLastModified(DateTimeUtils.currentTimestamp())
+      val df = spark.read
+        .option("modifiedAfter", "2019-05-10T01:11:00")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 2)
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, none valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(0)
+      file2.setLastModified(0)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", "1984-05-01T01:00:00")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified with a future date, " +
+      "multiple files, both valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(0)
+      file2.setLastModified(0)
+
+      val time = LocalDateTime.now().plusDays(3)
+      val formattedTime = formatTime(time)
+
+      val df = spark.read
+        .option("modifiedBefore", formattedTime)
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 2)
+    }
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date, 
multiple files, one valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+      file1.setLastModified(0)
+      val time = LocalDateTime.now()
+      setPlusFileTime(time, file2, 3)
+
+      val formattedTime = formatTime(time)
+      val df = spark.read
+        .option("modifiedBefore", formattedTime)
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified with a future date, " +
+      "multiple files, none valid") {
+    withTempDir { dir =>
+      val file1 = createSingleFile(dir)
+      val file2 = createSingleFile(dir)
+
+      val time = LocalDateTime
+        .now()
+        .minusDays(1)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      file1.setLastModified(DateTimeUtils.currentTimestamp())
+      file2.setLastModified(DateTimeUtils.currentTimestamp())
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", time)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified with a past date and " +
+      "pathGlobalFilter returning results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val df = spark.read
+        .option("modifiedAfter", "1984-05-10T01:11:00")
+        .option("pathGlobFilter", "*.csv")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified with past date " +
+      "and pathGlobFilter filtering results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", "1984-05-01T01:00:00")
+          .option("pathGlobFilter", "*.txt")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified with future date and " +
+      "pathGlobFilter returning results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val time = LocalDateTime
+        .now()
+        .plusDays(10)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", time)
+          .option("pathGlobFilter", "*.csv")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified with future date and " +
+      "pathGlobFilter filtering results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val time = LocalDateTime
+        .now()
+        .plusDays(10)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", time)
+          .option("pathGlobFilter", "*.txt")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter are specified out of 
range and " +
+      "pathGlobFilter returning results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val time = LocalDateTime.now().plusDays(10)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .option("pathGlobFilter", "*.csv")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range 
and " +
+      "pathGlobFilter returning results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val beforeTime = LocalDateTime
+        .now()
+        .minusDays(25)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+      val afterTime = LocalDateTime
+        .now()
+        .plusDays(25)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      val df = spark.read
+        .option("modifiedAfter", beforeTime)
+        .option("modifiedBefore", afterTime)
+        .option("pathGlobFilter", "*.csv")
+        .format("csv")
+        .load(dir.getCanonicalPath)
+      assert(df.count() == 1)
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range 
and " +
+      "pathGlobFilter filtering results") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val beforeTime = LocalDateTime
+        .now()
+        .minusDays(25)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+      val afterTime = LocalDateTime
+        .now()
+        .plusDays(25)
+        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", beforeTime)
+          .option("modifiedBefore", afterTime)
+          .option("pathGlobFilter", "*.txt")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter is specified with an invalid date") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", "2024-05+1 01:00:00")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      Seq("The timestamp provided", "modifiedafter", "2024-05+1 
01:00:00").foreach {
+        expectedMsg =>
+          assert(msg.contains(expectedMsg))
+      }
+    }
+  }
+
+  test("SPARK-31962: modifiedBefore - empty option") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", "")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(
+        msg.contains("The timestamp provided for")
+          && msg.contains("modifiedbefore"))
+    }
+  }
+
+  test("SPARK-31962: modifiedAfter - empty option") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", "")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      Seq("The timestamp provided", "modifiedafter").foreach { expectedMsg =>
+        assert(msg.contains(expectedMsg))
+      }
+    }
+  }
+
+  test(
+    "SPARK-31962: modifiedAfter filter takes into account local timezone " +
+      "when specified as an option. After UTC.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val timeZone = DateTimeUtils.getTimeZone("UTC")
+      val strategyTime =
+        ModifiedDateFilter.toThreshold(
+          LocalDateTime.now(timeZone.toZoneId).toString,
+          "HST",
+          "modifiedafter")
+
+      assert(
+        strategyTime - DateTimeUtils
+          .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) > 0)
+    }
+  }
+
+  test(
+    "SPARK-31962: modifiedAfter filter takes into account local timezone " +
+      "when specified as an option. Before UTC.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+
+      val timeZone = DateTimeUtils.getTimeZone("UTC")
+      val strategyTime =
+        ModifiedDateFilter.toThreshold(
+          LocalDateTime.now(timeZone.toZoneId).toString,
+          "HST",
+          "modifiedafter")
+      assert(
+        DateTimeUtils
+          .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0)
+    }
+  }
+
+  test(
+    "SPARK-31962: modifiedBefore filter takes into account local timezone " +
+      "when specified as an option. After UTC.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val timeZone = DateTimeUtils.getTimeZone("UTC")
+      val strategyTime =
+        ModifiedDateFilter.toThreshold(
+          LocalDateTime.now(timeZone.toZoneId).toString,
+          "CET",
+          "modifiedbefore")
+      assert(
+        DateTimeUtils
+          .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0)
+    }
+  }
+
+  test(
+    "SPARK-31962: modifiedBefore filter takes into account local timezone " +
+      "when specified as an option. Before UTC.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val timeZone = DateTimeUtils.getTimeZone("UTC")
+      val strategyTime =
+        ModifiedDateFilter.toThreshold(
+          LocalDateTime.now(timeZone.toZoneId).toString,
+          "HST",
+          "modifiedbefore")
+      assert(
+        strategyTime - DateTimeUtils.fromUTCTime(DateTimeUtils.currentTimestamp(), "UTC") > 0)
+    }
+  }
+
+  test("Option pathGlobFilter: filter files correctly") {

Review comment:
       Just for other reviewers: I see this test is simply moved here, not newly added.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {
+    val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
+    stringToFile(file, "text")
+  }
+
+  def setFileTime(time: LocalDateTime, file: File): Boolean = {
+    val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def formatTime(time: LocalDateTime): String = {
+    time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now(ZoneOffset.UTC)
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedBefore", formattedTime)
+          .option("timeZone", "UTC")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedAfter specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("timeZone", "UTC")
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with earlier file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now()
+      setMinusFileTime(time, file, 3)
+
+      val formattedTime = formatTime(time)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test(
+    "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+      " share same timestamp with later file last modified time.") {
+    withTempDir { dir =>
+      createSingleFile(dir)
+      val time = LocalDateTime.now()
+      val formattedTime = formatTime(time)
+      val msg = intercept[AnalysisException] {
+        spark.read
+          .option("modifiedAfter", formattedTime)
+          .option("modifiedBefore", formattedTime)
+          .format("csv")
+          .load(dir.getCanonicalPath)
+      }.getMessage
+      assert(msg.contains("Unable to infer schema for CSV"))
+    }
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      file.setLastModified(DateTimeUtils.currentTimestamp())

Review comment:
       Shall we do the same as in the test below, instead of using a specific date/format?
   
   Moreover, it would be more consistent if we could go with LocalDateTime in all cases whenever possible.
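   
   For instance, a minimal sketch of the idea, reusing the suite's existing setFileTime/formatTime helpers (the 25-day offset and the explicit timeZone option here are illustrative choices, not something this PR prescribes):
   
       test("SPARK-31962: when modifiedAfter specified with a past date") {
         withTempDir { dir =>
           val file = createSingleFile(dir)
           // Derive both the file mtime and the option value from a single
           // LocalDateTime instead of mixing DateTimeUtils.currentTimestamp()
           // with a hard-coded date string.
           val time = LocalDateTime.now(ZoneOffset.UTC)
           setFileTime(time, file)
           val pastTime = formatTime(time.minusDays(25))
           val df = spark.read
             .option("modifiedAfter", pastTime)
             .option("timeZone", "UTC")
             .format("csv")
             .load(dir.getCanonicalPath)
           assert(df.count() == 1)
         }
       }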
