cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r441280950



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
##########
@@ -19,18 +19,18 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.{File, FileNotFoundException}

Review comment:
       It just occurred to me that my styling, while passing lint, adds a bit of noise here.  I will push a correction so the diff reflects only meaningful changes.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
##########
@@ -176,9 +180,27 @@ object InMemoryFileIndex extends Logging {
   private[sql] def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      filter: PathFilter,
+      filters: ParArray[PathFilter],
       sparkSession: SparkSession,
       areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
+    bulkListLeafFiles(paths, hadoopConf, filters, sparkSession, areRootPaths, Map[String, String]())

Review comment:
       For callers that use InMemoryFileIndex but are not prepared to pass along any options, the method above acts as an adapter: it supplies an initialized, empty map for the new options parameter at the end.  In the implementation, an empty map is simply passed through.  A sketch of the two resulting call styles follows below.
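   
   A minimal sketch of the two call styles, assuming the signatures in this diff; the variables paths, filters, spark, and hadoopConf are illustrative and the date value is hypothetical:
   
    // Existing callers keep the shorter signature; the adapter fills in an empty map.
    val withoutOptions = InMemoryFileIndex.bulkListLeafFiles(
      paths, hadoopConf, filters, spark, areRootPaths = true)

    // Callers that do have datasource options can thread them through explicitly.
    val withOptions = InMemoryFileIndex.bulkListLeafFiles(
      paths, hadoopConf, filters, spark, areRootPaths = true,
      parameters = Map("fileModifiedDate" -> "2020-06-01T00:00:00"))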

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
##########
@@ -295,14 +319,14 @@ object InMemoryFileIndex extends Logging {
    *
    * @return all children of path that match the specified filter.
    */
-  private def listLeafFiles(
-      path: Path,
+  private def listLeafFiles(path: Path,
       hadoopConf: Configuration,
-      filter: PathFilter,
+      filters: ParArray[PathFilter],

Review comment:
       Given that _listLeafFiles_ sits right at the center of building the file index returned to the caller, there are lots of opportunities for consumers to require more than one filter / configuration / option.  Supporting any number of filters, as opposed to just one, feels like a good payoff; see the sketch after this comment.
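   
   A rough sketch of a caller supplying several filters at once, assuming the new ParArray[PathFilter] parameter and that both filter classes are importable from the pathfilters package per this PR; stagingDir, spark, hadoopConf, and cutoffMillis are illustrative names:
   
    import scala.collection.parallel.mutable.ParArray
    import org.apache.hadoop.fs.PathFilter

    // Combine independent concerns (skip staging files, skip old files) into one
    // collection and hand them all to listLeafFiles / bulkListLeafFiles.
    val filters: ParArray[PathFilter] = ParArray[PathFilter](
      new PathFilterIgnoreNonData(stagingDir),
      new PathFilterIgnoreOldFiles(spark, hadoopConf, cutoffMillis))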

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
##########
@@ -356,26 +380,35 @@ object InMemoryFileIndex extends Logging {
           bulkListLeafFiles(
             dirs.map(_.getPath),
             hadoopConf,
-            filter,
+            filters,
             session,
-            areRootPaths = false
+            areRootPaths = false,
+            parameters = parameters
           ).flatMap(_._2)
         case _ =>
           dirs.flatMap { dir =>
             listLeafFiles(
               dir.getPath,
               hadoopConf,
-              filter,
+              filters,
               sessionOpt,
               ignoreMissingFiles = ignoreMissingFiles,
               ignoreLocality = ignoreLocality,
-              isRootPath = false)
+              isRootPath = false,
+              parameters = parameters)
           }
       }
+      val fileFilters = (filters ++ FileModifiedDateOption.accept(

Review comment:
       I followed a methodology similar to FileStreamOptions and other classes that deserialize an option into a condition and then perform the resulting action.  This helps keep everything clean; a generic sketch of the pattern is included below.
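   
   A sketch of that pattern under stated assumptions: the class name and field below are illustrative, not the PR's; the parsing mirrors FileModifiedDateOption later in this diff.
   
    import java.time.{LocalDateTime, ZoneOffset}
    import java.time.format.DateTimeFormatter

    // Turn the raw string option into a typed value once, up front.
    class ModifiedDateOptions(parameters: Map[String, String]) {
      val modifiedAfterMillis: Option[Long] =
        parameters.get("fileModifiedDate").map { s =>
          LocalDateTime.parse(s, DateTimeFormatter.ISO_LOCAL_DATE_TIME)
            .toEpochSecond(ZoneOffset.UTC) * 1000
        }
    }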

##########
File path: docs/sql-data-sources-generic-options.md
##########
@@ -119,3 +119,14 @@ To load all files recursively, you can use:
 {% include_example recursive_file_lookup r/RSparkSQLExample.R %}
 </div>
 </div>
+
+### Load Files after Modification Date
+`fileModifiedDate` is an option used to only load files after a specified modification date.

Review comment:
       I took a guess here at the best place and way to describe the new file data source option.  Structured Streaming has a well-defined layout for documenting options such as latestFirst and maxFileAge, which I used as a reference.  A usage sketch is shown below.
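   
   A hypothetical reader-side usage of the option; the ISO-8601 local date-time value format is assumed from the parsing in FileModifiedDateOption later in this diff, and the format and path are illustrative:
   
    // Load only files modified after the given timestamp.
    val df = spark.read
      .format("parquet")
      .option("fileModifiedDate", "2020-06-01T00:00:00")
      .load("/path/to/folder")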

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathfilters/FileModifiedDateOption.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.pathfilters
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.collection.parallel.mutable.ParArray
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.PathFilter
+
+import org.apache.spark.sql.SparkSession
+
+/**
+SPARK-31962 - Provide option to load files after a specified
+date when reading from a folder path.
+  */
+object FileModifiedDateOption {
+  def accept(parameters: Map[String, String],
+             sparkSession: SparkSession,
+             hadoopConf: Configuration): ParArray[PathFilter] = {
+    var afterDateSeconds = 0L
+    val option = "fileModifiedDate"
+    val filesModifiedAfterDate = parameters.get(option)
+    val hasModifiedDateOption = filesModifiedAfterDate.isDefined
+
+    if (hasModifiedDateOption) {
+      filesModifiedAfterDate.foreach(fileDate => {
+        val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME
+        val locateDate = LocalDateTime.parse(fileDate, formatter)
+        afterDateSeconds = locateDate.toEpochSecond(ZoneOffset.UTC) * 1000
+      })
+      val fileFilter =
+        new PathFilterIgnoreOldFiles(sparkSession, hadoopConf, afterDateSeconds)
+
+      return ParArray[PathFilter](fileFilter)
+    }
+    ParArray[PathFilter]()

Review comment:
       If the parameter isn't specified at all, the check on line 43 evaluates to false and we return an empty ParArray, so no date filtering is applied downstream.  The sketch below illustrates both outcomes.
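   
   Illustrative calls under stated assumptions (spark and hadoopConf are assumed to be in scope; the date value is hypothetical):
   
    // Option absent: an empty ParArray comes back and nothing is filtered out.
    val noFilters = FileModifiedDateOption.accept(
      Map[String, String](), spark, hadoopConf)
    assert(noFilters.isEmpty)

    // Option present: a single PathFilterIgnoreOldFiles is returned.
    val oneFilter = FileModifiedDateOption.accept(
      Map("fileModifiedDate" -> "2020-06-01T00:00:00"), spark, hadoopConf)
    assert(oneFilter.length == 1)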

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
##########
@@ -356,26 +380,35 @@ object InMemoryFileIndex extends Logging {
           bulkListLeafFiles(
             dirs.map(_.getPath),
             hadoopConf,
-            filter,
+            filters,
             session,
-            areRootPaths = false
+            areRootPaths = false,
+            parameters = parameters
           ).flatMap(_._2)
         case _ =>
           dirs.flatMap { dir =>
             listLeafFiles(
               dir.getPath,
               hadoopConf,
-              filter,
+              filters,
               sessionOpt,
               ignoreMissingFiles = ignoreMissingFiles,
               ignoreLocality = ignoreLocality,
-              isRootPath = false)
+              isRootPath = false,
+              parameters = parameters)
           }
       }
+      val fileFilters = (filters ++ FileModifiedDateOption.accept(
+        parameters,
+        sessionOpt.get,
+        hadoopConf))
+          .filter(filter => filter !=null)
+
       val allFiles = topLevelFiles ++ nestedFiles
-      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
+      if (fileFilters.nonEmpty) allFiles.filter(file => (
+          fileFilters.forall(filter => filter != null && filter.accept(file.getPath))))

Review comment:
       Outside of a test-only class, there was only one other PathFilter implementation, and it was inlined in CommandUtils.scala, as seen in this PR.  After adding my own filter for the modified-date handling, I placed both classes in the spark.sql.execution.datasources.pathfilters package.
   
   Regarding the code above on line 408: if any filters exist in the collection, we check whether each file in _allFiles_ should be kept before returning the result to the caller.  The second lambda, _fileFilters.forall_, produces a collective boolean result based on whether every filter's accept method accepts the path; see the sketch below.
   
   Since a ParArray collection is used to parallelize invocation of each filter, there wasn't any noticeable performance impact.
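   
   A minimal, self-contained sketch of that composition (names are illustrative, not the PR's exact code):
   
    import scala.collection.parallel.mutable.ParArray
    import org.apache.hadoop.fs.{Path, PathFilter}

    // A file survives only if every non-null filter accepts its path;
    // with no filters at all, everything passes through untouched.
    def applyAll(paths: Seq[Path], fileFilters: ParArray[PathFilter]): Seq[Path] = {
      if (fileFilters.nonEmpty) {
        paths.filter(p => fileFilters.forall(f => f != null && f.accept(p)))
      } else {
        paths
      }
    }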

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -34,20 +35,10 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
+import org.apache.spark.sql.execution.datasources.pathfilters.PathFilterIgnoreNonData
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types._
 
-/**
- * For the purpose of calculating total directory sizes, use this filter to
- * ignore some irrelevant files.
- * @param stagingDir hive staging dir
- */
-class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serializable {

Review comment:
       The pattern used here by a previous committer is pretty nice.  I actually wrote this a different way until I realized a Hadoop fs.Path can itself be a file.  Currently, in both the file data source and the file stream source, we don't have much consistency around where or how we check for options that may filter results.
   
   That's what makes implementing the PathFilter interface pretty cool.  You can encapsulate each option's behavior in its own visitor-style class and, during the normal course of InMemoryFileIndex execution, service both styles with less code.  A hedged sketch of such a class is shown below.
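   
   A sketch only, under stated assumptions: PathFilterIgnoreOldFiles in this PR plays this role, but the class name, constructor, and fields below are illustrative rather than the PR's exact implementation.
   
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{Path, PathFilter}

    // One option, one filter class: the threshold is parsed elsewhere and the
    // filter only answers "should this path be kept?".
    class ModifiedAfterFilter(hadoopConf: Configuration, thresholdMillis: Long)
        extends PathFilter with Serializable {
      override def accept(path: Path): Boolean = {
        val fs = path.getFileSystem(hadoopConf)
        // Keep only paths whose modification time is at or after the threshold.
        fs.getFileStatus(path).getModificationTime >= thresholdMillis
      }
    }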




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
