[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2017-01-08 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r95084391
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.cache._
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
+  /**
+   * @return the leaf files for the specified path from this cache, or None if not cached.
+   */
+  def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+  /**
+   * Saves the given set of leaf files for a path in this cache.
+   */
+  def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+  /**
+   * Invalidates all data held by this cache.
+   */
+  def invalidateAll(): Unit
+}
+
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = null
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   * shared across all clients.
+   */
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
--- End diff --

How about `HIVE_MANAGE_FILESOURCE_PARTITIONS`? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2017-01-08 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r95074015
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,149 @@
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = null
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   * shared across all clients.
+   */
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
--- End diff --

Oh I see, the sizing can be problematic. I find it sometimes useful for 
debugging to turn it off at runtime by setting it to zero though -- it will 
take effect after a refresh table.

Perhaps we can just add a comment that adjusting the value at runtime may 
produce unexpected behavior when there are multiple sessions?
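
To make the "takes effect after a refresh table" point concrete, here is a minimal sketch in plain Scala with no Spark dependency. Every name in it (`RuntimeToggleSketch`, `SessionConf`, `Relation`, `refresh`) is a hypothetical stand-in and not the PR's API. It assumes only that the cache implementation is chosen once, when a relation is built, from the size setting at that moment.

```scala
object RuntimeToggleSketch {
  // Hypothetical stand-in for the cache abstraction in the diff.
  trait LeafFileCache {
    def get(path: String): Option[Seq[String]]
    def put(path: String, files: Seq[String]): Unit
  }

  // Used when caching is disabled: stores nothing and always misses.
  object NoopLeafFileCache extends LeafFileCache {
    def get(path: String): Option[Seq[String]] = None
    def put(path: String, files: Seq[String]): Unit = ()
  }

  final class InMemoryLeafFileCache extends LeafFileCache {
    private val entries = scala.collection.mutable.Map.empty[String, Seq[String]]
    def get(path: String): Option[Seq[String]] = entries.get(path)
    def put(path: String, files: Seq[String]): Unit = entries.update(path, files)
  }

  // Hypothetical mutable session conf; cacheSizeBytes models the cache size setting.
  final class SessionConf(var cacheSizeBytes: Long)

  // The cache implementation is picked once, when the relation is constructed.
  final class Relation(conf: SessionConf) {
    val cache: LeafFileCache =
      if (conf.cacheSizeBytes > 0) new InMemoryLeafFileCache else NoopLeafFileCache
  }

  // Refreshing a table is modeled as rebuilding its relation from the current conf.
  def refresh(conf: SessionConf): Relation = new Relation(conf)
}
```

Under these assumptions, setting `cacheSizeBytes` to zero leaves an already-built `Relation` caching as before; only the `Relation` returned by `refresh` picks up the no-op cache.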





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2017-01-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r95073558
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,149 @@
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = null
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   * shared across all clients.
+   */
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
--- End diff --

Or can we make the cache session-specific? 





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2017-01-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r95073551
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,149 @@
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = null
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   * shared across all clients.
+   */
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
--- End diff --

If we allow users to change it at runtime, users might get strange results 
because the cache is shared globally. Some sessions might bypass the cache and 
change the metastore directly, while other sessions keep using the cached 
values. Even within a single session, we might face similar issues if we allow 
users to change it at runtime.
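
A minimal sketch of the kind of skew described above, in plain Scala with no Spark dependency. All names (`SharedCacheSkewSketch`, `storage`, `sharedCache`, `Session`) are hypothetical; `storage` stands in for whatever the sessions ultimately read. It only models stale reads, under the assumption that one session caches through a process-wide map while another bypasses it.

```scala
import scala.collection.mutable

object SharedCacheSkewSketch {
  // Hypothetical stand-in for the source of truth: path -> file names.
  val storage: mutable.Map[String, Seq[String]] =
    mutable.Map("/t/p=1" -> Seq("part-0"))

  // One process-wide cache shared by every session.
  val sharedCache: mutable.Map[String, Seq[String]] = mutable.Map.empty

  // Each session decides for itself whether to go through the shared cache.
  final class Session(cacheEnabled: Boolean) {
    def listFiles(path: String): Seq[String] =
      if (cacheEnabled) sharedCache.getOrElseUpdate(path, storage(path))
      else storage(path)
  }
}
```

If a caching session lists `/t/p=1` first and `storage` changes afterwards, a bypassing session sees the new files while the caching session keeps returning the old listing until something invalidates the shared entry.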





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2017-01-07 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r95073488
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,149 @@
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = null
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   * shared across all clients.
+   */
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
--- End diff --

Sounds good, as long as you can still change it at runtime.





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2017-01-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r95073252
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,149 @@
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = null
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   * shared across all clients.
+   */
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
--- End diff --

These two confs have to be global and static; otherwise, we might see weird 
behavior when different sessions use or change different values. Let me 
submit a PR to improve it.





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15539





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84568025
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -38,14 +38,16 @@ class ListingFileCatalog(
     sparkSession: SparkSession,
     override val rootPaths: Seq[Path],
     parameters: Map[String, String],
-    partitionSchema: Option[StructType])
-  extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
+    partitionSchema: Option[StructType],
+    fileStatusCache: FileStatusCache = NoopCache)
+  extends PartitioningAwareFileCatalog(
+    sparkSession, parameters, partitionSchema, fileStatusCache) {
 
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
   @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
--- End diff --

Is this cache still useful?





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84508947
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -42,24 +43,21 @@ class TableFileCatalog(
 
   protected val hadoopConf = sparkSession.sessionState.newHadoopConf
 
+  private val fileStatusCache = FileStatusCache.getOrInitializeShared(new Object(), sparkSession)
--- End diff --

Odd, I figured we'd call the `refresh` method to refresh the table.

The reason I suggested using the table's name is so that all references to 
the same table can use the same cache. That avoids redundant cache entries and 
lets new instances of the same logical table inherit already-cached entries. 
Aside from implementation issues, does that make sense?

Anyway, this is something we can explore in a follow-up PR if desired.
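
For illustration, a rough sketch of what keying by table name could look like, in plain Scala with no Spark dependency. The names (`NameKeyedCacheSketch`, `LeafFileCache`, `cacheFor`) and the use of a qualified table name string as the key are assumptions made for this example, not code from the PR.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

object NameKeyedCacheSketch {
  // Hypothetical per-table cache: path -> leaf file names.
  final class LeafFileCache {
    private val entries = new ConcurrentHashMap[String, Seq[String]]()
    def put(path: String, files: Seq[String]): Unit = { entries.put(path, files); () }
    def get(path: String): Option[Seq[String]] = Option(entries.get(path))
    def invalidateAll(): Unit = entries.clear()
  }

  // One cache per table name, created on first use.
  private val cachesByTable = new ConcurrentHashMap[String, LeafFileCache]()

  // Every caller that passes the same name gets the same cache instance.
  def cacheFor(qualifiedTableName: String): LeafFileCache =
    cachesByTable.computeIfAbsent(
      qualifiedTableName,
      new JFunction[String, LeafFileCache] {
        override def apply(name: String): LeafFileCache = new LeafFileCache
      })
}
```

With this shape, a new instance of the same logical table would see entries cached by an earlier instance, and a refresh would have to call `invalidateAll()` on the shared instance explicitly.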





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-21 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84424330
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 ---
@@ -294,7 +308,7 @@ object PartitioningAwareFileCatalog extends Logging {
   private def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkSession: SparkSession): Seq[FileStatus] = {
+      sparkSession: SparkSession): Map[Path, Seq[FileStatus]] = {
--- End diff --

Updated





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-21 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84424519
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -64,11 +66,18 @@ class ListingFileCatalog(
   }
 
   override def refresh(): Unit = {
+    refresh0(true)
--- End diff --

Good idea





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-21 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84424418
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -42,24 +43,21 @@ class TableFileCatalog(
 
   protected val hadoopConf = sparkSession.sessionState.newHadoopConf
 
+  private val fileStatusCache = FileStatusCache.getOrInitializeShared(new Object(), sparkSession)
--- End diff --

The way we currently refresh tables is by dropping the reference to 
TableFileCatalog and letting it get GC'ed. Given this strategy, making it 
private is the simplest way to ensure refresh actually works correctly -- 
otherwise you have to carefully test that refresh also invalidates shared cache 
entries.
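
A small sketch of the drop-the-reference strategy described here, in plain Scala with no Spark dependency. `FileCatalog` and `TableEntry` are hypothetical names for this example; the only assumption is that each catalog instance owns a private cache, so replacing the instance discards the cached listings with it.

```scala
import scala.collection.mutable

object DropReferenceRefreshSketch {
  // Hypothetical file catalog that owns a private listing cache.
  final class FileCatalog(list: String => Seq[String]) {
    private val cache = mutable.Map.empty[String, Seq[String]]
    def listFiles(path: String): Seq[String] =
      cache.getOrElseUpdate(path, list(path))
  }

  // Holds the current catalog for a table; refresh swaps in a new instance.
  final class TableEntry(list: String => Seq[String]) {
    @volatile private var catalog = new FileCatalog(list)
    def listFiles(path: String): Seq[String] = catalog.listFiles(path)
    // The old catalog and its private cache become unreachable together,
    // so no separate invalidation step is required.
    def refresh(): Unit = { catalog = new FileCatalog(list) }
  }
}
```

Because nothing outside `FileCatalog` can reach its cache, there are no shared entries that a refresh could forget to invalidate.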





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-21 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84424682
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,149 @@
+object FileStatusCache {
+  // Opaque object that uniquely identifies a shared cache user
+  type ClientId = Object
--- End diff --

I think it's better to err on the side of isolation here. Otherwise, it is 
harder to reason about what is actually invalidated when a table is refreshed.
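
To illustrate the isolation argument, here is a minimal sketch in plain Scala with no Spark dependency. It assumes one shared store whose keys pair an opaque client id with a path, so each client can only see and invalidate its own entries. `ClientIsolatedCacheSketch`, `ClientView` and `newClient` are hypothetical names; only the `ClientId = Object` alias mirrors the quoted diff.

```scala
import java.util.concurrent.ConcurrentHashMap

object ClientIsolatedCacheSketch {
  // Opaque object that uniquely identifies a cache user, as in the quoted diff.
  type ClientId = Object

  // Single shared store; entries are namespaced by the owning client.
  private val store = new ConcurrentHashMap[(ClientId, String), Seq[String]]()

  final class ClientView(id: ClientId) {
    def put(path: String, files: Seq[String]): Unit = { store.put((id, path), files); () }
    def get(path: String): Option[Seq[String]] = Option(store.get((id, path)))

    // Removes only this client's entries; other clients are left untouched.
    def invalidateAll(): Unit = {
      val it = store.keySet().iterator()
      while (it.hasNext) {
        if (it.next()._1 eq id) it.remove()
      }
    }
  }

  // Each client gets a fresh identity object, so keys can never collide across clients.
  def newClient(): ClientView = new ClientView(new Object)
}
```

The store (and any memory quota attached to it) is shared, but what a refresh invalidates is exactly the set of entries owned by that one client.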





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84409662
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.cache._
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
+  /**
+   * @return the leaf files for the specified path from this cache, or None if not cached.
+   */
+  def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+  /**
+   * Saves the given set of leaf files for a path in this cache.
+   */
+  def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+  /**
+   * Invalidates all data held by this cache.
+   */
+  def invalidateAll(): Unit
+}
+
+object FileStatusCache {
+  // Opaque object that uniquely identifies a shared cache user
+  type ClientId = Object
--- End diff --

Why do we need a client ID? It looks to me like a client that refers to the 
same path as another client won't hit the cache. Is that the expected 
behaviour?
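
To make the question concrete, here is a minimal sketch of a store keyed by (client, path). It is not the PR's implementation: `SharedLeafFileStore` and its methods are hypothetical names, and plain strings stand in for `Path` and `FileStatus`. It only assumes what the diff states, namely that a client ID is an opaque `Object`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a shared cache whose entries are keyed by (client, path).
class SharedLeafFileStore {
  private val entries = new ConcurrentHashMap[(AnyRef, String), Array[String]]()

  def put(client: AnyRef, path: String, files: Array[String]): Unit = {
    entries.put((client, path), files)
    ()
  }

  // A lookup only succeeds for the client that stored the entry.
  def get(client: AnyRef, path: String): Option[Array[String]] =
    Option(entries.get((client, path)))

  // Drops only the entries owned by `client`; other clients keep theirs.
  def invalidateAll(client: AnyRef): Unit = {
    val it = entries.keySet().iterator()
    while (it.hasNext) {
      if (it.next()._1 eq client) it.remove()
    }
  }
}
```

Under this keying, a second client asking for the same path misses, and invalidating one client leaves the other's entries in place.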





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84409377
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -64,11 +66,18 @@ class ListingFileCatalog(
   }
 
   override def refresh(): Unit = {
+refresh0(true)
--- End diff --

nit: instead of having a parameter in `refresh0`, can we just call 
`refresh0` here, and then invalidate the file status cache?
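
A rough sketch of what this could look like, with hypothetical classes (`CountingCache`, `CatalogSketch`) standing in for the real ones. The ordering follows the comment as written; if the re-listing reads from the cache, the invalidation would have to come first.

```scala
// Hypothetical cache that only counts how often it was invalidated.
class CountingCache {
  var invalidations = 0
  def invalidateAll(): Unit = invalidations += 1
}

// Hypothetical catalog: refresh0 takes no flag, and refresh() does the invalidation itself.
class CatalogSketch(cache: CountingCache) {
  var listings = 0

  // Re-lists the files; knows nothing about the shared cache.
  private def refresh0(): Unit = listings += 1

  def refresh(): Unit = {
    refresh0()
    cache.invalidateAll()
  }
}
```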





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84402516
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -42,24 +43,21 @@ class TableFileCatalog(
 
   protected val hadoopConf = sparkSession.sessionState.newHadoopConf
 
+  private val fileStatusCache = FileStatusCache.getOrInitializeShared(new Object(), sparkSession)
--- End diff --

Does the file status cache need to be private to an instance of 
`TableFileCatalog`? Can we use the table's qualified name as the key here 
instead?
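
For illustration, a small sketch of the difference between the two kinds of key. `CacheRegistrySketch` and `LeafFileCacheSketch` are hypothetical, and the lookup only loosely mirrors `getOrInitializeShared`: a fresh `new Object()` never equals any other key, while a qualified name such as `"default.test"` (an example value) does.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical per-key cache; the contents do not matter for this sketch.
class LeafFileCacheSketch

// Hypothetical registry of caches, looked up by an arbitrary key.
object CacheRegistrySketch {
  private val caches = new ConcurrentHashMap[AnyRef, LeafFileCacheSketch]()

  def getOrInitialize(key: AnyRef): LeafFileCacheSketch = {
    val fresh = new LeafFileCacheSketch
    // putIfAbsent returns null when no cache was registered under this key yet.
    val existing = caches.putIfAbsent(key, fresh)
    if (existing == null) fresh else existing
  }
}
```

With `new Object()` as the key, every catalog instance gets its own cache. With the qualified table name, two instances for the same table would share one.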





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84355969
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
--- End diff --

What about using the cache's built-in eviction process? That would enforce 
an upper limit on memory usage.
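
As a sketch of the idea, here is a bounded cache with automatic eviction. It swaps in an access-ordered `java.util.LinkedHashMap` for a real caching library so that it stays self-contained, and it bounds the number of entries rather than memory. `BoundedLeafFileCache` is a hypothetical name, and strings stand in for `Path` and `FileStatus`.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical LRU cache: once maxEntries is exceeded, the least recently
// used entry is evicted automatically on insertion.
class BoundedLeafFileCache(maxEntries: Int) {
  // accessOrder = true makes get() and put() move an entry to the "most recent" end.
  private val entries =
    new JLinkedHashMap[String, Array[String]](16, 0.75f, true) {
      override def removeEldestEntry(eldest: JMap.Entry[String, Array[String]]): Boolean =
        this.size() > maxEntries
    }

  def put(path: String, files: Array[String]): Unit = synchronized {
    entries.put(path, files)
    ()
  }

  def get(path: String): Option[Array[String]] = synchronized {
    Option(entries.get(path))
  }

  def entryCount: Int = synchronized(entries.size())
}
```

The upper limit holds no matter how many paths are inserted, which is the property the comment is after.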





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84353307
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala 
---
@@ -103,11 +92,103 @@ class HiveDataFrameSuite extends QueryTest with 
TestHiveSingleton with SQLTestUt
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 5)
   assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
 
-  // read all should be cached
+  // read all should not be cached
   HiveCatalogMetrics.reset()
   spark.sql("select * from test").count()
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
+
+  // cache should be disabled
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+}
+  }
+}
+  }
+
+  test("lazy partition pruning with file status caching enabled") {
+withSQLConf(
+"spark.sql.hive.filesourcePartitionPruning" -> "true",
+"spark.sql.hive.filesourcePartitionFileCacheSize" -> "999") {
+  withTable("test") {
+withTempDir { dir =>
+  setupPartitionedTable("test", dir)
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test where partCol1 = 
999").count() == 0)
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 0)
   assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
0)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test where partCol1 < 
2").count() == 2)
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 2)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
2)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test where partCol1 < 
3").count() == 3)
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 3)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
1)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2)
+
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
2)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3)
+
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
0)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5)
+}
+  }
+}
+  }
+
+  test("file status caching respects refresh table and refreshByPath") {
+withSQLConf(
+"spark.sql.hive.filesourcePartitionPruning" -> "true",
+"spark.sql.hive.filesourcePartitionFileCacheSize" -> "999") {
+  withTable("test") {
+withTempDir { dir =>
+  setupPartitionedTable("test", dir)
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+  HiveCatalogMetrics.reset()
+  spark.sql("refresh table test")
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+  spark.catalog.cacheTable("test")
+  HiveCatalogMetrics.reset()
+  spark.catalog.refreshByPath(dir.getAbsolutePath)
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+}
+  }
+}
+  }
+
+  test("file status cache respects size limit") {
+withSQLConf(
+"spark.sql.hive.filesourcePartitionPruning" -> "true",
+"spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte 
*/) {
--- End 

[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84353211
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -87,21 +85,13 @@ class TableFileCatalog(
 }
 val partitionSpec = PartitionSpec(schema, partitions)
 new PrunedTableFileCatalog(
-  sparkSession, new Path(baseLocation.get), partitionSpec)
+  sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
   case None =>
-new ListingFileCatalog(sparkSession, rootPaths, parameters, None)
-}
-  }
-
-  // Not used in the hot path of queries when metastore partition pruning is enabled
-  def allPartitions: ListingFileCatalog = synchronized {
-if (cachedAllPartitions == null) {
--- End diff --

Removed





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84353179
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -231,11 +231,16 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
 val fileCatalog = {
   val catalog = new TableFileCatalog(
-sparkSession, db, table, Some(partitionSchema), sizeInBytes)
+sparkSession, db, table, Some(partitionSchema), sizeInBytes,
+fileStatusCacheSize = if (lazyPruningEnabled) {
--- End diff --

Done





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84352755
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -64,11 +66,18 @@ class ListingFileCatalog(
   }
 
   override def refresh(): Unit = {
+refresh0(true)
+  }
+
+  private def refresh0(invalidateSharedCache: Boolean): Unit = {
--- End diff --

It's shared within a single table. Confusingly, you can have views over the 
table that are cached in CacheManager, so we need this to invalidate the table 
cache as well on refresh.





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84352402
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
--- End diff --

This is already better than what we have in 2.0, which caches everything 
with no limit.

I think a global cache is fairly complicated, since we can no longer rely on 
the Java GC to manage the lifetime of the cache entries. Unless we want to do 
cache GC in a finalizer, I don't think this is worth it.





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-20 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84222074
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
--- End diff --

Hm, this is pretty risky, because the number of tables in a real environment 
can be very large.






[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84203885
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala 
---
@@ -103,11 +92,103 @@ class HiveDataFrameSuite extends QueryTest with 
TestHiveSingleton with SQLTestUt
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 5)
   assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
 
-  // read all should be cached
+  // read all should not be cached
   HiveCatalogMetrics.reset()
   spark.sql("select * from test").count()
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
+
+  // cache should be disabled
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+}
+  }
+}
+  }
+
+  test("lazy partition pruning with file status caching enabled") {
+withSQLConf(
+"spark.sql.hive.filesourcePartitionPruning" -> "true",
+"spark.sql.hive.filesourcePartitionFileCacheSize" -> "999") {
+  withTable("test") {
+withTempDir { dir =>
+  setupPartitionedTable("test", dir)
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test where partCol1 = 
999").count() == 0)
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 0)
   assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
0)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test where partCol1 < 
2").count() == 2)
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 2)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
2)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test where partCol1 < 
3").count() == 3)
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 3)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
1)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2)
+
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
2)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3)
+
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() 
== 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
0)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5)
+}
+  }
+}
+  }
+
+  test("file status caching respects refresh table and refreshByPath") {
+withSQLConf(
+"spark.sql.hive.filesourcePartitionPruning" -> "true",
+"spark.sql.hive.filesourcePartitionFileCacheSize" -> "999") {
+  withTable("test") {
+withTempDir { dir =>
+  setupPartitionedTable("test", dir)
+  HiveCatalogMetrics.reset()
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+  HiveCatalogMetrics.reset()
+  spark.sql("refresh table test")
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+  spark.catalog.cacheTable("test")
+  HiveCatalogMetrics.reset()
+  spark.catalog.refreshByPath(dir.getAbsolutePath)
+  assert(spark.sql("select * from test").count() == 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 
5)
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+}
+  }
+}
+  }
+
+  test("file status cache respects size limit") {
+withSQLConf(
+"spark.sql.hive.filesourcePartitionPruning" -> "true",
+"spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte 
*/) {
--- 

[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84203441
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -231,11 +231,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
 val fileCatalog = {
   val catalog = new TableFileCatalog(
-sparkSession, db, table, Some(partitionSchema), sizeInBytes)
+sparkSession, db, table, Some(partitionSchema), sizeInBytes,
+fileStatusCacheSize = if (lazyPruningEnabled) {
--- End diff --

Shall we inline this logic in `TableFileCatalog`? It has access to the 
`SparkSession`, so it can read these two flags itself: 
`filesourcePartitionPruning` and `filesourcePartitionFileCacheSize`.
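
A sketch of the inlined decision, under loudly stated assumptions: a plain `Map[String, String]` stands in for the session conf, `CacheSizeSketch` is a hypothetical helper, and treating "pruning disabled" or "size not set" as a cache size of 0 is a guess, since the quoted diff is cut off before the `else` branch. Only the two config key names come from the PR's test suite.

```scala
// Hypothetical helper showing how the catalog could derive the cache size itself.
object CacheSizeSketch {
  val PruningKey = "spark.sql.hive.filesourcePartitionPruning"
  val CacheSizeKey = "spark.sql.hive.filesourcePartitionFileCacheSize"

  // Assumption: with pruning disabled, or no size configured, the result is 0.
  def fileStatusCacheSize(conf: Map[String, String]): Long = {
    val pruningEnabled = conf.get(PruningKey).exists(_.toBoolean)
    if (pruningEnabled) conf.get(CacheSizeKey).map(_.toLong).getOrElse(0L) else 0L
  }
}
```

The call site in `HiveMetastoreCatalog` would then no longer need to pass `fileStatusCacheSize` at all.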





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84202841
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -64,11 +66,18 @@ class ListingFileCatalog(
   }
 
   override def refresh(): Unit = {
+refresh0(true)
+  }
+
+  private def refresh0(invalidateSharedCache: Boolean): Unit = {
--- End diff --

Is the cache shared?





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84184588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.common.cache._
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
+  /**
+   * @return the leaf files for the specified path from this cache, or None if not cached.
+   */
+  def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+  /**
+   * Saves the given set of leaf files for a path in this cache.
+   */
+  def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+  /**
+   * Invalidates all data held by this cache.
+   */
+  def invalidateAll(): Unit
+}
+
+/**
+ * An implementation that caches all partition file statuses in memory forever.
--- End diff --

Updated





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84161990
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
--- End diff --

A single shared cache with some kind of upper limit on entries (or memory, 
the configuration of which is worth another discussion) and automatic eviction 
would simplify global resource management. I think we will need something like 
this to support the thriftserver.
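
A minimal sketch of the bounded, self-evicting cache suggested above. It is not code from this PR: it bounds by entry count rather than memory, uses the JDK's `LinkedHashMap` in access order as a simple LRU, and `FileStatusStub`, `BoundedFileStatusCache`, and `maxEntries` are hypothetical names standing in for the real Hadoop and Spark types.

```scala
import java.util.{Collections, LinkedHashMap, Map => JMap}

// Hypothetical stand-in for org.apache.hadoop.fs.FileStatus so the sketch compiles alone.
final case class FileStatusStub(path: String, length: Long)

// A shared cache bounded by entry count; `maxEntries` is an illustrative knob,
// not an existing Spark configuration.
final class BoundedFileStatusCache(maxEntries: Int) {
  // accessOrder = true keeps entries ordered from least to most recently used.
  private val underlying =
    new LinkedHashMap[String, Array[FileStatusStub]](16, 0.75f, true) {
      // Called by the map after each insertion; returning true evicts the LRU entry.
      override def removeEldestEntry(
          eldest: JMap.Entry[String, Array[FileStatusStub]]): Boolean =
        this.size() > maxEntries
    }

  // LinkedHashMap is not thread-safe, and in access order even get() mutates it,
  // so every call goes through a synchronized wrapper.
  private val cache: JMap[String, Array[FileStatusStub]] =
    Collections.synchronizedMap(underlying)

  def getLeafFiles(path: String): Option[Array[FileStatusStub]] =
    Option(cache.get(path))

  def putLeafFiles(path: String, leafFiles: Array[FileStatusStub]): Unit = {
    cache.put(path, leafFiles)
    ()
  }

  def invalidateAll(): Unit = cache.clear()

  def entryCount: Int = cache.size()
}
```

Bounding by memory instead would need a per-entry weight (for example an estimated size of each `Array[FileStatus]`), which this sketch leaves out.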





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84152855
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 ---
@@ -276,15 +290,15 @@ object PartitioningAwareFileCatalog extends Logging {
*/
   private def listLeafFilesInSerial(
   paths: Seq[Path],
-  hadoopConf: Configuration): Seq[FileStatus] = {
+  hadoopConf: Configuration): Map[Path, Seq[FileStatus]] = {
 // Dummy jobconf to get to the pathFilter defined in configuration
 val jobConf = new JobConf(hadoopConf, this.getClass)
 val filter = FileInputFormat.getInputPathFilter(jobConf)
 
-paths.flatMap { path =>
+paths.map { path =>
   val fs = path.getFileSystem(hadoopConf)
-  listLeafFiles0(fs, path, filter)
-}
+  (path, listLeafFiles0(fs, path, filter))
+}.toMap
--- End diff --

Done





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84152806
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -38,14 +38,16 @@ class ListingFileCatalog(
 sparkSession: SparkSession,
 override val rootPaths: Seq[Path],
 parameters: Map[String, String],
-partitionSchema: Option[StructType])
-  extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
+partitionSchema: Option[StructType],
+fileStatusCache: FileStatusCache = new NoopCache)
--- End diff --

Done





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84152793
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
+  /**
+   * @return the leaf files for the specified path from this cache, or None if not cached.
+   */
+  def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+  /**
+   * Saves the given set of leaf files for a path in this cache.
+   */
+  def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+  /**
+   * Invalidates all data held by this cache.
+   */
+  def invalidateAll(): Unit
+}
+
+/**
+ * An implementation that caches all partition file statuses in memory forever.
+ */
+class InMemoryCache extends FileStatusCache {
+  private val cache = new ConcurrentHashMap[Path, Array[FileStatus]]()
+
+  override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
+    Option(cache.get(path))
+  }
+
+  override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
+    cache.put(path, leafFiles.toArray)
+  }
+
+  override def invalidateAll(): Unit = {
+    cache.clear()
+  }
+}
+
+/**
+ * A non-caching implementation used when partition file status caching is disabled.
+ */
+private class NoopCache extends FileStatusCache {
+  override def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+  override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {}
+  override def invalidateAll(): Unit = {}
+}
+
+object FileStatusCache {
+  val noop: FileStatusCache = new NoopCache
--- End diff --

I ran into some compilation issues there, but I guess I just had to do a 
clean.





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84153565
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala 
---
@@ -103,11 +92,84 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
   assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
 
-  // read all should be cached
+  // read all should not be cached
   HiveCatalogMetrics.reset()
   spark.sql("select * from test").count()
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+  // cache should be disabled
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+}
+  }
+}
+  }
+
+  test("lazy partition pruning with file status caching enabled") {
+withSQLConf(
+"spark.sql.hive.filesourcePartitionPruning" -> "true",
+"spark.sql.hive.filesourcePartitionFileCacheEnabled" -> "true") {
+  withTable("test") {
+withTempDir { dir =>
+  setupPartitionedTable("test", dir)
+  HiveCatalogMetrics.reset()
+  spark.sql("select * from test where partCol1 = 999").count()
--- End diff --

Done





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84159294
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
--- End diff --

There's some extra complexity here since each table has its own cache, so 
you could still exceed the amount of available memory if there were multiple 
tables.

Some options here:
1) A large shared cache that includes table name in the key
2) Pick an arbitrary size limit for each table cache
3) Do nothing
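
As a rough illustration of option 1, a shared cache can scope each entry to a table by putting the table name in the key, so that refreshing one table leaves the others cached. This is a sketch under assumptions, not code from the PR: `TableAndPath`, `SharedFileStatusCache`, `invalidateTable`, and `FileStatusStub` are hypothetical names, and no size limit is applied here.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for org.apache.hadoop.fs.FileStatus.
final case class FileStatusStub(path: String, length: Long)

// Cache key that scopes a partition path to the table it belongs to.
final case class TableAndPath(table: String, path: String)

final class SharedFileStatusCache {
  private val cache = new ConcurrentHashMap[TableAndPath, Array[FileStatusStub]]()

  def getLeafFiles(table: String, path: String): Option[Array[FileStatusStub]] =
    Option(cache.get(TableAndPath(table, path)))

  def putLeafFiles(table: String, path: String, leafFiles: Array[FileStatusStub]): Unit = {
    cache.put(TableAndPath(table, path), leafFiles)
    ()
  }

  // Refreshing one table drops only that table's entries.
  def invalidateTable(table: String): Unit = {
    cache.keySet().removeIf((key: TableAndPath) => key.table == table)
    ()
  }

  def entryCount: Int = cache.size()
}
```

The trade-off against per-table caches is that invalidation becomes a scan over the shared key set instead of a single `clear()`.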





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-19 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r84118387
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -32,13 +32,21 @@ import org.apache.spark.sql.types.StructType
  * @param table the table's (unqualified) name
  * @param partitionSchema the schema of a partitioned table's partition columns
  * @param sizeInBytes the table's data size in bytes
+ * @param enableFileStatusCache whether to enable file status caching
  */
 class TableFileCatalog(
 sparkSession: SparkSession,
 db: String,
 table: String,
 partitionSchema: Option[StructType],
-override val sizeInBytes: Long) extends FileCatalog {
+override val sizeInBytes: Long,
+enableFileStatusCache: Boolean) extends FileCatalog {
--- End diff --

I would turn it off if it were taking too much heap space and I didn't have 
a better workaround.





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83991043
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -32,13 +32,21 @@ import org.apache.spark.sql.types.StructType
  * @param table the table's (unqualified) name
  * @param partitionSchema the schema of a partitioned table's partition columns
  * @param sizeInBytes the table's data size in bytes
+ * @param enableFileStatusCache whether to enable file status caching
  */
 class TableFileCatalog(
 sparkSession: SparkSession,
 db: String,
 table: String,
 partitionSchema: Option[StructType],
-override val sizeInBytes: Long) extends FileCatalog {
+override val sizeInBytes: Long,
+enableFileStatusCache: Boolean) extends FileCatalog {
--- End diff --

When would users want to turn it off?





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83990858
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala 
---
@@ -103,11 +92,84 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
   assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
 
-  // read all should be cached
+  // read all should not be cached
   HiveCatalogMetrics.reset()
   spark.sql("select * from test").count()
+  assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+  // cache should be disabled
+  assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+}
+  }
+}
+  }
+
+  test("lazy partition pruning with file status caching enabled") {
+withSQLConf(
+"spark.sql.hive.filesourcePartitionPruning" -> "true",
+"spark.sql.hive.filesourcePartitionFileCacheEnabled" -> "true") {
+  withTable("test") {
+withTempDir { dir =>
+  setupPartitionedTable("test", dir)
+  HiveCatalogMetrics.reset()
+  spark.sql("select * from test where partCol1 = 999").count()
--- End diff --

I suggest wrapping this and other such calls in an `assert` to declare and 
validate assumptions, e.g.

```
assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
```

whereas a query that is expected to return rows would assert the expected count:

```
assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
```





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83990583
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
+  /**
+   * @return the leaf files for the specified path from this cache, or None if not cached.
+   */
+  def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+  /**
+   * Saves the given set of leaf files for a path in this cache.
+   */
+  def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+  /**
+   * Invalidates all data held by this cache.
+   */
+  def invalidateAll(): Unit
+}
+
+/**
+ * An implementation that caches all partition file statuses in memory forever.
+ */
+class InMemoryCache extends FileStatusCache {
+  private val cache = new ConcurrentHashMap[Path, Array[FileStatus]]()
+
+  override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
+    Option(cache.get(path))
+  }
+
+  override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
+    cache.put(path, leafFiles.toArray)
+  }
+
+  override def invalidateAll(): Unit = {
+    cache.clear()
+  }
+}
+
+/**
+ * A non-caching implementation used when partition file status caching is disabled.
+ */
+private class NoopCache extends FileStatusCache {
+  override def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+  override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {}
+  override def invalidateAll(): Unit = {}
+}
+
+object FileStatusCache {
+  val noop: FileStatusCache = new NoopCache
--- End diff --

Why not just make `NoopCache` an object?





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83988599
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -38,14 +38,16 @@ class ListingFileCatalog(
 sparkSession: SparkSession,
 override val rootPaths: Seq[Path],
 parameters: Map[String, String],
-partitionSchema: Option[StructType])
-  extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
+partitionSchema: Option[StructType],
+fileStatusCache: FileStatusCache = new NoopCache)
--- End diff --

It just occurred to me I may not have been clear in my suggestion. Just in 
case, I'm suggesting changing `class NoopCache` to `object NoopCache`.
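
A sketch of what that change could look like. Because the no-op cache holds no state, a single `object` can serve as the default everywhere instead of allocating `new NoopCache` per catalog. `PathStub`, `FileStatusStub`, and `ListingFileCatalogSketch` are hypothetical stand-ins used only to keep the example self-contained; the abstract class mirrors the one in the diff.

```scala
// Hypothetical stand-ins for org.apache.hadoop.fs.Path and FileStatus.
final case class PathStub(value: String)
final case class FileStatusStub(path: PathStub, length: Long)

abstract class FileStatusCache {
  def getLeafFiles(path: PathStub): Option[Array[FileStatusStub]] = None
  def putLeafFiles(path: PathStub, leafFiles: Array[FileStatusStub]): Unit
  def invalidateAll(): Unit
}

// Stateless, so one shared singleton instance is enough.
object NoopCache extends FileStatusCache {
  override def getLeafFiles(path: PathStub): Option[Array[FileStatusStub]] = None
  override def putLeafFiles(path: PathStub, leafFiles: Array[FileStatusStub]): Unit = ()
  override def invalidateAll(): Unit = ()
}

// Simplified, hypothetical stand-in for the ListingFileCatalog constructor:
// the default argument refers to the singleton rather than `new NoopCache`.
class ListingFileCatalogSketch(
    val rootPaths: Seq[PathStub],
    val fileStatusCache: FileStatusCache = NoopCache)
```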





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83987399
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -38,14 +38,16 @@ class ListingFileCatalog(
 sparkSession: SparkSession,
 override val rootPaths: Seq[Path],
 parameters: Map[String, String],
-partitionSchema: Option[StructType])
-  extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
+partitionSchema: Option[StructType],
+fileStatusCache: FileStatusCache = new NoopCache)
--- End diff --

I still think you should just use a singleton object, but if you want to 
use `FileStatusCache.noop`, you forgot to update this code.





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83987017
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 ---
@@ -276,15 +290,15 @@ object PartitioningAwareFileCatalog extends Logging {
*/
   private def listLeafFilesInSerial(
   paths: Seq[Path],
-  hadoopConf: Configuration): Seq[FileStatus] = {
+  hadoopConf: Configuration): Map[Path, Seq[FileStatus]] = {
 // Dummy jobconf to get to the pathFilter defined in configuration
 val jobConf = new JobConf(hadoopConf, this.getClass)
 val filter = FileInputFormat.getInputPathFilter(jobConf)
 
-paths.flatMap { path =>
+paths.map { path =>
   val fs = path.getFileSystem(hadoopConf)
-  listLeafFiles0(fs, path, filter)
-}
+  (path, listLeafFiles0(fs, path, filter))
+}.toMap
--- End diff --

Based on how you're using this return value above, it looks like you can 
omit this call to `toMap` and just return a sequence of tuples.
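
A sketch of the tuple-returning variant, assuming the caller only iterates over the result. `ListingSketch`, `FileStatusStub`, and the `listLeafFiles` function parameter are hypothetical; the parameter stands in for the `listLeafFiles0(fs, path, filter)` call in the diff.

```scala
// Hypothetical stand-in for org.apache.hadoop.fs.FileStatus.
final case class FileStatusStub(path: String, length: Long)

object ListingSketch {
  // Returns one (path, leaf files) pair per input path, in input order,
  // without building an intermediate Map.
  def listLeafFilesInSerial(
      paths: Seq[String],
      listLeafFiles: String => Seq[FileStatusStub]): Seq[(String, Seq[FileStatusStub])] =
    paths.map(path => (path, listLeafFiles(path)))
}
```

One behavioral difference to keep in mind: a `Seq` of tuples preserves input order and duplicate paths, whereas `toMap` keeps only the last entry for a repeated path.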


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83986875
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
+  /**
+   * @return the leaf files for the specified path from this cache, or None if not cached.
+   */
+  def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+  /**
+   * Saves the given set of leaf files for a path in this cache.
+   */
+  def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+  /**
+   * Invalidates all data held by this cache.
+   */
+  def invalidateAll(): Unit
+}
+
+/**
+ * An implementation that caches all partition file statuses in memory forever.
+ */
+class InMemoryCache extends FileStatusCache {
+  private val cache = new ConcurrentHashMap[Path, Array[FileStatus]]()
+
+  override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
+    Option(cache.get(path))
+  }
+
+  override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
+    cache.put(path, leafFiles.toArray)
+  }
+
+  override def invalidateAll(): Unit = {
+    cache.clear()
+  }
+}
+
+/**
+ * A non-caching implementation used when partition file status caching is disabled.
+ */
+class NoopCache extends FileStatusCache {
--- End diff --

good idea





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83981625
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1486,10 +1485,10 @@ private[spark] object Utils extends Logging {
   val gzInputStream = new GZIPInputStream(new FileInputStream(file))
   val bufSize = 1024
--- End diff --

+1





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/15539#discussion_r83980885
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1486,10 +1485,10 @@ private[spark] object Utils extends Logging {
   val gzInputStream = new GZIPInputStream(new FileInputStream(file))
   val bufSize = 1024
--- End diff --

Sorry, I know this is beside the point of this PR, but since we have this 
diff here I want to point out that this 1024-byte buffer seems really small for 
decompressing and reading a large gzip input stream (by large I mean one with 
an uncompressed file size > 4GB). For smaller files, we can get the 
uncompressed size much more efficiently by reading it from the gzip file 
footer, as described here:

http://www.abeel.be/content/determine-uncompressed-size-gzip-file

What do you think?
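
For reference, a minimal sketch of the footer approach. Per RFC 1952, the last four bytes of a gzip member (the ISIZE field) hold the uncompressed length modulo 2^32 in little-endian order, which is why this only gives the true size for inputs under 4GB. `GzipFooter` and `uncompressedSizeMod32` are hypothetical names, not part of `Utils`, and the sketch assumes a file containing a single gzip member.

```scala
import java.io.{File, RandomAccessFile}

object GzipFooter {
  // Reads the ISIZE trailer field without decompressing anything.
  // The result is the uncompressed length modulo 2^32.
  def uncompressedSizeMod32(file: File): Long = {
    val raf = new RandomAccessFile(file, "r")
    try {
      // A gzip member has a 10-byte header and an 8-byte trailer at minimum.
      require(raf.length() >= 18, s"$file is too short to be a gzip file")
      raf.seek(raf.length() - 4)
      val b = new Array[Byte](4)
      raf.readFully(b)
      // Assemble the four bytes as an unsigned little-endian integer.
      (b(0) & 0xffL) | ((b(1) & 0xffL) << 8) | ((b(2) & 0xffL) << 16) | ((b(3) & 0xffL) << 24)
    } finally {
      raf.close()
    }
  }
}
```

A caller that cannot rule out larger files or concatenated members would still need the streaming fallback.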





[GitHub] spark pull request #15539: [SPARK-17994] [SQL] Add back a file status cache ...

2016-10-18 Thread ericl
GitHub user ericl opened a pull request:

https://github.com/apache/spark/pull/15539

[SPARK-17994] [SQL] Add back a file status cache for catalog tables

## What changes were proposed in this pull request?

In SPARK-16980, we removed the full in-memory cache of table partitions in 
favor of loading only needed partitions from the metastore. This greatly 
improves the initial latency of queries that only read a small fraction of 
table partitions.

However, since the metastore does not store file statistics, we need to 
discover those from remote storage. With the loss of the in-memory file status 
cache, this has to happen on each query, increasing the latency of repeated 
queries over the same partitions.

The proposal is to add back a per-table cache of partition contents, i.e. 
Map[Path, Array[FileStatus]]. The cache can be invalidated through 
refreshTable() and refreshByPath(). Unlike the prior cache, it can be 
incrementally updated as new partitions are read.
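
As a rough illustration of the idea, a per-table cache with incremental updates and explicit invalidation might look like the sketch below. It is not the implementation in this PR: `FileStatusStub`, `TableFileStatusCache`, and all method names are hypothetical, paths are plain strings, and the stub stands in for Hadoop's `FileStatus` so the example has no external dependencies.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for org.apache.hadoop.fs.FileStatus.
final case class FileStatusStub(path: String, length: Long)

// Hypothetical per-table cache: partition directory -> leaf files found in it.
final class TableFileStatusCache {
  private val entries = new ConcurrentHashMap[String, Array[FileStatusStub]]()

  /** Returns the cached listing, or None if this partition has not been read yet. */
  def getLeafFiles(partitionDir: String): Option[Array[FileStatusStub]] =
    Option(entries.get(partitionDir))

  /** Incremental update: records the listing of a single partition. */
  def putLeafFiles(partitionDir: String, files: Array[FileStatusStub]): Unit = {
    entries.put(partitionDir, files)
    ()
  }

  /** Lists a partition through the cache, calling `listRemote` only on a miss. */
  def getOrLoad(partitionDir: String)(
      listRemote: String => Array[FileStatusStub]): Array[FileStatusStub] =
    getLeafFiles(partitionDir).getOrElse {
      val files = listRemote(partitionDir)
      putLeafFiles(partitionDir, files)
      files
    }

  /** Drops every entry, as a refreshTable()-style call might. */
  def invalidateAll(): Unit = entries.clear()
}
```

With this shape, a repeated query over the same partitions hits remote storage only for partitions it has not seen before, and a refresh simply empties the map so the next query re-lists what it needs.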

## How was this patch tested?

Existing tests and new tests in `HiveTablePerfStatsSuite`.

cc @mallman 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ericl/spark meta-cache

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15539.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15539


commit c2eacb7da1d2d4129b19be89a2c07e91dbff3964
Author: Michael Allman 
Date:   2016-08-10T19:07:34Z

[SPARK-16980][SQL] Load only catalog table partition metadata required
to answer a query

commit 1f611c4089102744242b73346d9724d248635cac
Author: Michael Allman 
Date:   2016-09-13T01:21:38Z

Add a new catalyst optimizer rule to SQL core for pruning unneeded
partitions' files from a table file catalog

commit 8b24eada4a0b49f39d16570ee86f52ddc1682251
Author: Michael Allman 
Date:   2016-10-08T00:15:11Z

Include the type of file catalog in the FileSourceScanExec metadata

commit f82f0d228141dd026b0b631e8d984961ee8b827b
Author: Michael Allman 
Date:   2016-10-08T00:15:54Z

TODO: Consider renaming FileCatalog to better differentiate it from
BasicFileCatalog (or vice-versa)

commit 1f0d5d88538da058e474098eabba53d387f70f53
Author: Eric Liang 
Date:   2016-10-11T02:54:53Z

try out parquet case insensitive fallback

commit 198dd9457fad08516f65ea1bcfa6edf4af17d948
Author: Michael Allman 
Date:   2016-10-11T17:53:13Z

Refactor the FileSourceScanExec.metadata val to make it prettier

commit acc84f07f53d3c87c5637636e69b1c564421484a
Author: Michael Allman 
Date:   2016-10-11T19:00:43Z

Refactor `TableFileCatalog.listFiles` to call `listDataLeafFiles` once
instead of once per partition

commit 59de5ca2c8b209a190dc0c6082fc6e2d2de0096b
Author: Eric Liang 
Date:   2016-10-11T23:03:18Z

fix and add test for input files

commit 3b51624263cfcedd3e51b71342b940592a5f6118
Author: Eric Liang 
Date:   2016-10-11T23:09:06Z

rename test

commit f94863dd386a8654986a1fde09e5d87ded97a6e3
Author: Eric Liang 
Date:   2016-10-13T01:09:02Z

fix it

commit 0958bcd8f088d5641fc78952b8265ce05232c3f9
Author: Eric Liang 
Date:   2016-10-12T20:20:11Z

feature flag

commit 291cee788e1bcc3ecbd7b1a4187f8eba58e134fb
Author: Eric Liang 
Date:   2016-10-12T22:48:03Z

add comments

commit 022d5b9873018dad8ac08646704f567176977877
Author: Eric Liang 
Date:   2016-10-13T01:26:23Z

more test cases

commit 8bd27be814f7721f3764364c72b33c7f67e0e9ff
Author: Eric Liang 
Date:   2016-10-13T01:46:41Z

also fix a bug with zero partitions selected

commit 627572e0020d313a9c1378349e2ee4ab0d0e97f1
Author: Eric Liang 
Date:   2016-10-13T17:30:48Z

extend and fix flakiness in test

commit 6d8e7ea9f904e33af4ca7372f5b31379aede9308
Author: Michael Allman 
Date:   2016-10-13T17:55:26Z

Enhance `ParquetMetastoreSuite` with mixed-case partition columns

commit 21caa932a157ec3dd394829061b06bd3d857de0f
Author: Michael Allman 
Date:   2016-10-13T18:29:25Z

Tidy up a little by removing some unused imports, an unused method and
moving a protected method down and making it private

commit d7795cd0f3bc517bdf278e626ca25ce08ea23bcb
Author: Michael Allman 
Date:   2016-10-13T18:44:15Z

Put partition count in `FileSourceScanExec.metadata` for partitioned
tables

commit 765f93ce664ef33c1c62bf80b678ff5ba2992b85
Author: Michael Allman