This is an automated email from the ASF dual-hosted git repository. tdas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d2bca8f [SPARK-30609] Allow default merge command resolution to be bypassed by DSv2 tables d2bca8f is described below commit d2bca8ff70e6c82e915f633bb9f2f8a4582f7026 Author: Tathagata Das <tathagata.das1...@gmail.com> AuthorDate: Wed Jan 22 19:20:25 2020 -0800 [SPARK-30609] Allow default merge command resolution to be bypassed by DSv2 tables ### What changes were proposed in this pull request? Skip resolving the merge expressions if the target is a DSv2 table with ACCEPT_ANY_SCHEMA capability. ### Why are the changes needed? Some DSv2 sources may want to customize the merge resolution logic. For example, a table that can accept any schema (TableCapability.ACCEPT_ANY_SCHEMA) may want to allow certain merge queries that are blocked (that is, throws an AnalysisException) by the default resolution logic. So there should be a way to completely bypass the merge resolution logic in the Analyzer. ### Does this PR introduce any user-facing change? No, since merge itself is an unreleased feature. ### How was this patch tested? Added a unit test to specifically test the skipping. Closes #27326 from tdas/SPARK-30609. 
Authored-by: Tathagata Das <tathagata.das1...@gmail.com> Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 62 +++++++++++++--------- .../execution/command/PlanResolutionSuite.scala | 52 +++++++++++++++++- 2 files changed, 86 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7e9f85b..503dab1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1326,33 +1326,43 @@ class Analyzer( case m @ MergeIntoTable(targetTable, sourceTable, _, _, _) if !m.resolved && targetTable.resolved && sourceTable.resolved => - val newMatchedActions = m.matchedActions.map { - case DeleteAction(deleteCondition) => - val resolvedDeleteCondition = deleteCondition.map(resolveExpressionTopDown(_, m)) - DeleteAction(resolvedDeleteCondition) - case UpdateAction(updateCondition, assignments) => - val resolvedUpdateCondition = updateCondition.map(resolveExpressionTopDown(_, m)) - // The update value can access columns from both target and source tables. - UpdateAction( - resolvedUpdateCondition, - resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false)) - case o => o - } - val newNotMatchedActions = m.notMatchedActions.map { - case InsertAction(insertCondition, assignments) => - // The insert action is used when not matched, so its condition and value can only - // access columns from the source table. 
- val resolvedInsertCondition = - insertCondition.map(resolveExpressionTopDown(_, Project(Nil, m.sourceTable))) - InsertAction( - resolvedInsertCondition, - resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true)) - case o => o + + EliminateSubqueryAliases(targetTable) match { + case r: NamedRelation if r.skipSchemaResolution => + // Do not resolve the expression if the target table accepts any schema. + // This allows data sources to customize their own resolution logic using + // custom resolution rules. + m + + case _ => + val newMatchedActions = m.matchedActions.map { + case DeleteAction(deleteCondition) => + val resolvedDeleteCondition = deleteCondition.map(resolveExpressionTopDown(_, m)) + DeleteAction(resolvedDeleteCondition) + case UpdateAction(updateCondition, assignments) => + val resolvedUpdateCondition = updateCondition.map(resolveExpressionTopDown(_, m)) + // The update value can access columns from both target and source tables. + UpdateAction( + resolvedUpdateCondition, + resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false)) + case o => o + } + val newNotMatchedActions = m.notMatchedActions.map { + case InsertAction(insertCondition, assignments) => + // The insert action is used when not matched, so its condition and value can only + // access columns from the source table. 
+ val resolvedInsertCondition = + insertCondition.map(resolveExpressionTopDown(_, Project(Nil, m.sourceTable))) + InsertAction( + resolvedInsertCondition, + resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true)) + case o => o + } + val resolvedMergeCondition = resolveExpressionTopDown(m.mergeCondition, m) + m.copy(mergeCondition = resolvedMergeCondition, + matchedActions = newMatchedActions, + notMatchedActions = newNotMatchedActions) } - val resolvedMergeCondition = resolveExpressionTopDown(m.mergeCondition, m) - m.copy(mergeCondition = resolvedMergeCondition, - matchedActions = newMatchedActions, - notMatchedActions = newNotMatchedActions) case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 8c73b36..35b2003 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.command import java.net.URI -import java.util.Locale +import java.util.{Collections, Locale} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mock, when} @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, E import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, Assignment, CreateTableAsSelect, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SubqueryAlias, UpdateAction, UpdateTable} import org.apache.spark.sql.connector.InMemoryTableProvider -import org.apache.spark.sql.connector.catalog.{CatalogManager, 
CatalogNotFoundException, Identifier, Table, TableCatalog, TableChange, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, TableChange, V1Table} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf @@ -49,6 +49,13 @@ class PlanResolutionSuite extends AnalysisTest { t } + private val tableWithAcceptAnySchemaCapability: Table = { + val t = mock(classOf[Table]) + when(t.schema()).thenReturn(new StructType().add("i", "int")) + when(t.capabilities()).thenReturn(Collections.singleton(TableCapability.ACCEPT_ANY_SCHEMA)) + t + } + private val v1Table: V1Table = { val t = mock(classOf[CatalogTable]) when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string")) @@ -77,6 +84,7 @@ class PlanResolutionSuite extends AnalysisTest { case "v1Table1" => v1Table case "v2Table" => table case "v2Table1" => table + case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability case name => throw new NoSuchTableException(name) } }) @@ -1351,5 +1359,45 @@ class PlanResolutionSuite extends AnalysisTest { } } + test("MERGE INTO TABLE - skip resolution on v2 tables that accept any schema") { + val sql = + s""" + |MERGE INTO v2TableWithAcceptAnySchemaCapability AS target + |USING v2Table AS source + |ON target.i = source.i + |WHEN MATCHED AND (target.s='delete') THEN DELETE + |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s + |WHEN NOT MATCHED AND (target.s='insert') + | THEN INSERT (target.i, target.s) values (source.i, source.s) + """.stripMargin + + parseAndResolve(sql) match { + case MergeIntoTable( + SubqueryAlias(AliasIdentifier("target", None), _: DataSourceV2Relation), + SubqueryAlias(AliasIdentifier("source", None), _: DataSourceV2Relation), + EqualTo(l: UnresolvedAttribute, r: UnresolvedAttribute), + 
Seq( + DeleteAction(Some(EqualTo(dl: UnresolvedAttribute, StringLiteral("delete")))), + UpdateAction( + Some(EqualTo(ul: UnresolvedAttribute, StringLiteral("update"))), + updateAssigns)), + Seq( + InsertAction( + Some(EqualTo(il: UnresolvedAttribute, StringLiteral("insert"))), + insertAssigns))) => + assert(l.name == "target.i" && r.name == "source.i") + assert(dl.name == "target.s") + assert(ul.name == "target.s") + assert(il.name == "target.s") + assert(updateAssigns.size == 1) + assert(updateAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.s") + assert(updateAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.s") + assert(insertAssigns.size == 2) + assert(insertAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.i") + assert(insertAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.i") + + case l => fail("Expected unresolved MergeIntoTable, but got:\n" + l.treeString) + } + } // TODO: add tests for more commands. } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org