HeartSaVioR commented on a change in pull request #28841: URL: https://github.com/apache/spark/pull/28841#discussion_r494268346
########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -467,6 +467,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + * <li>`modifiedAfter`: an optional timestamp to only include files with Review comment: ditto ########## File path: python/pyspark/sql/readwriter.py ########## @@ -184,7 +196,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None, dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None, - recursiveFileLookup=None, allowNonNumericNumbers=None): + recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, Review comment: Probably better not to change the order. With such a huge number of parameters, I think end users will use named parameters almost every time, but it is better to be safe. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -752,6 +764,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. 
* It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + * <li>`modifiedAfter`: an optional timestamp to only include files with Review comment: ditto ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -785,6 +803,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with Review comment: ditto ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -785,6 +803,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + * <li>`modifiedAfter`: an optional timestamp to only include files with Review comment: ditto ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.util.{Locale, TimeZone} + +import org.apache.hadoop.fs.{FileStatus, GlobFilter} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.unsafe.types.UTF8String + +trait PathFilterStrategy extends Serializable { + def accept(fileStatus: FileStatus): Boolean +} + +trait StrategyBuilder { + def strategy: String Review comment: It would be simpler if we just pass the parameters and get PathFilterStrategy instance if applicable (`Option[PathFilterStrategy]` as return type of `create`), instead of let caller check it with `strategy` directly. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -840,6 +864,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. 
* It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + * <li>`modifiedAfter`: an optional timestamp to only include files with Review comment: ditto ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def createSingleFile(dir: File): File = { + val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv") + stringToFile(file, "text") + } + + def setFileTime(time: LocalDateTime, file: File): Boolean = { + val sameTime = time.toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def formatTime(time: LocalDateTime): String = { + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + } + + test( + "SPARK-31962: when modifiedBefore specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now(ZoneOffset.UTC) + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified" + + " and sharing same timestamp with file last modified time.") { + 
withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with earlier file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setMinusFileTime(time, file, 3) + + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with later file last modified time.") { + withTempDir { dir => + createSingleFile(dir) + val time = LocalDateTime.now() + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + 
}.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date") { + withTempDir { dir => + val file = createSingleFile(dir) + file.setLastModified(DateTimeUtils.currentTimestamp()) + val df = spark.read + .option("modifiedAfter", "2019-05-10T01:11:00") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test("SPARK-31962: when modifiedBefore specified with a future date") { + withTempDir { dir => + createSingleFile(dir) + val afterTime = LocalDateTime.now().plusDays(25) + val formattedTime = formatTime(afterTime) + val df = spark.read + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test("SPARK-31962: with modifiedBefore option provided using a past date") { + withTempDir { dir => + val file = createSingleFile(dir) + file.setLastModified(DateTimeUtils.currentTimestamp()) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", "1984-05-01T01:00:00") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, one valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(DateTimeUtils.currentTimestamp()) + file2.setLastModified(0) + + val df = spark.read + .option("modifiedAfter", "2019-05-10T01:11:00") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, both valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(DateTimeUtils.currentTimestamp()) + file2.setLastModified(DateTimeUtils.currentTimestamp()) + val df = spark.read + 
.option("modifiedAfter", "2019-05-10T01:11:00") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 2) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, none valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(0) + file2.setLastModified(0) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", "1984-05-01T01:00:00") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore specified with a future date, " + + "multiple files, both valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(0) + file2.setLastModified(0) + + val time = LocalDateTime.now().plusDays(3) + val formattedTime = formatTime(time) + + val df = spark.read + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 2) + } + } + + test("SPARK-31962: when modifiedBefore specified with a future date, multiple files, one valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(0) + val time = LocalDateTime.now() + setPlusFileTime(time, file2, 3) + + val formattedTime = formatTime(time) + val df = spark.read + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test( + "SPARK-31962: when modifiedBefore specified with a future date, " + + "multiple files, none valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + + val time = LocalDateTime + .now() + .minusDays(1) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + file1.setLastModified(DateTimeUtils.currentTimestamp()) + 
file2.setLastModified(DateTimeUtils.currentTimestamp()) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", time) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified with a past date and " + + "pathGlobalFilter returning results") { + withTempDir { dir => + createSingleFile(dir) + val df = spark.read + .option("modifiedAfter", "1984-05-10T01:11:00") + .option("pathGlobFilter", "*.csv") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test( + "SPARK-31962: when modifiedAfter specified with past date " + + "and pathGlobFilter filtering results") { + withTempDir { dir => + createSingleFile(dir) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", "1984-05-01T01:00:00") + .option("pathGlobFilter", "*.txt") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified with future date and " + + "pathGlobFilter returning results") { + withTempDir { dir => + createSingleFile(dir) + + val time = LocalDateTime + .now() + .plusDays(10) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", time) + .option("pathGlobFilter", "*.csv") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified with future date and " + + "pathGlobFilter filtering results") { + withTempDir { dir => + createSingleFile(dir) + + val time = LocalDateTime + .now() + .plusDays(10) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", time) + 
.option("pathGlobFilter", "*.txt") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter are specified out of range and " + + "pathGlobFilter returning results") { + withTempDir { dir => + createSingleFile(dir) + + val time = LocalDateTime.now().plusDays(10) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .option("pathGlobFilter", "*.csv") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range and " + + "pathGlobFilter returning results") { + withTempDir { dir => + createSingleFile(dir) + + val beforeTime = LocalDateTime + .now() + .minusDays(25) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + val afterTime = LocalDateTime + .now() + .plusDays(25) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + val df = spark.read + .option("modifiedAfter", beforeTime) + .option("modifiedBefore", afterTime) + .option("pathGlobFilter", "*.csv") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range and " + + "pathGlobFilter filtering results") { + withTempDir { dir => + createSingleFile(dir) + + val beforeTime = LocalDateTime + .now() + .minusDays(25) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + val afterTime = LocalDateTime + .now() + .plusDays(25) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", beforeTime) + .option("modifiedBefore", afterTime) + .option("pathGlobFilter", "*.txt") + 
.format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter is specified with an invalid date") { + withTempDir { dir => + createSingleFile(dir) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", "2024-05+1 01:00:00") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + Seq("The timestamp provided", "modifiedafter", "2024-05+1 01:00:00").foreach { + expectedMsg => + assert(msg.contains(expectedMsg)) + } + } + } + + test("SPARK-31962: modifiedBefore - empty option") { + withTempDir { dir => + createSingleFile(dir) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", "") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert( + msg.contains("The timestamp provided for") + && msg.contains("modifiedbefore")) + } + } + + test("SPARK-31962: modifiedAfter - empty option") { + withTempDir { dir => + createSingleFile(dir) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", "") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + Seq("The timestamp provided", "modifiedafter").foreach { expectedMsg => + assert(msg.contains(expectedMsg)) + } + } + } + + test( + "SPARK-31962: modifiedAfter filter takes into account local timezone " + + "when specified as an option. After UTC.") { + withTempDir { dir => + createSingleFile(dir) + val timeZone = DateTimeUtils.getTimeZone("UTC") + val strategyTime = + ModifiedDateFilter.toThreshold( + LocalDateTime.now(timeZone.toZoneId).toString, + "HST", + "modifiedafter") + + assert( + strategyTime - DateTimeUtils + .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) > 0) + } + } + + test( + "SPARK-31962: modifiedAfter filter takes into account local timezone " + + "when specified as an option. 
Before UTC.") { + withTempDir { dir => + createSingleFile(dir) + + val timeZone = DateTimeUtils.getTimeZone("UTC") + val strategyTime = + ModifiedDateFilter.toThreshold( + LocalDateTime.now(timeZone.toZoneId).toString, + "HST", + "modifiedafter") + assert( + DateTimeUtils + .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0) + } + } + + test( + "SPARK-31962: modifiedBefore filter takes into account local timezone " + + "when specified as an option. After UTC.") { + withTempDir { dir => + createSingleFile(dir) + val timeZone = DateTimeUtils.getTimeZone("UTC") + val strategyTime = + ModifiedDateFilter.toThreshold( + LocalDateTime.now(timeZone.toZoneId).toString, + "CET", + "modifiedbefore") + assert( + DateTimeUtils + .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0) + } + } + + test( + "SPARK-31962: modifiedBefore filter takes into account local timezone " + + "when specified as an option. Before UTC.") { + withTempDir { dir => + createSingleFile(dir) + val timeZone = DateTimeUtils.getTimeZone("UTC") + val strategyTime = + ModifiedDateFilter.toThreshold( + LocalDateTime.now(timeZone.toZoneId).toString, + "HST", + "modifiedbefore") + assert( + strategyTime - DateTimeUtils.fromUTCTime(DateTimeUtils.currentTimestamp(), "UTC") > 0) + } + } + + test("Option pathGlobFilter: filter files correctly") { + withTempPath { path => + val dataDir = path.getCanonicalPath + Seq("foo").toDS().write.text(dataDir) + Seq("bar").toDS().write.mode("append").orc(dataDir) + val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir) + checkAnswer(df, Row("foo")) + + // Both glob pattern in option and path should be effective to filter files. 
+ val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc") + checkAnswer(df2, Seq.empty) + + val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt") + checkAnswer(df3, Row("foo")) + } + } + + test("Option pathGlobFilter: simple extension filtering should contains partition info") { Review comment: Just for other reviewers: I see this is simply moved, not a new one. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -467,6 +467,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with Review comment: nit: Let's add closing `</li>` as well. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -721,6 +727,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with Review comment: ditto ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -752,6 +764,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. 
* It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with Review comment: ditto ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -840,6 +864,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with Review comment: ditto ########## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ########## @@ -721,6 +727,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * It does not change the behavior of partition discovery.</li> + * <li>`modifiedBefore`: an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + * <li>`modifiedAfter`: an optional timestamp to only include files with Review comment: ditto ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.util.{Locale, TimeZone} + +import org.apache.hadoop.fs.{FileStatus, GlobFilter} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.unsafe.types.UTF8String + +trait PathFilterStrategy extends Serializable { + def accept(fileStatus: FileStatus): Boolean +} + +trait StrategyBuilder { + def strategy: String + def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy +} + +class PathGlobFilter(filePatten: String) extends PathFilterStrategy { + + private val globFilter = new GlobFilter(filePatten) + + override def accept(fileStatus: FileStatus): Boolean = + globFilter.accept(fileStatus.getPath) +} + +object PathGlobFilter extends StrategyBuilder { + + override def strategy: String = "pathglobfilter" + + override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = { + new PathGlobFilter(parameters(strategy)) + } +} + +/** + * Provide modifiedAfter and modifiedBefore options when + * filtering from a batch-based file data source. 
+ * + * Example Usages + * Load all CSV files modified after date: + * {{{ + * spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load() + * }}} + * + * Load all CSV files modified before date: + * {{{ + * spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load() + * }}} + * + * Load all CSV files modified between two dates: + * {{{ + * spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00") + * .option("modifiedBefore","2020-06-15T05:00:00").load() + * }}} + */ +abstract class ModifiedDateFilter extends PathFilterStrategy { + + def timeZoneId: String + + protected def localTime(micros: Long): Long = + DateTimeUtils.fromUTCTime(micros, timeZoneId) +} + +object ModifiedDateFilter { + + def getTimeZoneId(options: CaseInsensitiveMap[String]): String = { + options.getOrElse( + DateTimeUtils.TIMEZONE_OPTION.toLowerCase(Locale.ROOT), + SQLConf.get.sessionLocalTimeZone) + } + + def toThreshold(timeString: String, timeZoneId: String, strategy: String): Long = { + val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId) + val ts = UTF8String.fromString(timeString) + DateTimeUtils.stringToTimestamp(ts, timeZone.toZoneId).getOrElse { + throw new AnalysisException( + s"The timestamp provided for the '$strategy' option is invalid. The expected format " + + s"is 'YYYY-MM-DDTHH:mm:ss', but the provided timestamp: $timeString") + } + } +} + +/** + * Filter used to determine whether file was modified before the provided timestamp. 
+ */ +class ModifiedBeforeFilter(thresholdTime: Long, val timeZoneId: String) + extends ModifiedDateFilter { + + override def accept(fileStatus: FileStatus): Boolean = + // We standardize on microseconds wherever possible + // getModificationTime returns in milliseconds + thresholdTime - localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) > 0 +} + +object ModifiedBeforeFilter extends StrategyBuilder { + import ModifiedDateFilter._ + + override val strategy: String = "modifiedbefore" + + override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = { + val timeZoneId = getTimeZoneId(parameters) + val thresholdTime = toThreshold(parameters(strategy), timeZoneId, strategy) + new ModifiedBeforeFilter(thresholdTime, timeZoneId) + } +} + +/** + * Filter used to determine whether file was modified after the provided timestamp. + */ +class ModifiedAfterFilter(thresholdTime: Long, val timeZoneId: String) + extends ModifiedDateFilter { + + override def accept(fileStatus: FileStatus): Boolean = + // getModificationTime returns in milliseconds + // We standardize on microseconds wherever possible + localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) - thresholdTime > 0 +} + +object ModifiedAfterFilter extends StrategyBuilder { + import ModifiedDateFilter._ + + override val strategy: String = "modifiedafter" + + override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = { + val timeZoneId = getTimeZoneId(parameters) + val thresholdTime = toThreshold(parameters(strategy), timeZoneId, strategy) + new ModifiedAfterFilter(thresholdTime, timeZoneId) + } +} + +object PathFilterFactory { + + private val strategies = + Seq(PathGlobFilter, ModifiedBeforeFilter, ModifiedAfterFilter) + + def create(parameters: CaseInsensitiveMap[String]): Seq[PathFilterStrategy] = { + strategies.flatMap { s => + parameters.get(s.strategy).map { _ => Review comment: After changing, `s.create(parameters)` would be sufficient 
for this. ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def createSingleFile(dir: File): File = { + val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv") Review comment: nit: why not use 1000000 instead? :) ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def createSingleFile(dir: File): File = { + val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv") + stringToFile(file, "text") + } + + def setFileTime(time: LocalDateTime, file: File): Boolean = { + val sameTime = time.toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def formatTime(time: LocalDateTime): String = { + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + } + + test( Review comment: The overall new tests have similar structure, which can be 
extracted out. ``` private def executeTest( dir: File, fileDate: LocalDateTime, modifiedBefore: Option[String] = None, modifiedAfter: Option[String] = None, pathGlobFilter: Option[String] = None): DataFrame = { executeTest(dir, Seq(fileDate), modifiedBefore, modifiedAfter, pathGlobFilter) } private def executeTest( dir: File, fileDates: Seq[LocalDateTime], modifiedBefore: Option[String] = None, modifiedAfter: Option[String] = None, pathGlobFilter: Option[String] = None): DataFrame = { fileDates.foreach { fileDate => val file = createSingleFile(dir) setFileTime(fileDate, file) } var dfReader = spark.read.format("csv").option("timeZone", "UTC") modifiedBefore.foreach { opt => dfReader = dfReader.option("modifiedBefore", opt) } modifiedAfter.foreach { opt => dfReader = dfReader.option("modifiedAfter", opt) } pathGlobFilter.foreach { opt => dfReader = dfReader.option("pathGlobFilter", opt) } dfReader.load(dir.getCanonicalPath) } ``` After adding these methods, this test could be changed to below: ``` test( "SPARK-31962: when modifiedBefore specified" + " and sharing same timestamp with file last modified time.") { withTempDir { dir => val time = LocalDateTime.now(ZoneOffset.UTC) val exc = intercept[AnalysisException] { executeTest(dir, time, modifiedBefore = Some(formatTime(time))) } assert(exc.getMessage.contains("Unable to infer schema for CSV")) } } ``` ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def createSingleFile(dir: File): File = { Review comment: Probably good to make helper methods private. My preference is moving private methods bottom, but that's only me so not a big deal. ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def createSingleFile(dir: File): File = { + val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv") + stringToFile(file, "text") + } + + def setFileTime(time: LocalDateTime, file: File): Boolean = { + val sameTime = time.toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def formatTime(time: LocalDateTime): String = { + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + } + + test( + "SPARK-31962: when modifiedBefore specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now(ZoneOffset.UTC) + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + 
"SPARK-31962: when modifiedAfter specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with earlier file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setMinusFileTime(time, file, 3) + + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with later file last modified time.") { + withTempDir { dir => + createSingleFile(dir) + val time = LocalDateTime.now() + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", 
formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date") { + withTempDir { dir => + val file = createSingleFile(dir) + file.setLastModified(DateTimeUtils.currentTimestamp()) Review comment: Shall we do the similar with below test instead of using specific date/format? ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def createSingleFile(dir: File): File = { + val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv") + stringToFile(file, "text") + } + + def setFileTime(time: LocalDateTime, file: File): Boolean = { + val sameTime = time.toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def formatTime(time: LocalDateTime): String = { + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + } + + test( + "SPARK-31962: when modifiedBefore specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now(ZoneOffset.UTC) + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified" + + " and sharing same timestamp with file last modified time.") { + 
withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with earlier file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setMinusFileTime(time, file, 3) + + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with later file last modified time.") { + withTempDir { dir => + createSingleFile(dir) + val time = LocalDateTime.now() + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + 
}.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date") { + withTempDir { dir => + val file = createSingleFile(dir) + file.setLastModified(DateTimeUtils.currentTimestamp()) + val df = spark.read + .option("modifiedAfter", "2019-05-10T01:11:00") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test("SPARK-31962: when modifiedBefore specified with a future date") { + withTempDir { dir => + createSingleFile(dir) + val afterTime = LocalDateTime.now().plusDays(25) + val formattedTime = formatTime(afterTime) + val df = spark.read + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test("SPARK-31962: with modifiedBefore option provided using a past date") { + withTempDir { dir => + val file = createSingleFile(dir) + file.setLastModified(DateTimeUtils.currentTimestamp()) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", "1984-05-01T01:00:00") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, one valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(DateTimeUtils.currentTimestamp()) + file2.setLastModified(0) + + val df = spark.read + .option("modifiedAfter", "2019-05-10T01:11:00") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, both valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(DateTimeUtils.currentTimestamp()) + file2.setLastModified(DateTimeUtils.currentTimestamp()) + val df = spark.read + 
.option("modifiedAfter", "2019-05-10T01:11:00") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 2) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, none valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(0) + file2.setLastModified(0) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", "1984-05-01T01:00:00") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore specified with a future date, " + + "multiple files, both valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(0) + file2.setLastModified(0) + + val time = LocalDateTime.now().plusDays(3) + val formattedTime = formatTime(time) + + val df = spark.read + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 2) + } + } + + test("SPARK-31962: when modifiedBefore specified with a future date, multiple files, one valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + file1.setLastModified(0) + val time = LocalDateTime.now() + setPlusFileTime(time, file2, 3) + + val formattedTime = formatTime(time) + val df = spark.read + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test( + "SPARK-31962: when modifiedBefore specified with a future date, " + + "multiple files, none valid") { + withTempDir { dir => + val file1 = createSingleFile(dir) + val file2 = createSingleFile(dir) + + val time = LocalDateTime + .now() + .minusDays(1) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + file1.setLastModified(DateTimeUtils.currentTimestamp()) + 
file2.setLastModified(DateTimeUtils.currentTimestamp()) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", time) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified with a past date and " + + "pathGlobalFilter returning results") { + withTempDir { dir => + createSingleFile(dir) + val df = spark.read + .option("modifiedAfter", "1984-05-10T01:11:00") + .option("pathGlobFilter", "*.csv") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test( + "SPARK-31962: when modifiedAfter specified with past date " + + "and pathGlobFilter filtering results") { + withTempDir { dir => + createSingleFile(dir) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", "1984-05-01T01:00:00") + .option("pathGlobFilter", "*.txt") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified with future date and " + + "pathGlobFilter returning results") { + withTempDir { dir => + createSingleFile(dir) + + val time = LocalDateTime + .now() + .plusDays(10) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", time) + .option("pathGlobFilter", "*.csv") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified with future date and " + + "pathGlobFilter filtering results") { + withTempDir { dir => + createSingleFile(dir) + + val time = LocalDateTime + .now() + .plusDays(10) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", time) + 
.option("pathGlobFilter", "*.txt") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter are specified out of range and " + + "pathGlobFilter returning results") { + withTempDir { dir => + createSingleFile(dir) + + val time = LocalDateTime.now().plusDays(10) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .option("pathGlobFilter", "*.csv") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range and " + + "pathGlobFilter returning results") { + withTempDir { dir => + createSingleFile(dir) + + val beforeTime = LocalDateTime + .now() + .minusDays(25) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + val afterTime = LocalDateTime + .now() + .plusDays(25) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + val df = spark.read + .option("modifiedAfter", beforeTime) + .option("modifiedBefore", afterTime) + .option("pathGlobFilter", "*.csv") + .format("csv") + .load(dir.getCanonicalPath) + assert(df.count() == 1) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range and " + + "pathGlobFilter filtering results") { + withTempDir { dir => + createSingleFile(dir) + + val beforeTime = LocalDateTime + .now() + .minusDays(25) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + val afterTime = LocalDateTime + .now() + .plusDays(25) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", beforeTime) + .option("modifiedBefore", afterTime) + .option("pathGlobFilter", "*.txt") + 
.format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter is specified with an invalid date") { + withTempDir { dir => + createSingleFile(dir) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", "2024-05+1 01:00:00") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + Seq("The timestamp provided", "modifiedafter", "2024-05+1 01:00:00").foreach { + expectedMsg => + assert(msg.contains(expectedMsg)) + } + } + } + + test("SPARK-31962: modifiedBefore - empty option") { + withTempDir { dir => + createSingleFile(dir) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", "") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert( + msg.contains("The timestamp provided for") + && msg.contains("modifiedbefore")) + } + } + + test("SPARK-31962: modifiedAfter - empty option") { + withTempDir { dir => + createSingleFile(dir) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", "") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + Seq("The timestamp provided", "modifiedafter").foreach { expectedMsg => + assert(msg.contains(expectedMsg)) + } + } + } + + test( + "SPARK-31962: modifiedAfter filter takes into account local timezone " + + "when specified as an option. After UTC.") { + withTempDir { dir => + createSingleFile(dir) + val timeZone = DateTimeUtils.getTimeZone("UTC") + val strategyTime = + ModifiedDateFilter.toThreshold( + LocalDateTime.now(timeZone.toZoneId).toString, + "HST", + "modifiedafter") + + assert( + strategyTime - DateTimeUtils + .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) > 0) + } + } + + test( + "SPARK-31962: modifiedAfter filter takes into account local timezone " + + "when specified as an option. 
Before UTC.") { + withTempDir { dir => + createSingleFile(dir) + + val timeZone = DateTimeUtils.getTimeZone("UTC") + val strategyTime = + ModifiedDateFilter.toThreshold( + LocalDateTime.now(timeZone.toZoneId).toString, + "HST", + "modifiedafter") + assert( + DateTimeUtils + .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0) + } + } + + test( + "SPARK-31962: modifiedBefore filter takes into account local timezone " + + "when specified as an option. After UTC.") { + withTempDir { dir => + createSingleFile(dir) + val timeZone = DateTimeUtils.getTimeZone("UTC") + val strategyTime = + ModifiedDateFilter.toThreshold( + LocalDateTime.now(timeZone.toZoneId).toString, + "CET", + "modifiedbefore") + assert( + DateTimeUtils + .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0) + } + } + + test( + "SPARK-31962: modifiedBefore filter takes into account local timezone " + + "when specified as an option. Before UTC.") { + withTempDir { dir => + createSingleFile(dir) + val timeZone = DateTimeUtils.getTimeZone("UTC") + val strategyTime = + ModifiedDateFilter.toThreshold( + LocalDateTime.now(timeZone.toZoneId).toString, + "HST", + "modifiedbefore") + assert( + strategyTime - DateTimeUtils.fromUTCTime(DateTimeUtils.currentTimestamp(), "UTC") > 0) + } + } + + test("Option pathGlobFilter: filter files correctly") { Review comment: Just for other reviewers: I see this is simply moved, not a new one. ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def createSingleFile(dir: File): File = { + val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv") + stringToFile(file, "text") + } + + def setFileTime(time: LocalDateTime, file: File): Boolean = { + val sameTime = time.toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def formatTime(time: LocalDateTime): String = { + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + } + + test( + "SPARK-31962: when modifiedBefore specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now(ZoneOffset.UTC) + setFileTime(time, file) + 
val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with earlier file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setMinusFileTime(time, file, 3) + + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when 
modifiedBefore and modifiedAfter option" + + " share same timestamp with later file last modified time.") { + withTempDir { dir => + createSingleFile(dir) + val time = LocalDateTime.now() + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date") { + withTempDir { dir => + val file = createSingleFile(dir) + file.setLastModified(DateTimeUtils.currentTimestamp()) Review comment: Shall we do the similar with below test instead of using specific date/format? Moreover, it would be consistent if we can go with LocalDateTime in all cases. ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def createSingleFile(dir: File): File = { + val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv") + stringToFile(file, "text") + } + + def setFileTime(time: LocalDateTime, file: File): Boolean = { + val sameTime = time.toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = { + val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + def formatTime(time: LocalDateTime): String = { + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + } + + test( + "SPARK-31962: when modifiedBefore specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now(ZoneOffset.UTC) + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedBefore", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedAfter specified" + + " and sharing same timestamp with file last modified time.") { + 
withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("timeZone", "UTC") + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setFileTime(time, file) + val formattedTime = formatTime(time) + + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with earlier file last modified time.") { + withTempDir { dir => + val file = createSingleFile(dir) + val time = LocalDateTime.now() + setMinusFileTime(time, file, 3) + + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + }.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test( + "SPARK-31962: when modifiedBefore and modifiedAfter option" + + " share same timestamp with later file last modified time.") { + withTempDir { dir => + createSingleFile(dir) + val time = LocalDateTime.now() + val formattedTime = formatTime(time) + val msg = intercept[AnalysisException] { + spark.read + .option("modifiedAfter", formattedTime) + .option("modifiedBefore", formattedTime) + .format("csv") + .load(dir.getCanonicalPath) + 
}.getMessage + assert(msg.contains("Unable to infer schema for CSV")) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date") { + withTempDir { dir => + val file = createSingleFile(dir) + file.setLastModified(DateTimeUtils.currentTimestamp()) Review comment: Shall we do the similar with below test instead of using specific date/format? Moreover, it would be consistent if we can go with LocalDateTime in all cases whenever possible. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org