Repository: spark
Updated Branches:
  refs/heads/master 17ce0b5b3 -> f85f29608


[SPARK-19024][SQL] Implement new approach to write a permanent view

## What changes were proposed in this pull request?

On CREATE/ALTER a view, we no longer need to generate a canonicalized SQL 
string from the LogicalPlan. Instead, we store the original SQL query text, 
the output column names of the query plan, and the current database in the 
CatalogTable. Permanent views created by this approach can be resolved by the 
current view resolution mechanism.

The main advantages include:
1. If you update an underlying view, the current view also gets updated (see 
the sketch after this list);
2. It gives us a chance to get rid of SQL generation for operators.
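
As a sketch of what advantage 1 means in practice — table and view names here 
are illustrative, assuming a standard spark-shell session where `spark` is the 
active SparkSession:

```scala
// Illustrative only: because the view now stores its original SQL text and is
// re-resolved on each read, swapping out the underlying table is picked up
// automatically, with no ALTER VIEW required.
spark.sql("CREATE TABLE t1 USING parquet AS SELECT * FROM VALUES (1, 1) AS t1(a, b)")
spark.sql("CREATE VIEW v1 AS SELECT * FROM t1")
spark.table("v1").show()  // -> [1, 1]

spark.sql("DROP TABLE t1")
spark.sql("CREATE TABLE t1 USING parquet AS SELECT * FROM VALUES (2, 3) AS t1(a, b)")
spark.table("v1").show()  // -> [2, 3], re-resolved from the stored "SELECT * FROM t1"
```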

Major changes of this PR:
1. Generate the view-specific properties (e.g. view default database, view 
query output column names) during permanent view creation and store them as 
properties in the CatalogTable (a sketch of the resulting catalog entry 
follows this list);
2. Update the commands `CreateViewCommand` and `AlterViewAsCommand` to get rid 
of SQL generation.
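
A rough sketch of the catalog entry that change 1 produces for a simple view. 
The constructor fields match `CatalogTable` as shown in the diff below; the 
concrete property key strings (`view.default.database`, `view.query.out.numCols`, 
`view.query.out.col.N`) are assumptions based on the constants referenced in 
`CatalogTable`, so treat them as illustrative:

```scala
// Sketch only: for `CREATE VIEW v1 AS SELECT * FROM t1` issued while the current
// database is `default` and t1 has output columns (a, b), the stored CatalogTable
// conceptually looks like this -- note viewText is the original query text, not
// generated SQL.
CatalogTable(
  identifier = TableIdentifier("v1", Some("default")),
  tableType = CatalogTableType.VIEW,
  storage = CatalogStorageFormat.empty,
  schema = new StructType().add("a", "int").add("b", "int"),
  viewOriginalText = Some("SELECT * FROM t1"),
  viewText = Some("SELECT * FROM t1"),
  properties = Map(
    "view.default.database" -> "default",   // qualifies `t1` during view resolution
    "view.query.out.numCols" -> "2",
    "view.query.out.col.0" -> "a",
    "view.query.out.col.1" -> "b"))
```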

## How was this patch tested?
Existing tests.

Author: jiangxingbo <jiangxb1...@gmail.com>

Closes #16613 from jiangxb1987/view-write-path.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f85f2960
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f85f2960
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f85f2960

Branch: refs/heads/master
Commit: f85f29608de801d7cacc779a77c8edaed8124acf
Parents: 17ce0b5
Author: jiangxingbo <jiangxb1...@gmail.com>
Authored: Wed Jan 18 19:13:01 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Jan 18 19:13:01 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  19 ---
 .../spark/sql/execution/command/views.scala     | 164 +++++++++++++------
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  10 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  29 ++--
 .../spark/sql/hive/execution/SQLViewSuite.scala |  16 +-
 5 files changed, 146 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2adccdd..80d3282 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -223,25 +223,6 @@ case class CatalogTable(
     )
   }
 
-  /**
-   * Insert/Update the view query output column names in `properties`.
-   */
-  def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
-    val props = new mutable.HashMap[String, String]
-    if (columns.nonEmpty) {
-      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
-      columns.zipWithIndex.foreach { case (colName, index) =>
-        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
-      }
-    }
-
-    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
-    // while `CatalogTable` should be serializable.
-    copy(properties = properties.filterNot { case (key, _) =>
-      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
-    } ++ props)
-  }
-
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
       locationUri: Option[String] = storage.locationUri,

http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 154141b..3da4bcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.util.control.NonFatal
+import scala.collection.mutable
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType
 
 
 /**
- * Create or replace a view with given query plan. This command will convert the query plan to
- * canonicalized SQL string, and store it as view text in metastore, if we need to create a
- * permanent view.
+ * Create or replace a view with given query plan. This command will generate some view-specific
+ * properties(e.g. view default database, view query output column names) and store them as
+ * properties in metastore, if we need to create a permanent view.
  *
  * @param name the name of this view.
  * @param userSpecifiedColumns the output column names and optional comments specified by users,
@@ -75,8 +75,8 @@ object PersistedView extends ViewType
  * @param properties the properties of this view.
  * @param originalText the original SQL text of this view, can be None if this view is created via
  *                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param child the logical plan that represents the view; this is used to generate the logical
+ *              plan for temporary view and the view schema.
  * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
  *                already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -95,6 +95,8 @@ case class CreateViewCommand(
     viewType: ViewType)
   extends RunnableCommand {
 
+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
   if (viewType == PersistedView) {
@@ -137,22 +139,12 @@ case class CreateViewCommand(
     // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
     verifyTemporaryObjectsNotExists(sparkSession)
 
-    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-
     val catalog = sparkSession.sessionState.catalog
     if (viewType == LocalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = 
replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
@@ -163,7 +155,7 @@ case class CreateViewCommand(
         throw new AnalysisException(s"$name is not a view")
       } else if (replace) {
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+        catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -173,7 +165,7 @@ case class CreateViewCommand(
       }
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
@@ -207,29 +199,44 @@ case class CreateViewCommand(
   }
 
   /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
-   * SQL based on the analyzed plan, and also creates the proper schema for the view.
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
+   * else return the analyzed plan directly.
+   * else return the analyzed plan directly.
    */
-  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
-
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+  private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
+  }
+
+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific
+   * properties(e.g. view default database, view query output column names) and store them as
+   * properties in the CatalogTable, and also creates the proper schema for the view.
+   */
+  private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
+    if (originalText.isEmpty) {
+      throw new AnalysisException(
+        "It is not allowed to create a persisted view from the Dataset API")
+    }
+
+    val newProperties = generateViewProperties(properties, session, analyzedPlan)
 
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
-      schema = aliasedPlan.schema,
-      properties = properties,
+      schema = aliasPlan(session, analyzedPlan).schema,
+      properties = newProperties,
       viewOriginalText = originalText,
-      viewText = Some(viewSQL),
+      viewText = originalText,
       comment = comment
     )
   }
@@ -244,14 +251,16 @@ case class CreateViewCommand(
  * @param name the name of this view.
  * @param originalText the original SQL text of this view. Note that we can only alter a view by
  *                     SQL API, which means we always have originalText.
- * @param query the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param query the logical plan that represents the view; this is used to generate the new view
+ *              schema.
  */
 case class AlterViewAsCommand(
     name: TableIdentifier,
     originalText: String,
     query: LogicalPlan) extends RunnableCommand {
 
+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(session: SparkSession): Seq[Row] = {
@@ -275,21 +284,80 @@ case class AlterViewAsCommand(
       throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
     }
 
-    val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      session.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
-    }
+    val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)
 
     val updatedViewMeta = viewMeta.copy(
       schema = analyzedPlan.schema,
+      properties = newProperties,
       viewOriginalText = Some(originalText),
-      viewText = Some(viewSQL))
+      viewText = Some(originalText))
 
     session.sessionState.catalog.alterTable(updatedViewMeta)
   }
 }
+
+object ViewHelper {
+
+  import CatalogTable._
+
+  /**
+   * Generate the view default database in `properties`.
+   */
+  private def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
+    Map(VIEW_DEFAULT_DATABASE -> databaseName)
+  }
+
+  /**
+   * Generate the view query output column names in `properties`.
+   */
+  private def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
+    val props = new mutable.HashMap[String, String]
+    if (columns.nonEmpty) {
+      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+      columns.zipWithIndex.foreach { case (colName, index) =>
+        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+      }
+    }
+    props.toMap
+  }
+
+  /**
+   * Remove the view query output column names in `properties`.
+   */
+  private def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+    }
+  }
+
+  /**
+   * Generate the view properties in CatalogTable, including:
+   * 1. view default database that is used to provide the default database name on view resolution.
+   * 2. the output column names of the query that creates a view, this is used to map the output of
+   *    the view child to the view output during view resolution.
+   *
+   * @param properties the `properties` in CatalogTable.
+   * @param session the spark session.
+   * @param analyzedPlan the analyzed logical plan that represents the child of a view.
+   * @return new view properties including view default database and query column names properties.
+   */
+  def generateViewProperties(
+      properties: Map[String, String],
+      session: SparkSession,
+      analyzedPlan: LogicalPlan): Map[String, String] = {
+    // Generate the query column names, throw an AnalysisException if there exists duplicate column
+    // names.
+    val queryOutput = analyzedPlan.schema.fieldNames
+    assert(queryOutput.distinct.size == queryOutput.size,
+      s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column name.")
+
+    // Generate the view default database name.
+    val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase
+
+    removeQueryColumnNames(properties) ++
+      generateViewDefaultDatabase(viewDefaultDatabase) ++
+      generateQueryColumnNames(queryOutput)
+  }
+}
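
A minimal sketch of the round trip through `generateViewProperties` above, with 
hypothetical values (the key strings are assumptions based on the constants in 
`CatalogTable`): stale `view.query.out.*` keys from a previous definition are 
removed before the fresh default database and column names are added, so 
repeated CREATE OR REPLACE / ALTER VIEW calls cannot accumulate outdated entries.

```scala
// Sketch with hypothetical values: an existing property map from a previous
// definition of the view (3 columns), updated for a new 2-column query plan
// whose output is (a, b), while the current database is `default`.
val oldProps = Map(
  "p" -> "user-set value",          // untouched by the rewrite
  "view.query.out.numCols" -> "3",  // stale, removed by removeQueryColumnNames
  "view.query.out.col.0" -> "x")    // stale, removed by removeQueryColumnNames
// generateViewProperties(oldProps, session, analyzedPlan) would then yield:
Map(
  "p" -> "user-set value",
  "view.default.database" -> "default",
  "view.query.out.numCols" -> "2",
  "view.query.out.col.0" -> "a",
  "view.query.out.col.1" -> "b")
```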

http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 605dec4..10607b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2501,11 +2501,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("should be able to resolve a persistent view") {
-    withTable("t1") {
+    withTable("t1", "t2") {
       withView("v1") {
         sql("CREATE TABLE `t1` USING parquet AS SELECT * FROM VALUES(1, 1) AS t1(a, b)")
-        sql("CREATE VIEW `v1` AS SELECT * FROM t1")
-        checkAnswer(spark.table("v1"), Row(1, 1))
+        sql("CREATE TABLE `t2` USING parquet AS SELECT * FROM VALUES('a', 2, 1.0) AS t2(d, e, f)")
+        sql("CREATE VIEW `v1`(x, y) AS SELECT * FROM t1")
+        checkAnswer(spark.table("v1").orderBy("x"), Row(1, 1))
+
+        sql("ALTER VIEW `v1` AS SELECT * FROM t2")
+        checkAnswer(spark.table("v1").orderBy("f"), Row("a", 2, 1.0))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 882a184..edef308 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -381,28 +381,30 @@ class HiveDDLSuite
       spark.range(10).write.saveAsTable(tabName)
       val viewName = "view1"
       withView(viewName) {
-        val catalog = spark.sessionState.catalog
+        def checkProperties(expected: Map[String, String]): Boolean = {
+          val properties = spark.sessionState.catalog.getTableMetadata(TableIdentifier(viewName))
+            .properties
+          properties.filterNot { case (key, value) =>
+            Seq("transient_lastDdlTime", CatalogTable.VIEW_DEFAULT_DATABASE).contains(key) ||
+              key.startsWith(CatalogTable.VIEW_QUERY_OUTPUT_PREFIX)
+          } == expected
+        }
         sql(s"CREATE VIEW $viewName AS SELECT * FROM $tabName")
 
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map())
+        checkProperties(Map())
         sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
+        checkProperties(Map("p" -> "an"))
 
         // no exception or message will be issued if we set it again
         sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
+        checkProperties(Map("p" -> "an"))
 
         // the value will be updated if we set the same key to a different value
         sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'b')")
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "b"))
+        checkProperties(Map("p" -> "b"))
 
         sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map())
+        checkProperties(Map())
 
         val message = intercept[AnalysisException] {
           sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
@@ -655,10 +657,7 @@ class HiveDDLSuite
           Seq(
             Row("# View Information", "", ""),
             Row("View Original Text:", "SELECT * FROM tbl", ""),
-            Row("View Expanded Text:",
-              "SELECT `gen_attr_0` AS `a` FROM (SELECT `gen_attr_0` FROM " +
-              "(SELECT `a` AS `gen_attr_0` FROM `default`.`tbl`) AS gen_subquery_0) AS tbl",
-              "")
+            Row("View Expanded Text:", "SELECT * FROM tbl", "")
           )
         ))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 9bc078d..2658e2c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -222,13 +222,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("correctly parse CREATE VIEW statement") {
-    sql(
-      """CREATE VIEW IF NOT EXISTS
-        |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
-        |TBLPROPERTIES ('a' = 'b')
-        |AS SELECT * FROM jt""".stripMargin)
-    checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
-    sql("DROP VIEW testView")
+    withView("testView") {
+      sql(
+        """CREATE VIEW IF NOT EXISTS
+          |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+          |TBLPROPERTIES ('a' = 'b')
+          |AS SELECT * FROM jt
+          |""".stripMargin)
+      checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+    }
   }
 
   test("correctly parse CREATE TEMPORARY VIEW statement") {

