Repository: spark
Updated Branches:
  refs/heads/master 7106866c2 -> adc112429


[SPARK-17661][SQL] Consolidate various listLeafFiles implementations

## What changes were proposed in this pull request?
There are 4 listLeafFiles-related functions in Spark:

- ListingFileCatalog.listLeafFiles (which calls 
HadoopFsRelation.listLeafFilesInParallel if the number of paths passed in is 
greater than a threshold; if it is lower, then it has its own serial version 
implemented)
- HadoopFsRelation.listLeafFiles (called only by 
HadoopFsRelation.listLeafFilesInParallel)
- HadoopFsRelation.listLeafFilesInParallel (called only by 
ListingFileCatalog.listLeafFiles)

It is actually very confusing and error prone because there are effectively two 
distinct implementations for the serial version of listing leaf files. As an 
example, SPARK-17599 updated only one of the code path and ignored the other 
one.

This code can be improved by:

- Move all file listing code into ListingFileCatalog, since it is the only 
class that needs this.
- Keep only one function for listing files in serial.

## How was this patch tested?
This change should be covered by existing unit and integration tests. I also 
moved a test case for HadoopFsRelation.shouldFilterOut from 
HadoopFsRelationSuite to ListingFileCatalogSuite.

Author: petermaxlee <petermax...@gmail.com>

Closes #15235 from petermaxlee/SPARK-17661.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adc11242
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adc11242
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adc11242

Branch: refs/heads/master
Commit: adc112429d6fe671e6e8294824a0e41a2b1ec2e0
Parents: 7106866
Author: petermaxlee <petermax...@gmail.com>
Authored: Thu Oct 13 14:16:39 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Oct 13 14:16:39 2016 -0700

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        | 231 ++++++++++++++-----
 .../datasources/fileSourceInterfaces.scala      | 154 -------------
 .../datasources/HadoopFsRelationSuite.scala     |  11 -
 .../datasources/ListingFileCatalogSuite.scala   |  34 +++
 4 files changed, 206 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/adc11242/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 3253208..a68ae52 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -21,11 +21,14 @@ import java.io.FileNotFoundException
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
 
 
 /**
@@ -82,73 +85,183 @@ class ListingFileCatalog(
    * This is publicly visible for testing.
    */
   def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    if (paths.length >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
-    } else {
-      // Right now, the number of paths is less than the value of
-      // parallelPartitionDiscoveryThreshold. So, we will list file statues at 
the driver.
-      // If there is any child that has more files than the threshold, we will 
use parallel
-      // listing.
-
-      // Dummy jobconf to get to the pathFilter defined in configuration
-      val jobConf = new JobConf(hadoopConf, this.getClass)
-      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-
-      val statuses: Seq[FileStatus] = paths.flatMap { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        logTrace(s"Listing $path on driver")
-
-        val childStatuses = {
-          try {
-            val stats = fs.listStatus(path)
-            if (pathFilter != null) stats.filter(f => 
pathFilter.accept(f.getPath)) else stats
-          } catch {
-            case _: FileNotFoundException =>
-              logWarning(s"The directory $path was not found. Was it deleted 
very recently?")
-              Array.empty[FileStatus]
-          }
-        }
+    val files =
+      if (paths.length >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+        ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, 
sparkSession)
+      } else {
+        ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+      }
+
+    mutable.LinkedHashSet(files: _*)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = paths.toSet.hashCode()
+}
+
+
+object ListingFileCatalog extends Logging {
+
+  /** A serializable variant of HDFS's BlockLocation. */
+  private case class SerializableBlockLocation(
+      names: Array[String],
+      hosts: Array[String],
+      offset: Long,
+      length: Long)
+
+  /** A serializable variant of HDFS's FileStatus. */
+  private case class SerializableFileStatus(
+      path: String,
+      length: Long,
+      isDir: Boolean,
+      blockReplication: Short,
+      blockSize: Long,
+      modificationTime: Long,
+      accessTime: Long,
+      blockLocations: Array[SerializableBlockLocation])
+
+  /**
+   * List a collection of path recursively.
+   */
+  private def listLeafFilesInSerial(
+      paths: Seq[Path],
+      hadoopConf: Configuration): Seq[FileStatus] = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+    paths.flatMap { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      listLeafFiles0(fs, path, filter)
+    }
+  }
 
-        childStatuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote 
file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here 
since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential 
manner, but it should not
-          //   be a big deal since we always use to `listLeafFilesInParallel` 
when the number of
-          //   paths exceeds threshold.
-          case f =>
-            if (f.isDirectory ) {
-              // If f is a directory, we do not need to call 
getFileBlockLocations (SPARK-14959).
-              f
-            } else {
-              HadoopFsRelation.createLocatedFileStatus(f, 
fs.getFileBlockLocations(f, 0, f.getLen))
+  /**
+   * List a collection of path recursively in parallel (using Spark executors).
+   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   */
+  private def listLeafFilesInParallel(
+      paths: Seq[Path],
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Seq[FileStatus] = {
+    assert(paths.size >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+    logInfo(s"Listing leaf files and directories in parallel under: 
${paths.mkString(", ")}")
+
+    val sparkContext = sparkSession.sparkContext
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializedPaths = paths.map(_.toString)
+
+    // Set the number of parallelism to prevent following file listing from 
generating many tasks
+    // in case of large #defaultParallelism.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val statuses = sparkContext
+      .parallelize(serializedPaths, numParallelism)
+      .mapPartitions { paths =>
+        val hadoopConf = serializableConfiguration.value
+        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, 
hadoopConf).iterator
+      }.map { status =>
+        // Turn FileStatus into SerializableFileStatus so we can send it back 
to the driver
+        val blockLocations = status match {
+          case f: LocatedFileStatus =>
+            f.getBlockLocations.map { loc =>
+              SerializableBlockLocation(
+                loc.getNames,
+                loc.getHosts,
+                loc.getOffset,
+                loc.getLength)
             }
+
+          case _ =>
+            Array.empty[SerializableBlockLocation]
         }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
 
-      val (dirs, files) = statuses.partition(_.isDirectory)
+        SerializableFileStatus(
+          status.getPath.toString,
+          status.getLen,
+          status.isDirectory,
+          status.getReplication,
+          status.getBlockSize,
+          status.getModificationTime,
+          status.getAccessTime,
+          blockLocations)
+      }.collect()
 
-      // It uses [[LinkedHashSet]] since the order of files can affect the 
results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+    // Turn SerializableFileStatus back to Status
+    statuses.map { f =>
+      val blockLocations = f.blockLocations.map { loc =>
+        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
       }
+      new LocatedFileStatus(
+        new FileStatus(
+          f.length, f.isDir, f.blockReplication, f.blockSize, 
f.modificationTime, new Path(f.path)),
+        blockLocations)
     }
   }
 
-  override def equals(other: Any): Boolean = other match {
-    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
-    case _ => false
+  /**
+   * List a single path, provided as a FileStatus, in serial.
+   */
+  private def listLeafFiles0(
+      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+    logTrace(s"Listing $path")
+    val name = path.getName.toLowerCase
+    if (shouldFilterOut(name)) {
+      Seq.empty[FileStatus]
+    } else {
+      // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't 
exist
+      // Note that statuses only include FileStatus for the files and dirs 
directly under path,
+      // and does not include anything else recursively.
+      val statuses = try fs.listStatus(path) catch {
+        case _: FileNotFoundException =>
+          logWarning(s"The directory $path was not found. Was it deleted very 
recently?")
+          Array.empty[FileStatus]
+      }
+
+      val allLeafStatuses = {
+        val (dirs, files) = statuses.partition(_.isDirectory)
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, 
dir.getPath, filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else 
stats
+      }
+
+      allLeafStatuses.filterNot(status => 
shouldFilterOut(status.getPath.getName)).map {
+        case f: LocatedFileStatus =>
+          f
+
+        // NOTE:
+        //
+        // - Although S3/S3A/S3N file system can be quite slow for remote file 
metadata
+        //   operations, calling `getFileBlockLocations` does no harm here 
since these file system
+        //   implementations don't actually issue RPC for this method.
+        //
+        // - Here we are calling `getFileBlockLocations` in a sequential 
manner, but it should not
+        //   be a big deal since we always use to `listLeafFilesInParallel` 
when the number of
+        //   paths exceeds threshold.
+        case f =>
+          // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(),
+          // which is very slow on some file system (RawLocalFileSystem, which 
is launch a
+          // subprocess and parse the stdout).
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, 
f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, 
locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          lfs
+      }
+    }
   }
 
-  override def hashCode(): Int = paths.toSet.hashCode()
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // We filter everything that starts with _ and ., except _common_metadata 
and _metadata
+    // because Parquet needs to find those metadata files from leaf files 
returned by this method.
+    // We should refactor this logic to not mix metadata files with data files.
+    ((pathName.startsWith("_") && !pathName.contains("=")) || 
pathName.startsWith(".")) &&
+      !pathName.startsWith("_common_metadata") && 
!pathName.startsWith("_metadata")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/adc11242/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 5cc5f32..69dd622 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import scala.collection.mutable
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, 
SplittableCompressionCodec}
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -35,7 +31,6 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
 
 /**
  * ::Experimental::
@@ -352,152 +347,3 @@ trait FileCatalog {
   /** Refresh the file listing */
   def refresh(): Unit
 }
-
-
-/**
- * Helper methods for gathering metadata from HDFS.
- */
-object HadoopFsRelation extends Logging {
-
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter everything that starts with _ and ., except _common_metadata 
and _metadata
-    // because Parquet needs to find those metadata files from leaf files 
returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    ((pathName.startsWith("_") && !pathName.contains("=")) || 
pathName.startsWith(".")) &&
-      !pathName.startsWith("_common_metadata") && 
!pathName.startsWith("_metadata")
-  }
-
-  /**
-   * Create a LocatedFileStatus using FileStatus and block locations.
-   */
-  def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): 
LocatedFileStatus = {
-    // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(), which is
-    // very slow on some file system (RawLocalFileSystem, which is launch a 
subprocess and parse the
-    // stdout).
-    val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, 
f.getBlockSize,
-      f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-    if (f.isSymlink) {
-      lfs.setSymlink(f.getSymlink)
-    }
-    lfs
-  }
-
-  // We don't filter files/directories whose name start with "_" except 
"_temporary" here, as
-  // specific data sources may take advantages over them (e.g. Parquet 
_metadata and
-  // _common_metadata files). "_temporary" directories are explicitly ignored 
since failed
-  // tasks/jobs may leave partial/corrupted data files there.  Files and 
directories whose name
-  // start with "." are also ignored.
-  def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): 
Array[FileStatus] = {
-    logTrace(s"Listing ${status.getPath}")
-    val name = status.getPath.getName.toLowerCase
-    if (shouldFilterOut(name)) {
-      Array.empty[FileStatus]
-    } else {
-      val statuses = {
-        val (dirs, files) = 
fs.listStatus(status.getPath).partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, 
filter))
-        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else 
stats
-      }
-      // statuses do not have any dirs.
-      statuses.filterNot(status => 
shouldFilterOut(status.getPath.getName)).map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file 
metadata
-        //   operations, calling `getFileBlockLocations` does no harm here 
since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential 
manner, but it should not
-        //   be a big deal since we always use to `listLeafFilesInParallel` 
when the number of
-        //   paths exceeds threshold.
-        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, 
f.getLen))
-      }
-    }
-  }
-
-  // `FileStatus` is Writable but not serializable.  What make it worse, 
somehow it doesn't play
-  // well with `SerializableWritable`.  So there seems to be no way to 
serialize a `FileStatus`.
-  // Here we use `FakeFileStatus` to extract key components of a `FileStatus` 
to serialize it from
-  // executor side and reconstruct it on driver side.
-  case class FakeBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  case class FakeFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[FakeBlockLocation])
-
-  def listLeafFilesInParallel(
-      paths: Seq[Path],
-      hadoopConf: Configuration,
-      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
-    assert(paths.size >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
-    logInfo(s"Listing leaf files and directories in parallel under: 
${paths.mkString(", ")}")
-
-    val sparkContext = sparkSession.sparkContext
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-    val serializedPaths = paths.map(_.toString)
-
-    // Set the number of parallelism to prevent following file listing from 
generating many tasks
-    // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
-
-    val fakeStatuses = sparkContext
-        .parallelize(serializedPaths, numParallelism)
-        .mapPartitions { paths =>
-      // Dummy jobconf to get to the pathFilter defined in configuration
-      // It's very expensive to create a JobConf(ClassUtil.findContainingJar() 
is slow)
-      val jobConf = new JobConf(serializableConfiguration.value, this.getClass)
-      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-      paths.map(new Path(_)).flatMap { path =>
-        val fs = path.getFileSystem(serializableConfiguration.value)
-        listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-      }
-    }.map { status =>
-      val blockLocations = status match {
-        case f: LocatedFileStatus =>
-          f.getBlockLocations.map { loc =>
-            FakeBlockLocation(
-              loc.getNames,
-              loc.getHosts,
-              loc.getOffset,
-              loc.getLength)
-          }
-
-        case _ =>
-          Array.empty[FakeBlockLocation]
-      }
-
-      FakeFileStatus(
-        status.getPath.toString,
-        status.getLen,
-        status.isDirectory,
-        status.getReplication,
-        status.getBlockSize,
-        status.getModificationTime,
-        status.getAccessTime,
-        blockLocations)
-    }.collect()
-
-    val hadoopFakeStatuses = fakeStatuses.map { f =>
-      val blockLocations = f.blockLocations.map { loc =>
-        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-      }
-      new LocatedFileStatus(
-        new FileStatus(
-          f.length, f.isDir, f.blockReplication, f.blockSize, 
f.modificationTime, new Path(f.path)),
-        blockLocations)
-    }
-    mutable.LinkedHashSet(hadoopFakeStatuses: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/adc11242/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index 3c68dc8..89d5765 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -39,15 +39,4 @@ class HadoopFsRelationSuite extends QueryTest with 
SharedSQLContext {
       assert(df.queryExecution.logical.statistics.sizeInBytes === 
BigInt(totalSize))
     }
   }
-
-  test("file filtering") {
-    assert(!HadoopFsRelation.shouldFilterOut("abcd"))
-    assert(HadoopFsRelation.shouldFilterOut(".ab"))
-    assert(HadoopFsRelation.shouldFilterOut("_cd"))
-
-    assert(!HadoopFsRelation.shouldFilterOut("_metadata"))
-    assert(!HadoopFsRelation.shouldFilterOut("_common_metadata"))
-    assert(HadoopFsRelation.shouldFilterOut("_ab_metadata"))
-    assert(HadoopFsRelation.shouldFilterOut("_cd_common_metadata"))
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/adc11242/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
new file mode 100644
index 0000000..f15730a
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.SparkFunSuite
+
+class ListingFileCatalogSuite extends SparkFunSuite {
+
+  test("file filtering") {
+    assert(!ListingFileCatalog.shouldFilterOut("abcd"))
+    assert(ListingFileCatalog.shouldFilterOut(".ab"))
+    assert(ListingFileCatalog.shouldFilterOut("_cd"))
+
+    assert(!ListingFileCatalog.shouldFilterOut("_metadata"))
+    assert(!ListingFileCatalog.shouldFilterOut("_common_metadata"))
+    assert(ListingFileCatalog.shouldFilterOut("_ab_metadata"))
+    assert(ListingFileCatalog.shouldFilterOut("_cd_common_metadata"))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to