This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 48d0007  [SPARK-34331][SQL] Speed up DS v2 metadata col resolution
48d0007 is described below

commit 48d0007c2fe49d91aab7939c6376cdd82a4f88e2
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Feb 5 16:37:29 2021 +0800

    [SPARK-34331][SQL] Speed up DS v2 metadata col resolution
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/28027
    
    https://github.com/apache/spark/pull/28027 added a DS v2 API that allows data
    sources to produce metadata/hidden columns that can only be seen when they are
    explicitly selected. The way this API is integrated into Spark is (see the
    sketch after this list):
    1. The v2 relation gets normal output and metadata output from the data source,
       and the metadata output is excluded from the plan output by default.
    2. Column resolution can resolve `UnresolvedAttribute` with metadata columns,
       even if the child plan doesn't output metadata columns.
    3. An analyzer rule searches the query plan, trying to find a node that has
       missing inputs. If such a node is found, the rule transforms the sub-plan of
       this node and updates the v2 relation to include the metadata output.
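    As a rough illustration (a minimal sketch, not part of this change; it assumes a
    SparkSession `spark` and a hypothetical v2 table `t` with data columns `id`/`data`
    that exposes a `_partition` metadata column, like the in-memory test table below):
    ```
    // `t` is a hypothetical v2 table exposing a `_partition` metadata column.
    spark.sql("SELECT * FROM t")               // output schema: id, data (metadata column stays hidden)
    spark.sql("SELECT id, _partition FROM t")  // output schema: id, _partition (explicitly selected)
    ```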
    
    The analyzer rule in step 3 brings a perf regression, even for queries that do
    not read v2 tables at all. This rule calculates `QueryPlan.inputSet` (which
    builds an `AttributeSet` from the outputs of all children) and
    `QueryPlan.missingInput` (which does a set exclusion and creates a new
    `AttributeSet`) for every node in the query plan. In our benchmark, TPCDS query
    compilation time increases by more than 10%.
    
    This PR proposes a simple way to improve it: add a special entry to the metadata
    of metadata attributes, which allows us to quickly check whether a plan needs to
    add metadata columns. Instead of calculating `QueryPlan.missingInput`, we just
    check the references of the plan and see whether any attribute carries the
    special metadata entry.
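    Condensed from the diff below, the tagging and the fast check look like this
    (`METADATA_COL_ATTR_KEY` is the special metadata entry):
    ```
    // Condensed from DataSourceV2Implicits.scala and Analyzer.scala in this diff.
    private val METADATA_COL_ATTR_KEY = "__metadata_col"

    // Attached to every metadata column attribute when the metadata output is built.
    val fieldMeta = new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, true).build()

    // Cheap per-node check used by AddMetadataColumns, replacing the missingInput calculation.
    private def hasMetadataCol(plan: LogicalPlan): Boolean = {
      plan.expressions.exists(_.find {
        case a: Attribute => a.isMetadataCol
        case _ => false
      }.isDefined)
    }
    ```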
    
    This PR also fixes one bug: the final output schema of the plan should not
    change if metadata columns are only used in operators such as Filter or Sort.
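    For example (a sketch mirroring the new test below, assuming a hypothetical v2
    table `t` with data columns `id`/`data` and a metadata column `index`), a
    metadata column that is only used in a filter must not leak into the output
    schema:
    ```
    // `t` is a hypothetical v2 table with an `index` metadata column.
    val df = spark.table("t").filter("index = 0")
    assert(df.schema.fieldNames.toSeq == Seq("id", "data"))  // `index` is projected away
    ```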
    
    ### Why are the changes needed?
    
    Fix perf regression in SQL query compilation, and fix a bug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Ran `org.apache.spark.sql.TPCDSQuerySuite`. Before this PR, `AddMetadataColumns`
    was among the top 4 rules ranked by running time:
    ```
    === Metrics of Analyzer/Optimizer Rules ===
    Total number of runs: 407641
    Total time: 47.257239779 seconds
    
    Rule                                    Effective Time / Total Time    Effective Runs / Total Runs

    OptimizeSubqueries                      4157690003 / 8485444626        49 / 2778
    Analyzer$ResolveAggregateFunctions      1238968711 / 3369351761        49 / 2141
    ColumnPruning                           660038236 / 2924755292         338 / 6391
    Analyzer$AddMetadataColumns             0 / 2918352992                 0 / 2151
    ```
    after this PR:
    ```
    Analyzer$AddMetadataColumns             0 / 122885629                  0 / 2151
    ```
    This rule is now more than 20 times faster, and its contribution to the total
    compilation time is negligible.
    
    This PR also adds new tests to verify the bug fix.
    
    Closes #31440 from cloud-fan/metadata-col.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 989eb6884d77226ab4f494a4237e09aea54a032d)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 34 +++++++++++++++--
 .../datasources/v2/DataSourceV2Implicits.scala     | 14 +++++--
 .../apache/spark/sql/connector/InMemoryTable.scala |  4 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 44 +++++++++++++++++-----
 4 files changed, 77 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 388b2f0..e9e8ba8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -952,11 +952,37 @@ class Analyzer(override val catalogManager: CatalogManager)
    * columns are not accidentally selected by *.
    */
   object AddMetadataColumns extends Rule[LogicalPlan] {
+    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+
+    private def hasMetadataCol(plan: LogicalPlan): Boolean = {
+      plan.expressions.exists(_.find {
+        case a: Attribute => a.isMetadataCol
+        case _ => false
+      }.isDefined)
+    }
+
+    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
+      case r: DataSourceV2Relation => r.withMetadataColumns()
+      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
+    }
+
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty =>
-        node resolveOperatorsUp {
-          case rel: DataSourceV2Relation =>
-            rel.withMetadataColumns()
+      case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
+        val inputAttrs = AttributeSet(node.children.flatMap(_.output))
+        val metaCols = node.expressions.flatMap(_.collect {
+          case a: Attribute if a.isMetadataCol && !inputAttrs.contains(a) => a
+        })
+        if (metaCols.isEmpty) {
+          node
+        } else {
+          val newNode = addMetadataCol(node)
+          // We should not change the output schema of the plan. We should project away the extra
+          // metadata columns if necessary.
+          if (newNode.sameOutput(node)) {
+            newNode
+          } else {
+            Project(node.output, newNode)
+          }
         }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index 8d91ea7..4326c73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -21,12 +21,14 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 object DataSourceV2Implicits {
+  private val METADATA_COL_ATTR_KEY = "__metadata_col"
+
   implicit class TableHelper(table: Table) {
     def asReadable: SupportsRead = {
       table match {
@@ -83,7 +85,8 @@ object DataSourceV2Implicits {
   implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
     def asStruct: StructType = {
       val fields = metadata.map { metaCol =>
-        val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable)
+        val fieldMeta = new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, true).build()
+        val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable, fieldMeta)
         Option(metaCol.comment).map(field.withComment).getOrElse(field)
       }
       StructType(fields)
@@ -92,6 +95,11 @@ object DataSourceV2Implicits {
     def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
   }
 
+  implicit class MetadataColumnHelper(attr: Attribute) {
+    def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
+      attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
+  }
+
   implicit class OptionsHelper(options: Map[String, String]) {
     def asOptions: CaseInsensitiveStringMap = {
       new CaseInsensitiveStringMap(options.asJava)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index 257c380..508d793 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull}
-import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -58,7 +58,7 @@ class InMemoryTable(
 
   private object IndexColumn extends MetadataColumn {
     override def name: String = "index"
-    override def dataType: DataType = StringType
+    override def dataType: DataType = IntegerType
     override def comment: String = "Metadata column used to conflict with a data column"
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 42d92b1..0e12eba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -142,7 +142,7 @@ class DataSourceV2SQLSuite
       Array("Part 0", "id", ""),
       Array("", "", ""),
       Array("# Metadata Columns", "", ""),
-      Array("index", "string", "Metadata column used to conflict with a data 
column"),
+      Array("index", "int", "Metadata column used to conflict with a data 
column"),
       Array("_partition", "string", "Partition key used to store the row"),
       Array("", "", ""),
       Array("# Detailed Table Information", "", ""),
@@ -2593,9 +2593,12 @@ class DataSourceV2SQLSuite
           "PARTITIONED BY (bucket(4, id), id)")
       sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
 
-      checkAnswer(
-        spark.sql(s"SELECT id, data, _partition FROM $t1"),
-        Seq(Row(1, "a", "3/1"), Row(2, "b", "0/2"), Row(3, "c", "1/3")))
+      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1")
+      val dfQuery = spark.table(t1).select("id", "data", "index", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
     }
   }
 
@@ -2606,9 +2609,12 @@ class DataSourceV2SQLSuite
           "PARTITIONED BY (bucket(4, index), index)")
       sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
 
-      checkAnswer(
-        spark.sql(s"SELECT index, data, _partition FROM $t1"),
-        Seq(Row(3, "c", "1/3"), Row(2, "b", "0/2"), Row(1, "a", "3/1")))
+      val sqlQuery = spark.sql(s"SELECT index, data, _partition FROM $t1")
+      val dfQuery = spark.table(t1).select("index", "data", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(3, "c", "1/3"), Row(2, "b", "0/2"), Row(1, "a", "3/1")))
+      }
     }
   }
 
@@ -2619,9 +2625,27 @@ class DataSourceV2SQLSuite
           "PARTITIONED BY (bucket(4, id), id)")
       sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
 
-      checkAnswer(
-        spark.sql(s"SELECT * FROM $t1"),
-        Seq(Row(3, "c"), Row(2, "b"), Row(1, "a")))
+      val sqlQuery = spark.sql(s"SELECT * FROM $t1")
+      val dfQuery = spark.table(t1)
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(3, "c"), Row(2, "b"), Row(1, "a")))
+      }
+    }
+  }
+
+  test("SPARK-31255: metadata column should only be produced when necessary") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+
+      val sqlQuery = spark.sql(s"SELECT * FROM $t1 WHERE index = 0")
+      val dfQuery = spark.table(t1).filter("index = 0")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        assert(query.schema.fieldNames.toSeq == Seq("id", "data"))
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
