[SPARK-18103][SQL] Rename *FileCatalog to *FileIndex

## What changes were proposed in this pull request?
To reduce the number of components in SQL named *Catalog, rename *FileCatalog to *FileIndex. A FileIndex is responsible for returning the list of partitions / files to scan given a filtering expression.

```
TableFileCatalog => CatalogFileIndex
FileCatalog => FileIndex
ListingFileCatalog => InMemoryFileIndex
MetadataLogFileCatalog => MetadataLogFileIndex
PrunedTableFileCatalog => PrunedInMemoryFileIndex
```

cc yhuai marmbrus

## How was this patch tested?

N/A

Author: Eric Liang <e...@databricks.com>
Author: Eric Liang <ekhli...@gmail.com>

Closes #15634 from ericl/rename-file-provider.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d3b91f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d3b91f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d3b91f

Branch: refs/heads/master
Commit: 90d3b91f4cb59d84fea7105d54ef8c87a7d5c6a2
Parents: 3ad99f1
Author: Eric Liang <e...@databricks.com>
Authored: Sun Oct 30 13:14:45 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun Oct 30 13:14:45 2016 -0700

----------------------------------------------------------------------
 .../spark/metrics/source/StaticSources.scala | 2 +-
 .../spark/sql/execution/CacheManager.scala | 2 +-
 .../datasources/CatalogFileIndex.scala | 110 +++++
 .../sql/execution/datasources/DataSource.scala | 10 +-
 .../sql/execution/datasources/FileCatalog.scala | 70 ---
 .../sql/execution/datasources/FileIndex.scala | 70 +++
 .../datasources/HadoopFsRelation.scala | 4 +-
 .../datasources/InMemoryFileIndex.scala | 87 ++++
 .../datasources/ListingFileCatalog.scala | 87 ----
 .../PartitioningAwareFileCatalog.scala | 437 -------
 .../PartitioningAwareFileIndex.scala | 437 +++++++++++++++++++
 .../datasources/PruneFileSourcePartitions.scala | 6 +-
 .../datasources/TableFileCatalog.scala | 110 -----
 .../streaming/CompactibleFileStreamLog.scala | 4 +-
 .../execution/streaming/FileStreamSource.scala | 4 +-
 .../streaming/MetadataLogFileCatalog.scala | 6 +-
 .../datasources/FileCatalogSuite.scala | 36 +-
 .../datasources/FileSourceStrategySuite.scala | 2 +-
 .../ParquetPartitionDiscoverySuite.scala | 2 +-
 .../sql/streaming/FileStreamSinkSuite.scala | 6 +-
 .../sql/streaming/FileStreamSourceSuite.scala | 2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 4 +-
 .../spark/sql/hive/CachedTableSuite.scala | 10 +-
 .../hive/PartitionedTablePerfStatsSuite.scala | 2 +-
 .../PruneFileSourcePartitionsSuite.scala | 6 +-
 25 files changed, 758 insertions(+), 758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b54885b..3f7cfd9 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -76,7 +76,7 @@ object HiveCatalogMetrics extends Source {
   val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
   /**
-   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   * Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex.
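(Editorial aside, not part of the patch: the sketch below shows how the renamed listing-based index is typically constructed and queried after this change, based only on the constructor and methods visible in this diff. The SparkSession setup and the table path are illustrative placeholders.)

```
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{InMemoryFileIndex, PartitionDirectory}

// Illustrative session; any existing SparkSession works the same way.
val spark = SparkSession.builder().appName("FileIndexExample").master("local[*]").getOrCreate()

// Formerly `new ListingFileCatalog(...)`; the arguments are unchanged by the rename.
val index = new InMemoryFileIndex(
  spark,
  Seq(new Path("/tmp/example/table")),  // root paths to list recursively (placeholder)
  Map.empty[String, String],            // discovery options
  None)                                 // no user-supplied partition schema

// A FileIndex returns the partitions / files to scan for a set of partition filters;
// an empty filter list selects every partition.
val toScan: Seq[PartitionDirectory] = index.listFiles(Nil)
val totalBytes: Long = index.sizeInBytes
```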
*/ val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered")) http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index fb72c67..526623a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -177,7 +177,7 @@ class CacheManager extends Logging { /** * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the - * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes + * [[org.apache.spark.sql.execution.datasources.FileIndex]] of any [[HadoopFsRelation]] nodes * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns * false. */ http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala new file mode 100644 index 0000000..092aabc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.StructType + + +/** + * A [[FileIndex]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param table the metadata of the table + * @param sizeInBytes the table's data size in bytes + */ +class CatalogFileIndex( + sparkSession: SparkSession, + val table: CatalogTable, + override val sizeInBytes: Long) extends FileIndex { + + protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val fileStatusCache = FileStatusCache.newCache(sparkSession) + + assert(table.identifier.database.isDefined, + "The table identifier must be qualified in CatalogFileIndex") + + private val baseLocation = table.storage.locationUri + + override def partitionSchema: StructType = table.partitionSchema + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { + filterPartitions(filters).listFiles(Nil) + } + + override def refresh(): Unit = fileStatusCache.invalidateAll() + + /** + * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions + * specified by the given partition-pruning filters. + * + * @param filters partition-pruning filters + */ + def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = { + if (table.partitionColumnNames.nonEmpty) { + val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter( + table.identifier, filters) + val partitions = selectedPartitions.map { p => + PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get) + } + val partitionSpec = PartitionSpec(partitionSchema, partitions) + new PrunedInMemoryFileIndex( + sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec) + } else { + new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None) + } + } + + override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles + + // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member + // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to + // implement `equals` and `hashCode` here, to make it work with cache lookup. + override def equals(o: Any): Boolean = o match { + case other: CatalogFileIndex => this.table.identifier == other.table.identifier + case _ => false + } + + override def hashCode(): Int = table.identifier.hashCode() +} + +/** + * An override of the standard HDFS listing based catalog, that overrides the partition spec with + * the information from the metastore. 
+ * + * @param tableBasePath The default base path of the Hive metastore table + * @param partitionSpec The partition specifications from Hive metastore + */ +private class PrunedInMemoryFileIndex( + sparkSession: SparkSession, + tableBasePath: Path, + fileStatusCache: FileStatusCache, + override val partitionSpec: PartitionSpec) + extends InMemoryFileIndex( + sparkSession, + partitionSpec.partitions.map(_.path), + Map.empty, + Some(partitionSpec.partitionColumns), + fileStatusCache) http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5b8f05a..9961098 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -202,7 +202,7 @@ case class DataSource( val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray - val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None) + val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None) val partitionSchema = fileCatalog.partitionSpec().partitionColumns val inferred = format.inferSchema( sparkSession, @@ -364,7 +364,7 @@ case class DataSource( case (format: FileFormat, _) if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath) + val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath) val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, @@ -417,12 +417,12 @@ case class DataSource( val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) { - new TableFileCatalog( + new CatalogFileIndex( sparkSession, catalogTable.get, catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L)) } else { - new ListingFileCatalog( + new InMemoryFileIndex( sparkSession, globbedPaths, options, partitionSchema) } @@ -433,7 +433,7 @@ case class DataSource( format.inferSchema( sparkSession, caseInsensitiveOptions, - fileCatalog.asInstanceOf[ListingFileCatalog].allFiles()) + fileCatalog.asInstanceOf[InMemoryFileIndex].allFiles()) }.getOrElse { throw new AnalysisException( s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " + http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala deleted file mode 100644 index dba6462..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import org.apache.hadoop.fs._ - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.StructType - -/** - * A collection of data files from a partitioned relation, along with the partition values in the - * form of an [[InternalRow]]. - */ -case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus]) - -/** - * An interface for objects capable of enumerating the root paths of a relation as well as the - * partitions of a relation subject to some pruning expressions. - */ -trait FileCatalog { - - /** - * Returns the list of root input paths from which the catalog will get files. There may be a - * single root path from which partitions are discovered, or individual partitions may be - * specified by each path. - */ - def rootPaths: Seq[Path] - - /** - * Returns all valid files grouped into partitions when the data is partitioned. If the data is - * unpartitioned, this will return a single partition with no partition values. - * - * @param filters The filters used to prune which partitions are returned. These filters must - * only refer to partition columns and this method will only return files - * where these predicates are guaranteed to evaluate to `true`. Thus, these - * filters will not need to be evaluated again on the returned data. - */ - def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] - - /** - * Returns the list of files that will be read when scanning this relation. This call may be - * very expensive for large tables. - */ - def inputFiles: Array[String] - - /** Refresh any cached file listings */ - def refresh(): Unit - - /** Sum of table file sizes, in bytes */ - def sizeInBytes: Long - - /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */ - def partitionSchema: StructType -} http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala new file mode 100644 index 0000000..277223d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.StructType + +/** + * A collection of data files from a partitioned relation, along with the partition values in the + * form of an [[InternalRow]]. + */ +case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus]) + +/** + * An interface for objects capable of enumerating the root paths of a relation as well as the + * partitions of a relation subject to some pruning expressions. + */ +trait FileIndex { + + /** + * Returns the list of root input paths from which the catalog will get files. There may be a + * single root path from which partitions are discovered, or individual partitions may be + * specified by each path. + */ + def rootPaths: Seq[Path] + + /** + * Returns all valid files grouped into partitions when the data is partitioned. If the data is + * unpartitioned, this will return a single partition with no partition values. + * + * @param filters The filters used to prune which partitions are returned. These filters must + * only refer to partition columns and this method will only return files + * where these predicates are guaranteed to evaluate to `true`. Thus, these + * filters will not need to be evaluated again on the returned data. + */ + def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] + + /** + * Returns the list of files that will be read when scanning this relation. This call may be + * very expensive for large tables. + */ + def inputFiles: Array[String] + + /** Refresh any cached file listings */ + def refresh(): Unit + + /** Sum of table file sizes, in bytes */ + def sizeInBytes: Long + + /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */ + def partitionSchema: StructType +} http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index afad889..014abd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType * Acts as a container for all of the metadata required to read from a datasource. All discovery, * resolution and merging logic for schemas and partitions has been removed. 
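(Editorial aside, not part of the patch: to make the FileIndex contract above concrete, here is a minimal, hypothetical implementation over a fixed, unpartitioned set of files. The class name StaticFileIndex is invented for illustration only.)

```
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType

// A FileIndex over a fixed list of files; nothing is partitioned, so partition
// filters cannot prune anything and all files are returned as a single partition.
class StaticFileIndex(files: Seq[FileStatus]) extends FileIndex {
  override def rootPaths: Seq[Path] = files.map(_.getPath)

  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] =
    Seq(PartitionDirectory(InternalRow.empty, files))

  override def inputFiles: Array[String] = files.map(_.getPath.toUri.toString).toArray

  override def refresh(): Unit = ()  // nothing is cached, so nothing to invalidate

  override def sizeInBytes: Long = files.map(_.getLen).sum

  override def partitionSchema: StructType = new StructType()  // not partitioned
}
```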
* - * @param location A [[FileCatalog]] that can enumerate the locations of all the files that + * @param location A [[FileIndex]] that can enumerate the locations of all the files that * comprise this relation. * @param partitionSchema The schema of the columns (if any) that are used to partition the relation * @param dataSchema The schema of any remaining columns. Note that if any partition columns are @@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType * @param options Configuration used when reading / writing data. */ case class HadoopFsRelation( - location: FileCatalog, + location: FileIndex, partitionSchema: StructType, dataSchema: StructType, bucketSpec: Option[BucketSpec], http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala new file mode 100644 index 0000000..7531f0a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import scala.collection.mutable + +import org.apache.hadoop.fs._ + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType + + +/** + * A [[FileIndex]] that generates the list of files to process by recursively listing all the + * files present in `paths`. 
+ * + * @param rootPaths the list of root table paths to scan + * @param parameters as set of options to control discovery + * @param partitionSchema an optional partition schema that will be use to provide types for the + * discovered partitions + */ +class InMemoryFileIndex( + sparkSession: SparkSession, + override val rootPaths: Seq[Path], + parameters: Map[String, String], + partitionSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache) + extends PartitioningAwareFileIndex( + sparkSession, parameters, partitionSchema, fileStatusCache) { + + @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ + @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ + @volatile private var cachedPartitionSpec: PartitionSpec = _ + + refresh0() + + override def partitionSpec(): PartitionSpec = { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning() + } + logTrace(s"Partition spec: $cachedPartitionSpec") + cachedPartitionSpec + } + + override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { + cachedLeafFiles + } + + override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { + cachedLeafDirToChildrenFiles + } + + override def refresh(): Unit = { + refresh0() + fileStatusCache.invalidateAll() + } + + private def refresh0(): Unit = { + val files = listLeafFiles(rootPaths) + cachedLeafFiles = + new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) + cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) + cachedPartitionSpec = null + } + + override def equals(other: Any): Boolean = other match { + case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet + case _ => false + } + + override def hashCode(): Int = rootPaths.toSet.hashCode() +} http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala deleted file mode 100644 index d9d5883..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources - -import scala.collection.mutable - -import org.apache.hadoop.fs._ - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.types.StructType - - -/** - * A [[FileCatalog]] that generates the list of files to process by recursively listing all the - * files present in `paths`. - * - * @param rootPaths the list of root table paths to scan - * @param parameters as set of options to control discovery - * @param partitionSchema an optional partition schema that will be use to provide types for the - * discovered partitions - */ -class ListingFileCatalog( - sparkSession: SparkSession, - override val rootPaths: Seq[Path], - parameters: Map[String, String], - partitionSchema: Option[StructType], - fileStatusCache: FileStatusCache = NoopCache) - extends PartitioningAwareFileCatalog( - sparkSession, parameters, partitionSchema, fileStatusCache) { - - @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ - @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ - @volatile private var cachedPartitionSpec: PartitionSpec = _ - - refresh0() - - override def partitionSpec(): PartitionSpec = { - if (cachedPartitionSpec == null) { - cachedPartitionSpec = inferPartitioning() - } - logTrace(s"Partition spec: $cachedPartitionSpec") - cachedPartitionSpec - } - - override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { - cachedLeafFiles - } - - override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { - cachedLeafDirToChildrenFiles - } - - override def refresh(): Unit = { - refresh0() - fileStatusCache.invalidateAll() - } - - private def refresh0(): Unit = { - val files = listLeafFiles(rootPaths) - cachedLeafFiles = - new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) - cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) - cachedPartitionSpec = null - } - - override def equals(other: Any): Boolean = other match { - case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet - case _ => false - } - - override def hashCode(): Int = rootPaths.toSet.hashCode() -} http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala deleted file mode 100644 index cc4049e..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import java.io.FileNotFoundException - -import scala.collection.mutable - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs._ -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} - -import org.apache.spark.internal.Logging -import org.apache.spark.metrics.source.HiveCatalogMetrics -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.{expressions, InternalRow} -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.util.SerializableConfiguration - -/** - * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables. - * It provides the necessary methods to parse partition data based on a set of files. - * - * @param parameters as set of options to control partition discovery - * @param userPartitionSchema an optional partition schema that will be use to provide types for - * the discovered partitions - */ -abstract class PartitioningAwareFileCatalog( - sparkSession: SparkSession, - parameters: Map[String, String], - userPartitionSchema: Option[StructType], - fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging { - import PartitioningAwareFileCatalog.BASE_PATH_PARAM - - /** Returns the specification of the partitions inferred from the data. */ - def partitionSpec(): PartitionSpec - - override def partitionSchema: StructType = partitionSpec().partitionColumns - - protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) - - protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] - - protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] - - override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { - val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { - PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil - } else { - prunePartitions(filters, partitionSpec()).map { - case PartitionPath(values, path) => - val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { - case Some(existingDir) => - // Directory has children files in it, return them - existingDir.filter(f => isDataPath(f.getPath)) - - case None => - // Directory does not exist, or has no children files - Nil - } - PartitionDirectory(values, files) - } - } - logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) - selectedPartitions - } - - /** Returns the list of files that will be read when scanning this relation. */ - override def inputFiles: Array[String] = - allFiles().map(_.getPath.toUri.toString).toArray - - override def sizeInBytes: Long = allFiles().map(_.getLen).sum - - def allFiles(): Seq[FileStatus] = { - if (partitionSpec().partitionColumns.isEmpty) { - // For each of the root input paths, get the list of files inside them - rootPaths.flatMap { path => - // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). 
- val fs = path.getFileSystem(hadoopConf) - val qualifiedPathPre = fs.makeQualified(path) - val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) { - // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories, - // because the `leafFile.getParent` would have returned an absolute path with the - // separator at the end. - new Path(qualifiedPathPre, Path.SEPARATOR) - } else { - qualifiedPathPre - } - - // There are three cases possible with each path - // 1. The path is a directory and has children files in it. Then it must be present in - // leafDirToChildrenFiles as those children files will have been found as leaf files. - // Find its children files from leafDirToChildrenFiles and include them. - // 2. The path is a file, then it will be present in leafFiles. Include this path. - // 3. The path is a directory, but has no children files. Do not include this path. - - leafDirToChildrenFiles.get(qualifiedPath) - .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } - .getOrElse(Array.empty) - } - } else { - leafFiles.values.toSeq - } - } - - protected def inferPartitioning(): PartitionSpec = { - // We use leaf dirs containing data files to discover the schema. - val leafDirs = leafDirToChildrenFiles.filter { case (_, files) => - files.exists(f => isDataPath(f.getPath)) - }.keys.toSeq - userPartitionSchema match { - case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => - val spec = PartitioningUtils.parsePartitions( - leafDirs, - PartitioningUtils.DEFAULT_PARTITION_NAME, - typeInference = false, - basePaths = basePaths) - - // Without auto inference, all of value in the `row` should be null or in StringType, - // we need to cast into the data type that user specified. - def castPartitionValuesToUserSchema(row: InternalRow) = { - InternalRow((0 until row.numFields).map { i => - Cast( - Literal.create(row.getUTF8String(i), StringType), - userProvidedSchema.fields(i).dataType).eval() - }: _*) - } - - PartitionSpec(userProvidedSchema, spec.partitions.map { part => - part.copy(values = castPartitionValuesToUserSchema(part.values)) - }) - case _ => - PartitioningUtils.parsePartitions( - leafDirs, - PartitioningUtils.DEFAULT_PARTITION_NAME, - typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, - basePaths = basePaths) - } - } - - private def prunePartitions( - predicates: Seq[Expression], - partitionSpec: PartitionSpec): Seq[PartitionPath] = { - val PartitionSpec(partitionColumns, partitions) = partitionSpec - val partitionColumnNames = partitionColumns.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - - val boundPredicate = InterpretedPredicate.create(predicate.transform { - case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, partitionColumns(index).dataType, nullable = true) - }) - - val selected = partitions.filter { - case PartitionPath(values, _) => boundPredicate(values) - } - logInfo { - val total = partitions.length - val selectedSize = selected.length - val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 - s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions." 
- } - - selected - } else { - partitions - } - } - - /** - * Contains a set of paths that are considered as the base dirs of the input datasets. - * The partitioning discovery logic will make sure it will stop when it reaches any - * base path. - * - * By default, the paths of the dataset provided by users will be base paths. - * Below are three typical examples, - * Case 1) `spark.read.parquet("/path/something=true/")`: the base path will be - * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`. - * Case 2) `spark.read.parquet("/path/something=true/a.parquet")`: the base path will be - * still `/path/something=true/`, and the returned DataFrame will also not contain a column of - * `something`. - * Case 3) `spark.read.parquet("/path/")`: the base path will be `/path/`, and the returned - * DataFrame will have the column of `something`. - * - * Users also can override the basePath by setting `basePath` in the options to pass the new base - * path to the data source. - * For example, `spark.read.option("basePath", "/path/").parquet("/path/something=true/")`, - * and the returned DataFrame will have the column of `something`. - */ - private def basePaths: Set[Path] = { - parameters.get(BASE_PATH_PARAM).map(new Path(_)) match { - case Some(userDefinedBasePath) => - val fs = userDefinedBasePath.getFileSystem(hadoopConf) - if (!fs.isDirectory(userDefinedBasePath)) { - throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory") - } - Set(fs.makeQualified(userDefinedBasePath)) - - case None => - rootPaths.map { path => - // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). - val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path) - if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet - } - } - - // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be - // counted as data files, so that they shouldn't participate partition discovery. - private def isDataPath(path: Path): Boolean = { - val name = path.getName - !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) - } - - /** - * List leaf files of given paths. This method will submit a Spark job to do parallel - * listing whenever there is a path having more files than the parallel partition discovery - * discovery threshold. - * - * This is publicly visible for testing. - */ - def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - val output = mutable.LinkedHashSet[FileStatus]() - val pathsToFetch = mutable.ArrayBuffer[Path]() - for (path <- paths) { - fileStatusCache.getLeafFiles(path) match { - case Some(files) => - HiveCatalogMetrics.incrementFileCacheHits(files.length) - output ++= files - case None => - pathsToFetch += path - } - } - val discovered = if (pathsToFetch.length >= - sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession) - } else { - PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf) - } - discovered.foreach { case (path, leafFiles) => - HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) - fileStatusCache.putLeafFiles(path, leafFiles.toArray) - output ++= leafFiles - } - output - } -} - -object PartitioningAwareFileCatalog extends Logging { - val BASE_PATH_PARAM = "basePath" - - /** A serializable variant of HDFS's BlockLocation. 
*/ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - - /** - * List a collection of path recursively. - */ - private def listLeafFilesInSerial( - paths: Seq[Path], - hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = { - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val filter = FileInputFormat.getInputPathFilter(jobConf) - - paths.map { path => - val fs = path.getFileSystem(hadoopConf) - (path, listLeafFiles0(fs, path, filter)) - } - } - - /** - * List a collection of path recursively in parallel (using Spark executors). - * Each task launched will use [[listLeafFilesInSerial]] to list. - */ - private def listLeafFilesInParallel( - paths: Seq[Path], - hadoopConf: Configuration, - sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = { - assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) - logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") - - val sparkContext = sparkSession.sparkContext - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = paths.map(_.toString) - - // Set the number of parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, 10000) - - val statusMap = sparkContext - .parallelize(serializedPaths, numParallelism) - .mapPartitions { paths => - val hadoopConf = serializableConfiguration.value - listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator - }.map { case (path, statuses) => - val serializableStatuses = statuses.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - } - (path.toString, serializableStatuses) - }.collect() - - // turn SerializableFileStatus back to Status - statusMap.map { case (path, serializableStatuses) => - val statuses = serializableStatuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, - new Path(f.path)), - blockLocations) - } - (new Path(path), statuses) - } - } - - /** - * List a single path, provided as a FileStatus, in serial. 
- */ - private def listLeafFiles0( - fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = { - logTrace(s"Listing $path") - val name = path.getName.toLowerCase - if (shouldFilterOut(name)) { - Seq.empty[FileStatus] - } else { - // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist - // Note that statuses only include FileStatus for the files and dirs directly under path, - // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { - case _: FileNotFoundException => - logWarning(s"The directory $path was not found. Was it deleted very recently?") - Array.empty[FileStatus] - } - - val allLeafStatuses = { - val (dirs, files) = statuses.partition(_.isDirectory) - val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter)) - if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats - } - - allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { - case f: LocatedFileStatus => - f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - val locations = fs.getFileBlockLocations(f, 0, f.getLen) - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - lfs - } - } - } - - /** Checks if we should filter out this path name. */ - def shouldFilterOut(pathName: String): Boolean = { - // We filter everything that starts with _ and ., except _common_metadata and _metadata - // because Parquet needs to find those metadata files from leaf files returned by this method. - // We should refactor this logic to not mix metadata files with data files. - ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && - !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala new file mode 100644 index 0000000..a8a722d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.FileNotFoundException + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} + +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.util.SerializableConfiguration + +/** + * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables. + * It provides the necessary methods to parse partition data based on a set of files. + * + * @param parameters as set of options to control partition discovery + * @param userPartitionSchema an optional partition schema that will be use to provide types for + * the discovered partitions + */ +abstract class PartitioningAwareFileIndex( + sparkSession: SparkSession, + parameters: Map[String, String], + userPartitionSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging { + import PartitioningAwareFileIndex.BASE_PATH_PARAM + + /** Returns the specification of the partitions inferred from the data. */ + def partitionSpec(): PartitionSpec + + override def partitionSchema: StructType = partitionSpec().partitionColumns + + protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + + protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] + + protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] + + override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { + val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { + PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil + } else { + prunePartitions(filters, partitionSpec()).map { + case PartitionPath(values, path) => + val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { + case Some(existingDir) => + // Directory has children files in it, return them + existingDir.filter(f => isDataPath(f.getPath)) + + case None => + // Directory does not exist, or has no children files + Nil + } + PartitionDirectory(values, files) + } + } + logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) + selectedPartitions + } + + /** Returns the list of files that will be read when scanning this relation. 
*/ + override def inputFiles: Array[String] = + allFiles().map(_.getPath.toUri.toString).toArray + + override def sizeInBytes: Long = allFiles().map(_.getLen).sum + + def allFiles(): Seq[FileStatus] = { + if (partitionSpec().partitionColumns.isEmpty) { + // For each of the root input paths, get the list of files inside them + rootPaths.flatMap { path => + // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). + val fs = path.getFileSystem(hadoopConf) + val qualifiedPathPre = fs.makeQualified(path) + val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) { + // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories, + // because the `leafFile.getParent` would have returned an absolute path with the + // separator at the end. + new Path(qualifiedPathPre, Path.SEPARATOR) + } else { + qualifiedPathPre + } + + // There are three cases possible with each path + // 1. The path is a directory and has children files in it. Then it must be present in + // leafDirToChildrenFiles as those children files will have been found as leaf files. + // Find its children files from leafDirToChildrenFiles and include them. + // 2. The path is a file, then it will be present in leafFiles. Include this path. + // 3. The path is a directory, but has no children files. Do not include this path. + + leafDirToChildrenFiles.get(qualifiedPath) + .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } + .getOrElse(Array.empty) + } + } else { + leafFiles.values.toSeq + } + } + + protected def inferPartitioning(): PartitionSpec = { + // We use leaf dirs containing data files to discover the schema. + val leafDirs = leafDirToChildrenFiles.filter { case (_, files) => + files.exists(f => isDataPath(f.getPath)) + }.keys.toSeq + userPartitionSchema match { + case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => + val spec = PartitioningUtils.parsePartitions( + leafDirs, + PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference = false, + basePaths = basePaths) + + // Without auto inference, all of value in the `row` should be null or in StringType, + // we need to cast into the data type that user specified. 
+ def castPartitionValuesToUserSchema(row: InternalRow) = { + InternalRow((0 until row.numFields).map { i => + Cast( + Literal.create(row.getUTF8String(i), StringType), + userProvidedSchema.fields(i).dataType).eval() + }: _*) + } + + PartitionSpec(userProvidedSchema, spec.partitions.map { part => + part.copy(values = castPartitionValuesToUserSchema(part.values)) + }) + case _ => + PartitioningUtils.parsePartitions( + leafDirs, + PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, + basePaths = basePaths) + } + } + + private def prunePartitions( + predicates: Seq[Expression], + partitionSpec: PartitionSpec): Seq[PartitionPath] = { + val PartitionSpec(partitionColumns, partitions) = partitionSpec + val partitionColumnNames = partitionColumns.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate.create(predicate.transform { + case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }) + + val selected = partitions.filter { + case PartitionPath(values, _) => boundPredicate(values) + } + logInfo { + val total = partitions.length + val selectedSize = selected.length + val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 + s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions." + } + + selected + } else { + partitions + } + } + + /** + * Contains a set of paths that are considered as the base dirs of the input datasets. + * The partitioning discovery logic will make sure it will stop when it reaches any + * base path. + * + * By default, the paths of the dataset provided by users will be base paths. + * Below are three typical examples, + * Case 1) `spark.read.parquet("/path/something=true/")`: the base path will be + * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`. + * Case 2) `spark.read.parquet("/path/something=true/a.parquet")`: the base path will be + * still `/path/something=true/`, and the returned DataFrame will also not contain a column of + * `something`. + * Case 3) `spark.read.parquet("/path/")`: the base path will be `/path/`, and the returned + * DataFrame will have the column of `something`. + * + * Users also can override the basePath by setting `basePath` in the options to pass the new base + * path to the data source. + * For example, `spark.read.option("basePath", "/path/").parquet("/path/something=true/")`, + * and the returned DataFrame will have the column of `something`. + */ + private def basePaths: Set[Path] = { + parameters.get(BASE_PATH_PARAM).map(new Path(_)) match { + case Some(userDefinedBasePath) => + val fs = userDefinedBasePath.getFileSystem(hadoopConf) + if (!fs.isDirectory(userDefinedBasePath)) { + throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory") + } + Set(fs.makeQualified(userDefinedBasePath)) + + case None => + rootPaths.map { path => + // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). 
+ val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path) + if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet + } + } + + // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be + // counted as data files, so that they shouldn't participate partition discovery. + private def isDataPath(path: Path): Boolean = { + val name = path.getName + !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) + } + + /** + * List leaf files of given paths. This method will submit a Spark job to do parallel + * listing whenever there is a path having more files than the parallel partition discovery + * discovery threshold. + * + * This is publicly visible for testing. + */ + def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + val output = mutable.LinkedHashSet[FileStatus]() + val pathsToFetch = mutable.ArrayBuffer[Path]() + for (path <- paths) { + fileStatusCache.getLeafFiles(path) match { + case Some(files) => + HiveCatalogMetrics.incrementFileCacheHits(files.length) + output ++= files + case None => + pathsToFetch += path + } + } + val discovered = if (pathsToFetch.length >= + sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession) + } else { + PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf) + } + discovered.foreach { case (path, leafFiles) => + HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) + fileStatusCache.putLeafFiles(path, leafFiles.toArray) + output ++= leafFiles + } + output + } +} + +object PartitioningAwareFileIndex extends Logging { + val BASE_PATH_PARAM = "basePath" + + /** A serializable variant of HDFS's BlockLocation. */ + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. */ + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. + */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = { + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(hadoopConf, this.getClass) + val filter = FileInputFormat.getInputPathFilter(jobConf) + + paths.map { path => + val fs = path.getFileSystem(hadoopConf) + (path, listLeafFiles0(fs, path, filter)) + } + } + + /** + * List a collection of path recursively in parallel (using Spark executors). + * Each task launched will use [[listLeafFilesInSerial]] to list. + */ + private def listLeafFilesInParallel( + paths: Seq[Path], + hadoopConf: Configuration, + sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = { + assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) + logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") + + val sparkContext = sparkSession.sparkContext + val serializableConfiguration = new SerializableConfiguration(hadoopConf) + val serializedPaths = paths.map(_.toString) + + // Set the number of parallelism to prevent following file listing from generating many tasks + // in case of large #defaultParallelism. 
+ val numParallelism = Math.min(paths.size, 10000) + + val statusMap = sparkContext + .parallelize(serializedPaths, numParallelism) + .mapPartitions { paths => + val hadoopConf = serializableConfiguration.value + listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator + }.map { case (path, statuses) => + val serializableStatuses = statuses.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } + + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + } + (path.toString, serializableStatuses) + }.collect() + + // turn SerializableFileStatus back to Status + statusMap.map { case (path, serializableStatuses) => + val statuses = serializableStatuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, + new Path(f.path)), + blockLocations) + } + (new Path(path), statuses) + } + } + + /** + * List a single path, provided as a FileStatus, in serial. + */ + private def listLeafFiles0( + fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = { + logTrace(s"Listing $path") + val name = path.getName.toLowerCase + if (shouldFilterOut(name)) { + Seq.empty[FileStatus] + } else { + // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist + // Note that statuses only include FileStatus for the files and dirs directly under path, + // and does not include anything else recursively. + val statuses = try fs.listStatus(path) catch { + case _: FileNotFoundException => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + Array.empty[FileStatus] + } + + val allLeafStatuses = { + val (dirs, files) = statuses.partition(_.isDirectory) + val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter)) + if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats + } + + allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { + case f: LocatedFileStatus => + f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use to `listLeafFilesInParallel` when the number of + // paths exceeds threshold. + case f => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file system (RawLocalFileSystem, which is launch a + // subprocess and parse the stdout). 
+ val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + lfs + } + } + } + + /** Checks if we should filter out this path name. */ + def shouldFilterOut(pathName: String): Boolean = { + // We filter everything that starts with _ and ., except _common_metadata and _metadata + // because Parquet needs to find those metadata files from leaf files returned by this method. + // We should refactor this logic to not mix metadata files with data files. + ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && + !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 8689017..8566a80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -28,7 +28,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { logicalRelation @ LogicalRelation(fsRelation @ HadoopFsRelation( - tableFileCatalog: TableFileCatalog, + catalogFileIndex: CatalogFileIndex, partitionSchema, _, _, @@ -56,9 +56,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) if (partitionKeyFilters.nonEmpty) { - val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq) + val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = - fsRelation.copy(location = prunedFileCatalog)(sparkSession) + fsRelation.copy(location = prunedFileIndex)(sparkSession) val prunedLogicalRelation = logicalRelation.copy( relation = prunedFsRelation, expectedOutputAttributes = Some(logicalRelation.output)) http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala deleted file mode 100644 index b459df5..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import org.apache.hadoop.fs.Path - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.StructType - - -/** - * A [[FileCatalog]] for a metastore catalog table. - * - * @param sparkSession a [[SparkSession]] - * @param table the metadata of the table - * @param sizeInBytes the table's data size in bytes - */ -class TableFileCatalog( - sparkSession: SparkSession, - val table: CatalogTable, - override val sizeInBytes: Long) extends FileCatalog { - - protected val hadoopConf = sparkSession.sessionState.newHadoopConf - - private val fileStatusCache = FileStatusCache.newCache(sparkSession) - - assert(table.identifier.database.isDefined, - "The table identifier must be qualified in TableFileCatalog") - - private val baseLocation = table.storage.locationUri - - override def partitionSchema: StructType = table.partitionSchema - - override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq - - override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { - filterPartitions(filters).listFiles(Nil) - } - - override def refresh(): Unit = fileStatusCache.invalidateAll() - - /** - * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions - * specified by the given partition-pruning filters. - * - * @param filters partition-pruning filters - */ - def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = { - if (table.partitionColumnNames.nonEmpty) { - val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter( - table.identifier, filters) - val partitions = selectedPartitions.map { p => - PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get) - } - val partitionSpec = PartitionSpec(partitionSchema, partitions) - new PrunedTableFileCatalog( - sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec) - } else { - new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None) - } - } - - override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles - - // `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member - // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to - // implement `equals` and `hashCode` here, to make it work with cache lookup. - override def equals(o: Any): Boolean = o match { - case other: TableFileCatalog => this.table.identifier == other.table.identifier - case _ => false - } - - override def hashCode(): Int = table.identifier.hashCode() -} - -/** - * An override of the standard HDFS listing based catalog, that overrides the partition spec with - * the information from the metastore. 
- * - * @param tableBasePath The default base path of the Hive metastore table - * @param partitionSpec The partition specifications from Hive metastore - */ -private class PrunedTableFileCatalog( - sparkSession: SparkSession, - tableBasePath: Path, - fileStatusCache: FileStatusCache, - override val partitionSpec: PartitionSpec) - extends ListingFileCatalog( - sparkSession, - partitionSpec.partitions.map(_.path), - Map.empty, - Some(partitionSpec.partitionColumns), - fileStatusCache) http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index c14feea..b26edee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -146,7 +146,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( */ def allFiles(): Array[T] = { var latestId = getLatest().map(_._1).getOrElse(-1L) - // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog` + // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex` // is calling this method. This loop will retry the reading to deal with the // race condition. while (true) { @@ -158,7 +158,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( } catch { case e: IOException => // Another process using `CompactibleFileStreamLog` may delete the batch files when - // `StreamFileCatalog` are reading. However, it only happens when a compaction is + // `StreamFileIndex` are reading. However, it only happens when a compaction is // deleting old files. If so, let's try the next compaction batch and we should find it. // Otherwise, this is a real IO issue and we should throw it. 
latestId = nextCompactionBatchId(latestId, compactInterval) http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index a392b82..680df01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.types.StructType /** @@ -156,7 +156,7 @@ class FileStreamSource( private def fetchAllFiles(): Seq[(String, Long)] = { val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) - val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) + val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType)) val files = catalog.allFiles().sortBy(_.getModificationTime).map { status => (status.getPath.toUri.toString, status.getModificationTime) } http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala index 82b67cb..aeaa134 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala @@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.datasources._ /** - * A [[FileCatalog]] that generates the list of files to processing by reading them from the + * A [[FileIndex]] that generates the list of files to processing by reading them from the * metadata log files generated by the [[FileStreamSink]]. 
*/ -class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path) - extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) { +class MetadataLogFileIndex(sparkSession: SparkSession, path: Path) + extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) { private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) logInfo(s"Reading streaming file log from $metadataDirectory") http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 9c43169..56df1fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -28,15 +28,15 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.SharedSQLContext -class FileCatalogSuite extends SharedSQLContext { +class FileIndexSuite extends SharedSQLContext { - test("ListingFileCatalog: leaf files are qualified paths") { + test("InMemoryFileIndex: leaf files are qualified paths") { withTempDir { dir => val file = new File(dir, "text.txt") stringToFile(file, "text") val path = new Path(file.getCanonicalPath) - val catalog = new ListingFileCatalog(spark, Seq(path), Map.empty, None) { + val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) { def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq } @@ -45,7 +45,7 @@ class FileCatalogSuite extends SharedSQLContext { } } - test("ListingFileCatalog: input paths are converted to qualified paths") { + test("InMemoryFileIndex: input paths are converted to qualified paths") { withTempDir { dir => val file = new File(dir, "text.txt") stringToFile(file, "text") @@ -59,42 +59,42 @@ class FileCatalogSuite extends SharedSQLContext { val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath)) require(qualifiedFilePath.toString.startsWith("file:")) - val catalog1 = new ListingFileCatalog( + val catalog1 = new InMemoryFileIndex( spark, Seq(unqualifiedDirPath), Map.empty, None) assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) - val catalog2 = new ListingFileCatalog( + val catalog2 = new InMemoryFileIndex( spark, Seq(unqualifiedFilePath), Map.empty, None) assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) } } - test("ListingFileCatalog: folders that don't exist don't throw exceptions") { + test("InMemoryFileIndex: folders that don't exist don't throw exceptions") { withTempDir { dir => val deletedFolder = new File(dir, "deleted") assert(!deletedFolder.exists()) - val catalog1 = new ListingFileCatalog( + val catalog1 = new InMemoryFileIndex( spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) // doesn't throw an exception assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty) } } - test("PartitioningAwareFileCatalog - file filtering") { - assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd")) - assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab")) - assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd")) - 
assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata")) - assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata")) - assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata")) - assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata")) + test("PartitioningAwareFileIndex - file filtering") { + assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd")) + assert(PartitioningAwareFileIndex.shouldFilterOut(".ab")) + assert(PartitioningAwareFileIndex.shouldFilterOut("_cd")) + assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata")) + assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata")) + assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata")) + assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata")) } - test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { + test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") { class MockCatalog( override val rootPaths: Seq[Path]) - extends PartitioningAwareFileCatalog(spark, Map.empty, None) { + extends PartitioningAwareFileIndex(spark, Map.empty, None) { override def refresh(): Unit = {} http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index c32254d..d900ce7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -393,7 +393,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi util.stringToFile(file, fileName) } - val fileCatalog = new ListingFileCatalog( + val fileCatalog = new InMemoryFileIndex( sparkSession = spark, rootPaths = Seq(new Path(tempDir)), parameters = Map.empty[String, String], http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index f2a209e..120a3a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -634,7 +634,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) => + HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _) => assert(location.partitionSpec() === PartitionSpec.emptySpec) }.getOrElse { fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution") 
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 19c89f5..18b42a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog} +import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileIndex} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -179,14 +179,14 @@ class FileStreamSinkSuite extends StreamTest { .add(StructField("id", IntegerType)) assert(outputDf.schema === expectedSchema) - // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has + // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has // been inferred val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect { case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] => baseRelation.asInstanceOf[HadoopFsRelation] } assert(hadoopdFsRelations.size === 1) - assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog]) + assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex]) assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id")) assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value")) http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b9e9da9..47018b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -879,7 +879,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val numFiles = 10000 // This is to avoid running a spark job to list of files in parallel - // by the ListingFileCatalog. + // by the InMemoryFileIndex. spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2) withTempDirs { case (root, tmp) =>
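
For readers tracing the rename through the diff, a minimal usage sketch follows. It only mirrors what the patch itself shows, namely the four-argument constructor exercised by the updated FileIndexSuite and FileStreamSource, plus the `basePath` reader option documented in PartitioningAwareFileIndex.basePaths; the paths and the wrapping object are illustrative placeholders, not part of the change.

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

object FileIndexExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("file-index-example").getOrCreate()

    // InMemoryFileIndex (formerly ListingFileCatalog) lists the leaf files under the given
    // root paths and keeps the FileStatus entries cached in memory.
    val index = new InMemoryFileIndex(
      spark, Seq(new Path("/tmp/example/data")), Map.empty[String, String], None)
    index.allFiles().foreach(status => println(status.getPath))

    // The "basePath" option from PartitioningAwareFileIndex.basePaths: reading below a
    // partition directory keeps the `something` partition column only when the base path
    // is set to a directory above it.
    val df = spark.read
      .option("basePath", "/tmp/example/table/")
      .parquet("/tmp/example/table/something=true/")
    df.printSchema()

    spark.stop()
  }
}
```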