Repository: spark
Updated Branches:
  refs/heads/master 5773ab121 -> 9064f1b04


[SPARK-20495][SQL][CORE] Add StorageLevel to cacheTable API

## What changes were proposed in this pull request?
Currently the cacheTable API only supports the default MEMORY_AND_DISK storage level. This PR adds an overload that accepts an explicit storage level.
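
For illustration, a minimal usage sketch of the new overload (the session setup and table name here are hypothetical, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.range(100).createOrReplaceTempView("my_table")

// Cache the view at an explicit level instead of the default MEMORY_AND_DISK.
spark.catalog.cacheTable("my_table", StorageLevel.DISK_ONLY)
```
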
## How was this patch tested?
unit tests

Author: madhu <phatak....@gmail.com>

Closes #17802 from phatak-dev/cacheTableAPI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9064f1b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9064f1b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9064f1b0

Branch: refs/heads/master
Commit: 9064f1b04461513a147aeb8179471b05595ddbc4
Parents: 5773ab1
Author: madhu <phatak....@gmail.com>
Authored: Fri May 5 22:44:03 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri May 5 22:44:03 2017 +0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                            |  2 ++
 .../scala/org/apache/spark/sql/catalog/Catalog.scala  | 14 +++++++++++++-
 .../org/apache/spark/sql/internal/CatalogImpl.scala   | 13 +++++++++++++
 .../org/apache/spark/sql/internal/CatalogSuite.scala  |  8 ++++++++
 4 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9064f1b0/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dbf933f..d50882c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,8 @@ object MimaExcludes {
 
   // Exclude rules for 2.3.x
   lazy val v23excludes = v22excludes ++ Seq(
+    // [SPARK-20495][SQL] Add StorageLevel to cacheTable API
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable")
   )
 
   // Exclude rules for 2.2.x

http://git-wip-us.apache.org/repos/asf/spark/blob/9064f1b0/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 7e5da01..ab81725 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
 import org.apache.spark.sql.types.StructType
-
+import org.apache.spark.storage.StorageLevel
 
 /**
  * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
@@ -477,6 +477,18 @@ abstract class Catalog {
   def cacheTable(tableName: String): Unit
 
   /**
+   * Caches the specified table with the given storage level.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table/view.
+   *                  If no database identifier is provided, it refers to a temporary view or
+   *                  a table/view in the current database.
+   * @param storageLevel storage level to cache the table with.
+   * @since 2.3.0
+   */
+  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
+
+
+  /**
    * Removes the specified table from the in-memory cache.
    *
   * @param tableName is either a qualified or unqualified name that designates a table/view.

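Because `Catalog` is an abstract class, adding this method is a binary-incompatible change for external implementations, which is what the MiMa `ReversedMissingMethodProblem` exclusion above accounts for. A minimal sketch of what this means for an implementer (class name hypothetical):

```scala
import org.apache.spark.sql.catalog.Catalog
import org.apache.spark.storage.StorageLevel

// Sketch: left abstract because Catalog declares many other methods;
// the point is that implementations must now also provide the new overload.
abstract class MyCatalog extends Catalog {
  override def cacheTable(tableName: String): Unit =
    cacheTable(tableName, StorageLevel.MEMORY_AND_DISK) // preserve the old default

  override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = {
    // engine-specific caching goes here
  }
}
```
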
http://git-wip-us.apache.org/repos/asf/spark/blob/9064f1b0/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 0b8e538..e1049c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
+
 
 
 /**
@@ -420,6 +422,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
+   * Caches the specified table or view with the given storage level.
+   *
+   * @group cachemgmt
+   * @since 2.3.0
+   */
+  override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = {
+    sparkSession.sharedState.cacheManager.cacheQuery(
+      sparkSession.table(tableName), Some(tableName), storageLevel)
+  }
+
+  /**
    * Removes the specified table or view from the in-memory cache.
    *
    * @group cachemgmt

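The implementation simply forwards to the shared CacheManager with the requested level. For comparison, a roughly equivalent effect is already reachable through the existing Dataset API (sketch, assuming a SparkSession named `spark` and an existing table):

```scala
import org.apache.spark.storage.StorageLevel

// Roughly equivalent via the Dataset API: persist is lazy and
// materializes the cache on the first action over the table.
spark.table("my_table").persist(StorageLevel.DISK_ONLY)
```
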
http://git-wip-us.apache.org/repos/asf/spark/blob/9064f1b0/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 8f9c52c..bc641fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
 
 
 /**
@@ -535,4 +536,11 @@ class CatalogSuite
       .createTempView("fork_table", Range(1, 2, 3, 4), overrideIfExists = true)
     assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
   }
+
+  test("cacheTable with storage level") {
+    createTempTable("my_temp_table")
+    spark.catalog.cacheTable("my_temp_table", StorageLevel.DISK_ONLY)
+    assert(spark.table("my_temp_table").storageLevel == StorageLevel.DISK_ONLY)
+  }
+
 }

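The added test covers the explicit level; a natural companion check (a sketch, not part of this commit; table name hypothetical) would assert that the one-argument overload still caches at the default MEMORY_AND_DISK level:

```scala
  test("cacheTable default storage level") {
    createTempTable("my_temp_table2")
    spark.catalog.cacheTable("my_temp_table2")
    // CacheManager.cacheQuery defaults to MEMORY_AND_DISK when no level is given.
    assert(spark.table("my_temp_table2").storageLevel == StorageLevel.MEMORY_AND_DISK)
  }
```
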
