Repository: spark Updated Branches: refs/heads/master a28728a9a -> 88661747f
[SPARK-22018][SQL] Preserve top-level alias metadata when collapsing projects ## What changes were proposed in this pull request? If there are two projects as follows. ``` Project [a_with_metadata#27 AS b#26] +- Project [a#0 AS a_with_metadata#27] +- LocalRelation <empty>, [a#0, b#1] ``` Child Project has an output column with metadata in it, and the parent Project has an alias that implicitly forwards the metadata. So this metadata is visible to higher operators. Upon applying CollapseProject optimizer rule, the metadata is not preserved. ``` Project [a#0 AS b#26] +- LocalRelation <empty>, [a#0, b#1] ``` This is incorrect, as downstream operators that rely on certain metadata (e.g. watermark in structured streaming) to identify certain fields will fail to do so. This PR fixes it by preserving the metadata of top-level aliases. ## How was this patch tested? New unit test Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #19240 from tdas/SPARK-22018. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88661747 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88661747 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88661747 Branch: refs/heads/master Commit: 88661747f506e73c79de36711daebb0330de7b0d Parents: a28728a Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Thu Sep 14 22:32:16 2017 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Thu Sep 14 22:32:16 2017 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 5 ++++- .../optimizer/CollapseProjectSuite.scala | 23 ++++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/88661747/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0880bd6..db276fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2256,7 +2256,10 @@ object CleanupAliases extends Rule[LogicalPlan] { def trimNonTopLevelAliases(e: Expression): Expression = e match { case a: Alias => - a.withNewChildren(trimAliases(a.child) :: Nil) + a.copy(child = trimAliases(a.child))( + exprId = a.exprId, + qualifier = a.qualifier, + explicitMetadata = Some(a.metadata)) case other => trimAliases(other) } http://git-wip-us.apache.org/repos/asf/spark/blob/88661747/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala index 587437e..e7a5bce 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.Rand +import org.apache.spark.sql.catalyst.expressions.{Alias, Rand} import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import 
org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.MetadataBuilder class CollapseProjectSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { @@ -119,4 +120,22 @@ class CollapseProjectSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + + test("preserve top-level alias metadata while collapsing projects") { + def hasMetadata(logicalPlan: LogicalPlan): Boolean = { + logicalPlan.asInstanceOf[Project].projectList.exists(_.metadata.contains("key")) + } + + val metadata = new MetadataBuilder().putLong("key", 1).build() + val analyzed = + Project(Seq(Alias('a_with_metadata, "b")()), + Project(Seq(Alias('a, "a_with_metadata")(explicitMetadata = Some(metadata))), + testRelation.logicalPlan)).analyze + require(hasMetadata(analyzed)) + + val optimized = Optimize.execute(analyzed) + val projects = optimized.collect { case p: Project => p } + assert(projects.size === 1) + assert(hasMetadata(optimized)) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org