Repository: spark
Updated Branches:
  refs/heads/master ada310a9d -> f5c418da0


[SQL] SPARK-1372 Support for caching and uncaching tables in a SQLContext.

This doesn't yet support different databases in Hive (though you can probably
work around this by calling `USE <dbname>`). However, given the time
constraints for 1.0, I think it's worth including this now and extending the
functionality in the next release.
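
A minimal usage sketch of the new API (the context and table names below are
assumed for illustration; "people" stands for an RDD that has already been
registered as a table):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)     // sc: an existing SparkContext
    // ... register an RDD of case classes as the table "people" ...

    sqlContext.cacheTable("people")         // scans now hit in-memory columnar buffers
    sqlContext.table("people").collect()    // served from the cache
    sqlContext.uncacheTable("people")       // buffers unpersisted; the RDD table is restored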

Author: Michael Armbrust <mich...@databricks.com>

Closes #282 from marmbrus/cacheTables and squashes the following commits:

83785db [Michael Armbrust] Support for caching and uncaching tables in a SQLContext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5c418da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5c418da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5c418da

Branch: refs/heads/master
Commit: f5c418da044ef7f3d7185cc5bb1bef79d7f4e25c
Parents: ada310a
Author: Michael Armbrust <mich...@databricks.com>
Authored: Tue Apr 1 14:45:44 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Tue Apr 1 14:45:44 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Catalog.scala   | 11 +++-
 .../scala/org/apache/spark/sql/SQLContext.scala | 32 +++++++++-
 .../org/apache/spark/sql/CachedTableSuite.scala | 61 ++++++++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  7 +++
 ...hed table-0-ce3797dc14a603cba2a5e58c8612de5b |  1 +
 ...hed table-0-ce3797dc14a603cba2a5e58c8612de5b |  1 +
 .../spark/sql/hive/CachedTableSuite.scala       | 58 +++++++++++++++++++
 7 files changed, 169 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5c418da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index e09182d..6b58b93 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -31,6 +31,7 @@ trait Catalog {
     alias: Option[String] = None): LogicalPlan
 
   def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
+  def unregisterTable(databaseName: Option[String], tableName: String): Unit
 }
 
 class SimpleCatalog extends Catalog {
@@ -40,7 +41,7 @@ class SimpleCatalog extends Catalog {
     tables += ((tableName, plan))
   }
 
-  def dropTable(tableName: String) = tables -= tableName
+  def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName }
 
   def lookupRelation(
       databaseName: Option[String],
@@ -87,6 +88,10 @@ trait OverrideCatalog extends Catalog {
       plan: LogicalPlan): Unit = {
     overrides.put((databaseName, tableName), plan)
   }
+
+  override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
+    overrides.remove((databaseName, tableName))
+  }
 }
 
 /**
@@ -104,4 +109,8 @@ object EmptyCatalog extends Catalog {
   def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
     throw new UnsupportedOperationException
   }
+
+  def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
+    throw new UnsupportedOperationException
+  }
 }

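For illustration, a quick round-trip through the renamed SimpleCatalog method
(`somePlan` is a stand-in for any LogicalPlan; it is not part of this diff):

    val catalog = new SimpleCatalog
    catalog.registerTable(None, "t", somePlan)
    catalog.unregisterTable(None, "t")   // databaseName is ignored by SimpleCatalog
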
http://git-wip-us.apache.org/repos/asf/spark/blob/f5c418da/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f950ea0..69bbbdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,8 +26,9 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
 import org.apache.spark.sql.execution._
 
 /**
@@ -111,6 +112,35 @@ class SQLContext(@transient val sparkContext: SparkContext)
     result
   }
 
+  /** Returns the specified table as a SchemaRDD */
+  def table(tableName: String): SchemaRDD =
+    new SchemaRDD(this, catalog.lookupRelation(None, tableName))
+
+  /** Caches the specified table in-memory. */
+  def cacheTable(tableName: String): Unit = {
+    val currentTable = catalog.lookupRelation(None, tableName)
+    val asInMemoryRelation =
+      InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
+
+    catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
+  }
+
+  /** Removes the specified table from the in-memory cache. */
+  def uncacheTable(tableName: String): Unit = {
+    EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
+      // This is kind of a hack to make sure that if this was just an RDD registered as a table,
+      // we reregister the RDD as a table.
+      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
+        inMem.cachedColumnBuffers.unpersist()
+        catalog.unregisterTable(None, tableName)
+        catalog.registerTable(None, tableName, SparkLogicalPlan(e))
+      case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) =>
+        inMem.cachedColumnBuffers.unpersist()
+        catalog.unregisterTable(None, tableName)
+      case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
+    }
+  }
+
   protected[sql] class SparkPlanner extends SparkStrategies {
     val sparkContext = self.sparkContext
 

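To check whether a table is currently cached, the analyzed plan can be
inspected the same way the new test suites do; a hedged sketch (the context
name is assumed):

    import org.apache.spark.sql.execution.SparkLogicalPlan
    import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

    sqlContext.table("people").queryExecution.analyzed match {
      case SparkLogicalPlan(_: InMemoryColumnarTableScan) => // cached
      case _                                               => // not cached
    }

Note that calling uncacheTable twice on an RDD-backed table fails: the first
call re-registers the plain ExistingRdd, so the second call falls through to
the IllegalArgumentException branch.
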
http://git-wip-us.apache.org/repos/asf/spark/blob/f5c418da/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
new file mode 100644
index 0000000..e5902c3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.FunSuite
+import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+
+class CachedTableSuite extends QueryTest {
+  TestData // Load test tables.
+
+  test("read from cached table and uncache") {
+    TestSQLContext.cacheTable("testData")
+
+    checkAnswer(
+      TestSQLContext.table("testData"),
+      testData.collect().toSeq
+    )
+
+    TestSQLContext.table("testData").queryExecution.analyzed match {
+      case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+      case noCache => fail(s"No cache node found in plan $noCache")
+    }
+
+    TestSQLContext.uncacheTable("testData")
+
+    checkAnswer(
+      TestSQLContext.table("testData"),
+      testData.collect().toSeq
+    )
+
+    TestSQLContext.table("testData").queryExecution.analyzed match {
+      case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+        fail(s"Table still cached after uncache: $cachePlan")
+      case noCache => // Table uncached successfully
+    }
+  }
+
+  test("correct error on uncache of non-cached table") {
+    intercept[IllegalArgumentException] {
+      TestSQLContext.uncacheTable("testData")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f5c418da/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4f83536..29834a1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -141,6 +141,13 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
    */
   override def registerTable(
       databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ???
+
+  /**
+   * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
+   * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
+   */
+  override def unregisterTable(
+      databaseName: Option[String], tableName: String): Unit = ???
 }
 
 object HiveMetastoreTypes extends RegexParsers {

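As the UNIMPLEMENTED comment suggests, register/unregister support on top of
the metastore comes from mixing in OverrideCatalog. A minimal sketch of that
wiring (member names and modifiers are assumed, not part of this diff):

    package org.apache.spark.sql.hive

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.catalyst.analysis.OverrideCatalog

    class ExampleHiveContext(sc: SparkContext) extends HiveContext(sc) {
      // Metastore lookups, with registered/cached tables layered on top.
      @transient
      override protected[sql] lazy val catalog =
        new HiveMetastoreCatalog(this) with OverrideCatalog
    }
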
http://git-wip-us.apache.org/repos/asf/spark/blob/f5c418da/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b b/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b
new file mode 100644
index 0000000..60878ff
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b
@@ -0,0 +1 @@
+238    val_238

http://git-wip-us.apache.org/repos/asf/spark/blob/f5c418da/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b b/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b
new file mode 100644
index 0000000..60878ff
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b
@@ -0,0 +1 @@
+238    val_238

http://git-wip-us.apache.org/repos/asf/spark/blob/f5c418da/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
new file mode 100644
index 0000000..68d45e5
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.hive.execution.HiveComparisonTest
+
+class CachedTableSuite extends HiveComparisonTest {
+  TestHive.loadTestTable("src")
+
+  test("cache table") {
+    TestHive.cacheTable("src")
+  }
+
+  createQueryTest("read from cached table",
+    "SELECT * FROM src LIMIT 1")
+
+  test("check that table is cached and uncache") {
+    TestHive.table("src").queryExecution.analyzed match {
+      case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+      case noCache => fail(s"No cache node found in plan $noCache")
+    }
+    TestHive.uncacheTable("src")
+  }
+
+  createQueryTest("read from uncached table",
+    "SELECT * FROM src LIMIT 1")
+
+  test("make sure table is uncached") {
+    TestHive.table("src").queryExecution.analyzed match {
+      case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+        fail(s"Table still cached after uncache: $cachePlan")
+      case noCache => // Table uncached successfully
+    }
+  }
+
+  test("correct error on uncache of non-cached table") {
+    intercept[IllegalArgumentException] {
+      TestHive.uncacheTable("src")
+    }
+  }
+}
