Repository: spark
Updated Branches:
  refs/heads/master 49f5b0ae4 -> fcfd5d0bb


[SPARK-19290][SQL] add a new extending interface in Analyzer for post-hoc 
resolution

## What changes were proposed in this pull request?

To implement DDL commands, we added several analyzer rules in sql/hive module 
to analyze DDL related plans. However, our `Analyzer` currently only have one 
extending interface: `extendedResolutionRules`, which defines extra rules that 
will be run together with other rules in the resolution batch, and doesn't fit 
DDL rules well, because:

1. DDL rules may do some checking and normalization, but we may do it many 
times as the resolution batch will run rules again and again, until fixed 
point, and it's hard to tell if a DDL rule has already done its checking and 
normalization. It's fine because DDL rules are idempotent, but it's bad for 
analysis performance
2. some DDL rules may depend on others, and it's pretty hard to write `if` 
conditions to guarantee the dependencies. It will be good if we have a batch 
which run rules in one pass, so that we can guarantee the dependencies by rules 
order.

This PR adds a new extending interface in `Analyzer`: `postHocResolutionRules`, 
which defines rules that will be run only once in a batch runs right after the 
resolution batch.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #16645 from cloud-fan/analyzer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fcfd5d0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fcfd5d0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fcfd5d0b

Branch: refs/heads/master
Commit: fcfd5d0bbaf2fb2437d0eb12e3eba1b52153997c
Parents: 49f5b0a
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Mon Jan 23 20:01:10 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Mon Jan 23 20:01:10 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  8 ++++++
 .../datasources/DataSourceStrategy.scala        | 25 ++--------------
 .../spark/sql/execution/datasources/rules.scala |  4 +--
 .../spark/sql/internal/SessionState.scala       |  8 ++++--
 .../spark/sql/hive/HiveSessionState.scala       | 10 ++++---
 .../apache/spark/sql/hive/HiveStrategies.scala  | 30 +++++---------------
 6 files changed, 30 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fcfd5d0b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 98851cb..cb56e94 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -106,6 +106,13 @@ class Analyzer(
    */
   val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
 
+  /**
+   * Override to provide rules to do post-hoc resolution. Note that these 
rules will be executed
+   * in an individual batch. This batch is to run right after the normal 
resolution batch and
+   * execute its rules in one pass.
+   */
+  val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
+
   lazy val batches: Seq[Batch] = Seq(
     Batch("Substitution", fixedPoint,
       CTESubstitution,
@@ -139,6 +146,7 @@ class Analyzer(
       ResolveInlineTables ::
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
+    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
     Batch("View", Once,
       AliasViewChild(conf)),
     Batch("Nondeterministic", Once,

http://git-wip-us.apache.org/repos/asf/spark/blob/fcfd5d0b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 21b07ee..19db293 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -44,6 +44,8 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Replaces generic operations with specific variants that are designed to 
work with Spark
  * SQL Data Sources.
+ *
+ * Note that, this rule must be run after [[PreprocessTableInsertion]].
  */
 case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
@@ -127,30 +129,9 @@ case class DataSourceAnalysis(conf: CatalystConf) extends 
Rule[LogicalPlan] {
     projectList
   }
 
-  /**
-   * Returns true if the [[InsertIntoTable]] plan has already been 
preprocessed by analyzer rule
-   * [[PreprocessTableInsertion]]. It is important that this 
rule([[DataSourceAnalysis]]) has to
-   * be run after [[PreprocessTableInsertion]], to normalize the column names 
in partition spec and
-   * fix the schema mismatch by adding Cast.
-   */
-  private def hasBeenPreprocessed(
-      tableOutput: Seq[Attribute],
-      partSchema: StructType,
-      partSpec: Map[String, Option[String]],
-      query: LogicalPlan): Boolean = {
-    val partColNames = partSchema.map(_.name).toSet
-    query.resolved && partSpec.keys.forall(partColNames.contains) && {
-      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
-      val expectedColumns = tableOutput.filterNot(a => 
staticPartCols.contains(a.name))
-      expectedColumns.toStructType.sameType(query.schema)
-    }
-  }
-
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, 
overwrite, false)
-        if hasBeenPreprocessed(l.output, t.partitionSchema, parts, query) =>
-
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, 
overwrite, false) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation 
and
       // the user has specified static partitions, we add a Project operator 
on top of the query
       // to include those constant column values in the query result.

http://git-wip-us.apache.org/repos/asf/spark/blob/fcfd5d0b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index f8c7fca..6888dec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import scala.util.control.NonFatal
-
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
@@ -383,7 +381,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(table, partition, child, _, _) if table.resolved 
&& child.resolved =>
+    case i @ InsertIntoTable(table, _, child, _, _) if table.resolved && 
child.resolved =>
       table match {
         case relation: CatalogRelation =>
           val metadata = relation.catalogTable

http://git-wip-us.apache.org/repos/asf/spark/blob/fcfd5d0b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 64ec62f..68b774b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -114,12 +114,14 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
   lazy val analyzer: Analyzer = {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
-        AnalyzeCreateTable(sparkSession) ::
-        PreprocessTableInsertion(conf) ::
         new FindDataSourceTable(sparkSession) ::
-        DataSourceAnalysis(conf) ::
         new ResolveDataSource(sparkSession) :: Nil
 
+      override val postHocResolutionRules =
+        AnalyzeCreateTable(sparkSession) ::
+        PreprocessTableInsertion(conf) ::
+        DataSourceAnalysis(conf) :: Nil
+
       override val extendedCheckRules =
         Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/fcfd5d0b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index d3cef6e..9fd03ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,15 +62,17 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
-        AnalyzeCreateTable(sparkSession) ::
-        PreprocessTableInsertion(conf) ::
-        DataSourceAnalysis(conf) ::
         new DetermineHiveSerde(conf) ::
-        new HiveAnalysis(sparkSession) ::
         new FindDataSourceTable(sparkSession) ::
         new FindHiveSerdeTable(sparkSession) ::
         new ResolveDataSource(sparkSession) :: Nil
 
+      override val postHocResolutionRules =
+        AnalyzeCreateTable(sparkSession) ::
+        PreprocessTableInsertion(conf) ::
+        DataSourceAnalysis(conf) ::
+        new HiveAnalysis(sparkSession) :: Nil
+
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fcfd5d0b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 838e6f4..6cde783 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -25,10 +25,9 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, 
PreprocessTableInsertion}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -78,10 +77,14 @@ class DetermineHiveSerde(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces generic operations with specific variants that are designed to 
work with Hive.
+ *
+ * Note that, this rule must be run after [[PreprocessTableInsertion]].
+ */
 class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, 
ifNotExists)
-        if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, 
partSpec, query) =>
+    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, 
ifNotExists) =>
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
     case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
@@ -98,25 +101,6 @@ class HiveAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
         query,
         mode == SaveMode.Ignore)
   }
-
-  /**
-   * Returns true if the [[InsertIntoTable]] plan has already been 
preprocessed by analyzer rule
-   * [[PreprocessTableInsertion]]. It is important that this 
rule([[HiveAnalysis]]) has to
-   * be run after [[PreprocessTableInsertion]], to normalize the column names 
in partition spec and
-   * fix the schema mismatch by adding Cast.
-   */
-  private def hasBeenPreprocessed(
-      tableOutput: Seq[Attribute],
-      partSchema: StructType,
-      partSpec: Map[String, Option[String]],
-      query: LogicalPlan): Boolean = {
-    val partColNames = partSchema.map(_.name).toSet
-    query.resolved && partSpec.keys.forall(partColNames.contains) && {
-      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
-      val expectedColumns = tableOutput.filterNot(a => 
staticPartCols.contains(a.name))
-      expectedColumns.toStructType.sameType(query.schema)
-    }
-  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to