xianyinxin commented on a change in pull request #26167: [SPARK-28893][SQL] 
Support MERGE INTO in the parser and add the corresponding logical plan
URL: https://github.com/apache/spark/pull/26167#discussion_r344010635
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ##########
 @@ -395,6 +395,123 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
       predicate)
   }
 
+  private def withAssignments(
+      assignCtx: SqlBaseParser.AssignmentListContext): (Seq[Expression], 
Seq[Expression]) =
+    withOrigin(assignCtx) {
+      assignCtx.assignment().asScala.map { assign =>
+        (UnresolvedAttribute(visitQualifiedName(assign.key)), 
expression(assign.value))
+      }.foldLeft(Seq[Expression](), Seq[Expression]()) {
+        case (a, b) => (a._1 :+ b._1, a._2 :+ b._2)
+      }
+    }
+
+  override def visitMergeIntoTable(ctx: MergeIntoTableContext): LogicalPlan = 
withOrigin(ctx) {
+    val targetTable = UnresolvedRelation(visitMultipartIdentifier(ctx.target))
+    val targetTableAlias = if (ctx.targetAlias != null) {
+      val ident = ctx.targetAlias.strictIdentifier()
+      // We do not allow columns aliases after table alias.
+      if (ctx.targetAlias.identifierList() != null) {
+        throw new ParseException("Columns aliases is not allowed in MERGE.",
+          ctx.targetAlias.identifierList())
+      }
+      if (ident != null) Some(ident.getText) else None
+    } else {
+      None
+    }
+    val aliasedTarget = targetTableAlias.map(SubqueryAlias(_, 
targetTable)).getOrElse(targetTable)
+
+    val sourceTableOrQuery = if (ctx.source != null) {
+      UnresolvedRelation(visitMultipartIdentifier(ctx.source))
+    } else if (ctx.sourceQuery != null) {
+      visitQuery(ctx.sourceQuery)
+    } else {
+      throw new ParseException("Empty source for merge: you should specify a 
source" +
+          " table/subquery in merge.", ctx.source)
+    }
+    val sourceTableAlias = if (ctx.sourceAlias != null) {
+      val ident = ctx.sourceAlias.strictIdentifier()
+      // We do not allow columns aliases after table alias.
+      if (ctx.sourceAlias.identifierList() != null) {
+        throw new ParseException("Columns aliases is not allowed in MERGE.",
+          ctx.sourceAlias.identifierList())
+      }
+      if (ident != null) Some(ident.getText) else None
+    } else {
+      None
+    }
+    val aliasedSource =
+      sourceTableAlias.map(SubqueryAlias(_, 
sourceTableOrQuery)).getOrElse(sourceTableOrQuery)
+
+    val mergeCondition = expression(ctx.mergeCondition)
+
+    val matchedClauses = ctx.matchedClause()
+    if (matchedClauses.size() > 2) {
+      throw new ParseException("There should be at most 2 'WHEN MATCHED' 
clauses.",
+        matchedClauses.get(2))
+    }
+    val matchedActions = matchedClauses.asScala.map {
+      clause => {
+        if (clause.matchedAction().DELETE() != null) {
+          DeleteAction(
+            if (clause.matchedCond != null) 
Some(expression(clause.matchedCond)) else None)
+        } else if (clause.matchedAction().UPDATE() != null) {
+          val (setColumns, setValues): (Seq[Expression], Seq[Expression]) =
+            if (clause.matchedAction().ASTERISK() != null) {
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to