Repository: spark
Updated Branches:
  refs/heads/master b536d5dc6 -> 43dac2c88


[SPARK-6941] [SQL] Provide a better error message when inserting into an RDD-based table

JIRA: https://issues.apache.org/jira/browse/SPARK-6941

Author: Yijie Shen <henry.yijies...@gmail.com>

Closes #7342 from yijieshen/SPARK-6941 and squashes the following commits:

f82cbe7 [Yijie Shen] reorder import
dd67e40 [Yijie Shen] resolve comments
09518af [Yijie Shen] fix import order in DataframeSuite
0c635d4 [Yijie Shen] make match more specific
9df388d [Yijie Shen] move check into PreWriteCheck
847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43dac2c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43dac2c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43dac2c8

Branch: refs/heads/master
Commit: 43dac2c880d6f310a958531aee0bb4ac1d9b7025
Parents: b536d5d
Author: Yijie Shen <henry.yijies...@gmail.com>
Authored: Thu Jul 16 10:52:09 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Jul 16 10:52:09 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/sources/rules.scala    |  9 +++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 55 ++++++++++++++++++--
 2 files changed, 60 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/43dac2c8/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index a3fd7f1..40ee048 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException}
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.DataType
 
@@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
 
+      case logical.InsertIntoTable(t, _, _, _, _) =>
+        if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
+          failAnalysis(s"Inserting into an RDD-based table is not allowed.")
+        } else {
+          // OK
+        }
+
       case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.

http://git-wip-us.apache.org/repos/asf/spark/blob/43dac2c8/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f592a99..23244fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,19 +17,23 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
 import scala.language.postfixOps
 
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint}
-
+import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
 
-class DataFrameSuite extends QueryTest {
+class DataFrameSuite extends QueryTest with SQLTestUtils {
   import org.apache.spark.sql.TestData._
 
   lazy val ctx = org.apache.spark.sql.test.TestSQLContext
   import ctx.implicits._
 
+  def sqlContext: SQLContext = ctx
+
   test("analysis error should be eagerly reported") {
     val oldSetting = ctx.conf.dataFrameEagerAnalysis
     // Eager analysis.
@@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest {
     assert(f.getMessage.contains("column3"))
     assert(!f.getMessage.contains("column2"))
   }
+
+  test("SPARK-6941: Better error message for inserting into RDD-based Table") {
+    withTempDir { dir =>
+
+      val tempParquetFile = new File(dir, "tmp_parquet")
+      val tempJsonFile = new File(dir, "tmp_json")
+
+      val df = Seq(Tuple1(1)).toDF()
+      val insertion = Seq(Tuple1(2)).toDF("col")
+
+      // pass case: parquet table (HadoopFsRelation)
+      df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath)
+      val pdf = ctx.read.parquet(tempParquetFile.getCanonicalPath)
+      pdf.registerTempTable("parquet_base")
+      insertion.write.insertInto("parquet_base")
+
+      // pass case: json table (InsertableRelation)
+      df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath)
+      val jdf = ctx.read.json(tempJsonFile.getCanonicalPath)
+      jdf.registerTempTable("json_base")
+      insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
+
+      // error cases: insert into an RDD
+      df.registerTempTable("rdd_base")
+      val e1 = intercept[AnalysisException] {
+        insertion.write.insertInto("rdd_base")
+      }
+      assert(e1.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+
+      // error case: insert into a logical plan that is not a LeafNode
+      val indirectDS = pdf.select("_1").filter($"_1" > 5)
+      indirectDS.registerTempTable("indirect_ds")
+      val e2 = intercept[AnalysisException] {
+        insertion.write.insertInto("indirect_ds")
+      }
+      assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+
+      // error case: insert into an OneRowRelation
+      new DataFrame(ctx, OneRowRelation).registerTempTable("one_row")
+      val e3 = intercept[AnalysisException] {
+        insertion.write.insertInto("one_row")
+      }
+      assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to