cchighman commented on a change in pull request #28841: URL: https://github.com/apache/spark/pull/28841#discussion_r441280950
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
##########
@@ -19,18 +19,18 @@ package org.apache.spark.sql.execution.datasources
 import java.io.{File, FileNotFoundException}

Review comment:
It just occurred to me that my styling changes, although they pass lint, add noise here. I will push a correction so that the diff reflects only meaningful changes.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
##########
@@ -176,9 +180,27 @@ object InMemoryFileIndex extends Logging {
   private[sql] def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      filter: PathFilter,
+      filters: ParArray[PathFilter],
       sparkSession: SparkSession,
       areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
+    bulkListLeafFiles(paths, hadoopConf, filters, sparkSession, areRootPaths, Map[String, String]())

Review comment:
For all other classes that use InMemoryFileIndex but are not prepared to pass along any options, the method above acts as an adapter that supplies an empty map for the trailing options parameter. The implementation then simply receives that empty map.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
##########
@@ -295,14 +319,14 @@ object InMemoryFileIndex extends Logging {
    *
    * @return all children of path that match the specified filter.
    */
-  private def listLeafFiles(
-      path: Path,
+  private def listLeafFiles(path: Path,
       hadoopConf: Configuration,
-      filter: PathFilter,
+      filters: ParArray[PathFilter],

Review comment:
Because _listLeafFiles_ sits at the center of building the file index that is returned to a caller, consumers have many reasons to need more than one filter, configuration, or option. Supporting any number of filters instead of just one seems like a good payoff.
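A minimal sketch of the two ideas above, an option-free adapter overload and support for any number of filters, might look like the following. It is not the PR's code: `SimplePathFilter` is a hypothetical stand-in for Hadoop's `PathFilter` (plain string paths, so it runs without Hadoop), `ListingSketch` is a hypothetical name, and the `skipTmp` option is invented only to give the options map something to do.

```scala
// Hypothetical stand-in for org.apache.hadoop.fs.PathFilter, so the sketch
// runs without Hadoop on the classpath. Paths are plain strings here.
trait SimplePathFilter {
  def accept(path: String): Boolean
}

object ListingSketch {
  // Adapter overload for callers with no reader options: it forwards an
  // empty map, mirroring the bulkListLeafFiles overload in the diff above.
  def listLeafFiles(paths: Seq[String], filters: Seq[SimplePathFilter]): Seq[String] =
    listLeafFiles(paths, filters, Map.empty[String, String])

  // Full variant: any number of caller filters plus filters derived from options.
  def listLeafFiles(
      paths: Seq[String],
      filters: Seq[SimplePathFilter],
      parameters: Map[String, String]): Seq[String] = {
    val all = filters ++ filtersFromOptions(parameters)
    // A path is kept only if every filter accepts it.
    paths.filter(path => all.forall(_.accept(path)))
  }

  // Hypothetical option translation; "skipTmp" is NOT a real Spark option.
  private def filtersFromOptions(parameters: Map[String, String]): Seq[SimplePathFilter] =
    if (parameters.get("skipTmp").contains("true")) {
      Seq[SimplePathFilter]((p: String) => !p.endsWith(".tmp"))
    } else {
      Seq.empty
    }
}
```

Callers without options use the two-argument overload and implicitly get an empty map, so they never need to know the options parameter exists.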
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
##########
@@ -356,26 +380,35 @@ object InMemoryFileIndex extends Logging {
         bulkListLeafFiles(
           dirs.map(_.getPath),
           hadoopConf,
-          filter,
+          filters,
           session,
-          areRootPaths = false
+          areRootPaths = false,
+          parameters = parameters
         ).flatMap(_._2)
       case _ =>
         dirs.flatMap { dir =>
           listLeafFiles(
             dir.getPath,
             hadoopConf,
-            filter,
+            filters,
             sessionOpt,
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
-            isRootPath = false)
+            isRootPath = false,
+            parameters = parameters)
         }
     }
+    val fileFilters = (filters ++ FileModifiedDateOption.accept(

Review comment:
I followed a similar methodology to FileStreamOptions and other such classes that parse an option value and perform a resulting action. This keeps the option handling self-contained and the listing code clean.

##########
File path: docs/sql-data-sources-generic-options.md
##########
@@ -119,3 +119,14 @@ To load all files recursively, you can use:
 {% include_example recursive_file_lookup r/RSparkSQLExample.R %}
 </div>
 </div>
+
+### Load Files after Modification Date
+`fileModifiedDate` is an option used to only load files after a specified modification date.

Review comment:
I took a guess at the best place and way to describe the new file data source option. Structured Streaming has a well-defined layout for documenting options such as latestFirst and maxFileAge.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathfilters/FileModifiedDateOption.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.pathfilters
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.collection.parallel.mutable.ParArray
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.PathFilter
+
+import org.apache.spark.sql.SparkSession
+
+/**
+SPARK-31962 - Provide option to load files after a specified
+date when reading from a folder path.
+ */
+object FileModifiedDateOption {
+  def accept(parameters: Map[String, String],
+      sparkSession: SparkSession,
+      hadoopConf: Configuration): ParArray[PathFilter] = {
+    var afterDateSeconds = 0L
+    val option = "fileModifiedDate"
+    val filesModifiedAfterDate = parameters.get(option)
+    val hasModifiedDateOption = filesModifiedAfterDate.isDefined
+
+    if (hasModifiedDateOption) {
+      filesModifiedAfterDate.foreach(fileDate => {
+        val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME
+        val locateDate = LocalDateTime.parse(fileDate, formatter)
+        afterDateSeconds = locateDate.toEpochSecond(ZoneOffset.UTC) * 1000
+      })
+      val fileFilter =
+        new PathFilterIgnoreOldFiles(sparkSession, hadoopConf, afterDateSeconds)
+
+      return ParArray[PathFilter](fileFilter)
+    }
+    ParArray[PathFilter]()

Review comment:
If the parameter isn't specified at all, the check on line 43 evaluates to false and we return an empty ParArray.
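The option handling in `FileModifiedDateOption` can be sketched without Spark or Hadoop as below, assuming a hypothetical `FileInfo` record in place of Hadoop's `FileStatus` and a hypothetical `ModifiedDateSketch` object. It parses the value with `DateTimeFormatter.ISO_LOCAL_DATE_TIME`, treats it as UTC as the PR does, and returns an empty collection when the option is absent, using `Option` rather than a `var` and an early `return`. The strict `>` comparison is an assumption about what "after" means.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical stand-in for a Hadoop FileStatus: a path plus its modification
// time in epoch milliseconds (the unit FileStatus.getModificationTime uses).
final case class FileInfo(path: String, modificationTimeMillis: Long)

object ModifiedDateSketch {
  val OptionName = "fileModifiedDate"

  // Parses an ISO-8601 local date-time such as "2020-06-01T08:30:00",
  // interprets it as UTC, and returns epoch milliseconds.
  def parseThresholdMillis(value: String): Long =
    LocalDateTime
      .parse(value, DateTimeFormatter.ISO_LOCAL_DATE_TIME)
      .toInstant(ZoneOffset.UTC)
      .toEpochMilli

  // Zero or one predicates: empty when the option is absent, so callers can
  // append the result to their other filters unconditionally.
  def filters(parameters: Map[String, String]): Seq[FileInfo => Boolean] =
    parameters.get(OptionName).toSeq.map { value =>
      val threshold = parseThresholdMillis(value)
      // Assumption: "after" means strictly later than the threshold.
      (file: FileInfo) => file.modificationTimeMillis > threshold
    }
}
```

The PR computes `toEpochSecond(ZoneOffset.UTC) * 1000`, which already yields milliseconds, so the sketch names its value in milliseconds to keep the unit explicit. Following the docs hunk above, a reader would pass a value such as `.option("fileModifiedDate", "2020-06-01T08:30:00")`.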
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
##########
@@ -356,26 +380,35 @@ object InMemoryFileIndex extends Logging {
         bulkListLeafFiles(
           dirs.map(_.getPath),
           hadoopConf,
-          filter,
+          filters,
           session,
-          areRootPaths = false
+          areRootPaths = false,
+          parameters = parameters
         ).flatMap(_._2)
       case _ =>
         dirs.flatMap { dir =>
           listLeafFiles(
             dir.getPath,
             hadoopConf,
-            filter,
+            filters,
             sessionOpt,
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
-            isRootPath = false)
+            isRootPath = false,
+            parameters = parameters)
         }
     }
+    val fileFilters = (filters ++ FileModifiedDateOption.accept(
+      parameters,
+      sessionOpt.get,
+      hadoopConf))
+      .filter(filter => filter !=null)
+
     val allFiles = topLevelFiles ++ nestedFiles
-    if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
+    if (fileFilters.nonEmpty) allFiles.filter(file => (
+      fileFilters.forall(filter => filter != null && filter.accept(file.getPath))))

Review comment:
Apart from a class used for testing, the only other class implementing PathFilter was defined inline in CommandUtils.scala, as seen in this PR. After adding my own filter for the modified date, I placed both classes in the org.apache.spark.sql.execution.datasources.pathfilters package.

In the code above (line 408), if the collection contains any filters, we check whether any files in _allFiles_ should be filtered out before returning them to the caller. The second lambda, _fileFilters.forall_, returns true for a file only if every filter's accept method returns true for that file's path. Since a ParArray is used to parallelize the invocation of each filter, there wasn't any noticeable performance impact.
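The combination step can be sketched as follows, assuming a hypothetical `PathCheck` trait in place of Hadoop's `PathFilter`, a hypothetical `CombineFiltersSketch` object, and a plain `Seq` instead of `ParArray`. It shows the semantics of the `forall` lambda: null entries are dropped, and a path survives only if every remaining filter accepts it.

```scala
// Hypothetical stand-in for org.apache.hadoop.fs.PathFilter over string paths.
trait PathCheck {
  def accept(path: String): Boolean
}

object CombineFiltersSketch {
  // Drops null entries, then keeps a path only if every remaining filter
  // accepts it. forall on an empty collection is true, so with no filters
  // every path is kept.
  def applyAll(paths: Seq[String], filters: Seq[PathCheck]): Seq[String] = {
    val active = filters.filter(_ != null)
    paths.filter(path => active.forall(_.accept(path)))
  }
}
```

Because `forall` on an empty collection returns `true`, the `nonEmpty` guard in the diff acts as a shortcut rather than a correctness requirement: without it, every file would still be kept.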
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -34,20 +35,10 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
+import org.apache.spark.sql.execution.datasources.pathfilters.PathFilterIgnoreNonData
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types._
-/**
- * For the purpose of calculating total directory sizes, use this filter to
- * ignore some irrelevant files.
- * @param stagingDir hive staging dir
- */
-class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serializable {

Review comment:
The pattern a previous committer used here is nice. I originally wrote this a different way until I realized that a Hadoop fs.Path can refer to a file. Currently, neither the file data source nor the file stream source is consistent about where or how it checks options that may filter results. That is what makes implementing the PathFilter interface appealing: you can encapsulate each option in its own visitor-style class and, during the normal course of InMemoryFileIndex execution, serve both sources with less code.
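To illustrate encapsulating each concern in its own filter class, here is a sketch in the spirit of `PathFilterIgnoreNonData`; it is not the removed class's actual body. `NamedPathFilter`, `IgnoreNonDataSketch`, `ExtensionFilterSketch`, and `FilterPipelineSketch` are hypothetical names, paths are plain strings, and the name-based rules (skip a staging-directory prefix and names starting with `_`) are assumptions chosen for the example. The trait extends `java.io.Serializable`, as the original class extends `Serializable`, presumably so filters can be shipped to executors when listing runs as a Spark job.

```scala
// Hypothetical stand-in for org.apache.hadoop.fs.PathFilter over string paths.
trait NamedPathFilter extends java.io.Serializable {
  def accept(path: String): Boolean

  // Last path segment, analogous to Hadoop's Path.getName;
  // empty string if there is no segment.
  protected def name(path: String): String =
    path.split('/').lastOption.getOrElse("")
}

// Hypothetical: skips entries under a staging prefix and metadata files
// such as "_SUCCESS" whose names start with "_".
final class IgnoreNonDataSketch(stagingDir: String) extends NamedPathFilter {
  def accept(path: String): Boolean = {
    val fileName = name(path)
    !fileName.startsWith(stagingDir) && !fileName.startsWith("_")
  }
}

// Hypothetical: keeps only files with the given extension.
final class ExtensionFilterSketch(extension: String) extends NamedPathFilter {
  def accept(path: String): Boolean = name(path).endsWith(extension)
}

object FilterPipelineSketch {
  // The listing code only sees the trait, so new options need no changes here.
  def list(paths: Seq[String], filters: Seq[NamedPathFilter]): Seq[String] =
    paths.filter(p => filters.forall(_.accept(p)))
}
```

With this shape, supporting a new option means adding one small class, while the listing loop stays unchanged.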