Repository: spark Updated Branches: refs/heads/master 5773ab121 -> 9064f1b04
[SPARK-20495][SQL][CORE] Add StorageLevel to cacheTable API ## What changes were proposed in this pull request? Currently cacheTable API only supports MEMORY_AND_DISK. This PR adds additional API to take different storage levels. ## How was this patch tested? unit tests Author: madhu <phatak....@gmail.com> Closes #17802 from phatak-dev/cacheTableAPI. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9064f1b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9064f1b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9064f1b0 Branch: refs/heads/master Commit: 9064f1b04461513a147aeb8179471b05595ddbc4 Parents: 5773ab1 Author: madhu <phatak....@gmail.com> Authored: Fri May 5 22:44:03 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Fri May 5 22:44:03 2017 +0800 ---------------------------------------------------------------------- project/MimaExcludes.scala | 2 ++ .../scala/org/apache/spark/sql/catalog/Catalog.scala | 14 +++++++++++++- .../org/apache/spark/sql/internal/CatalogImpl.scala | 13 +++++++++++++ .../org/apache/spark/sql/internal/CatalogSuite.scala | 8 ++++++++ 4 files changed, 36 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9064f1b0/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index dbf933f..d50882c 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,8 @@ object MimaExcludes { // Exclude rules for 2.3.x lazy val v23excludes = v22excludes ++ Seq( + // [SPARK-20495][SQL] Add StorageLevel to cacheTable API + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable") ) // Exclude rules for 2.2.x http://git-wip-us.apache.org/repos/asf/spark/blob/9064f1b0/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 7e5da01..ab81725 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset} import org.apache.spark.sql.types.StructType - +import org.apache.spark.storage.StorageLevel /** * Catalog interface for Spark. To access this, use `SparkSession.catalog`. @@ -477,6 +477,18 @@ abstract class Catalog { def cacheTable(tableName: String): Unit /** + * Caches the specified table with the given storage level. + * + * @param tableName is either a qualified or unqualified name that designates a table/view. + * If no database identifier is provided, it refers to a temporary view or + * a table/view in the current database. + * @param storageLevel storage level to cache table. + * @since 2.3.0 + */ + def cacheTable(tableName: String, storageLevel: StorageLevel): Unit + + + /** * Removes the specified table from the in-memory cache. * * @param tableName is either a qualified or unqualified name that designates a table/view. http://git-wip-us.apache.org/repos/asf/spark/blob/9064f1b0/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 0b8e538..e1049c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource} import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.StorageLevel + /** @@ -420,6 +422,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** + * Caches the specified table or view with the given storage level. + * + * @group cachemgmt + * @since 2.3.0 + */ + override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = { + sparkSession.sharedState.cacheManager.cacheQuery( + sparkSession.table(tableName), Some(tableName), storageLevel) + } + + /** * Removes the specified table or view from the in-memory cache. * * @group cachemgmt http://git-wip-us.apache.org/repos/asf/spark/blob/9064f1b0/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 8f9c52c..bc641fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.StorageLevel /** @@ -535,4 +536,11 @@ class CatalogSuite .createTempView("fork_table", Range(1, 2, 3, 4), overrideIfExists = true) assert(spark.catalog.listTables().collect().map(_.name).toSet == Set()) } + + test("cacheTable with storage level") { + createTempTable("my_temp_table") + spark.catalog.cacheTable("my_temp_table", StorageLevel.DISK_ONLY) + assert(spark.table("my_temp_table").storageLevel == StorageLevel.DISK_ONLY) + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org