This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e019806  [CARBONDATA-4065] Support MERGE INTO SQL Command
e019806 is described below

commit e019806af5c27bd7d75ce144105570a21369013b
Author: BrooksLi <jianxili2...@outlook.com>
AuthorDate: Thu Nov 5 14:42:35 2020 +0800

    [CARBONDATA-4065] Support MERGE INTO SQL Command
    
    Why is this PR needed?
    In order to support MERGE INTO SQL Command in Carbondata
    The previous Scala parser had trouble parsing the complicated MERGE 
INTO SQL Command
    
    What changes were proposed in this PR?
    Add an ANTLR parser, and support parsing the MERGE INTO SQL Command into a 
DataSet Command
    
    Does this PR introduce any user interface change?
    Yes.
    The PR introduces the MERGE INTO SQL Command.
    
    Is any new testcase added?
    Yes
    
    This closes #4032
    
    Co-authored-by: Zhangshunyu <zhangshunyu1...@126.com>
---
 dev/findbugs-exclude.xml                           |  22 +-
 docs/scd-and-cdc-guide.md                          |  38 +-
 .../carbondata/examples/DataMergeIntoExample.scala | 164 ++++++
 integration/spark/pom.xml                          |  21 +
 .../org/apache/spark/sql/parser/CarbonSqlBase.g4   | 646 +++++++++++++++++++++
 .../apache/spark/sql/CarbonAntlrSqlVisitor.java    | 323 +++++++++++
 .../spark/sql/CarbonMergeIntoSQLCommand.scala      | 119 ++++
 .../sql/merge/model/CarbonJoinExpression.java      |  66 +++
 .../sql/merge/model/CarbonMergeIntoModel.java      |  80 +++
 .../apache/spark/sql/merge/model/ColumnModel.java  |  49 ++
 .../apache/spark/sql/merge/model/TableModel.java   |  59 ++
 .../command/mutation/merge/MergeProjection.scala   |  11 +-
 .../command/mutation/merge/interfaces.scala        |   7 +-
 .../spark/sql/parser/CarbonAntlrParser.scala       |  41 ++
 .../sql/parser/CarbonExtensionSqlParser.scala      |  41 +-
 .../scala/org/apache/spark/util/SparkUtil.scala    |  22 +-
 .../iud/MergeIntoCarbonTableTestCase.scala         | 314 ++++++++++
 .../spark/testsuite/merge/MergeTestCase.scala      |   2 +-
 pom.xml                                            |   7 +-
 19 files changed, 1997 insertions(+), 35 deletions(-)

diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index 060ea09..33dd252 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -21,11 +21,11 @@
   </Match>
 
   <Match>
-    <Source name="~.*\.scala" />
+    <Source name="~.*\.scala"/>
   </Match>
 
   <Match>
-    <Source name="~.*Test\.java" />
+    <Source name="~.*Test\.java"/>
   </Match>
 
   <!-- This method creates stream but the caller methods are responsible for 
closing the stream -->
@@ -95,7 +95,7 @@
   <Match>
     <Class name="org.apache.carbondata.core.scan.aggregator.impl.BitSet"/>
     <Or>
-    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+      <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
     </Or>
   </Match>
   <Match>
@@ -126,6 +126,18 @@
     <Class name="org.apache.carbondata.events.OperationContext"/>
     <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
   </Match>
-  <Match> <Class 
name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/> 
<Field name="indexExprWrapper"/> <Bug 
pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> </Match>
-  <Match> <Class 
name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/> 
<Field name="validSegments"/> <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> 
</Match>
+  <Match>
+    <Class 
name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/>
+    <Field name="indexExprWrapper"/>
+    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+  </Match>
+  <Match>
+    <Class 
name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/>
+    <Field name="validSegments"/>
+    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+  </Match>
+
+  <Match>
+    <Package name="org.apache.spark.sql.parser"/>
+  </Match>
 </FindBugsFilter>
\ No newline at end of file
diff --git a/docs/scd-and-cdc-guide.md b/docs/scd-and-cdc-guide.md
index 892958b..38a2973 100644
--- a/docs/scd-and-cdc-guide.md
+++ b/docs/scd-and-cdc-guide.md
@@ -55,8 +55,42 @@ Below is the detailed description of the `merge` API 
operation.
    * `whenNotMatched` clause can have only the `insertExpr` action. The new 
row is generated based on the specified column and corresponding expressions. 
Users do not need to specify all the columns in the target table. For 
unspecified target columns, NULL is inserted.
 * `whenNotMatchedAndExistsOnlyOnTarget` clause is executed when row does not 
match source and exists only in target. This clause can have only delete action.
 
-**NOTE:** SQL syntax for merge is not yet supported.
+#### MERGE SQL
+
+Below sql merges a set of updates, insertions, and deletions based on a source 
table
+into a target carbondata table. 
+
+```
+    MERGE INTO target_table_identifier
+    USING source_table_identifier
+    ON <merge_condition>
+    [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
+    [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
+    [ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]
+```
+
+#### MERGE SQL Operation Semantics
+Below is the detailed description of the `merge` SQL operation.
+* `table_identifier` a table name, optionally qualified with a database name
+* `merge_condition` how the rows from one relation are combined with the rows 
of another relation. An expression with a return type of Boolean.
+* `WHEN MATCHED` clauses are executed when a source row matches a target table 
row based on the match condition,
+clauses can have at most one UPDATE and one DELETE action. These clauses have 
the following semantics.
+    * The UPDATE action in merge only updates the specified columns of the 
matched target row.
+    * The DELETE action will delete the matched row.
+    * WHEN MATCHED clauses can have at most one UPDATE and one DELETE action. 
The UPDATE action in merge only updates the specified columns of the matched 
target row. The DELETE action will delete the matched row.
+    * Each WHEN MATCHED clause can have an optional condition. If this clause 
condition exists, the UPDATE or DELETE action is executed for any matching 
source-target row pair only when the clause condition is true.
+    * If there are multiple WHEN MATCHED clauses, then they are evaluated in 
the order they are specified (that is, the order of the clauses matters). All WHEN 
MATCHED clauses, except the last one, must have conditions.
+    * If both WHEN MATCHED clauses have conditions and neither of the 
conditions are true for a matching source-target row pair, then the matched 
target row is left unchanged.
+    * To update all the columns of the target carbondata table with the 
corresponding columns of the source dataset, use UPDATE SET *. This is 
equivalent to UPDATE SET col1 = source.col1 [, col2 = source.col2 ...] for all 
the columns of the target carbondata table. Therefore, this action assumes that 
the source table has the same columns as those in the target table, otherwise 
the query will throw an analysis error.
+* `matched_action` can be DELETE | UPDATE SET * | UPDATE SET column1 = value1 
[, column2 = value2 ...]
+* `WHEN NOT MATCHED` clause is executed when a source row does not match any 
target row based on the match condition, these clauses have the following 
semantics.
+    * WHEN NOT MATCHED clauses can only have the INSERT action. The new row is 
generated based on the specified column and corresponding expressions. All the 
columns in the target table do not need to be specified. For unspecified target 
columns, NULL is inserted.
+    * Each WHEN NOT MATCHED clause can have an optional condition. If the 
clause condition is present, a source row is inserted only if that condition is 
true for that row. Otherwise, the source row is ignored.
+    * If there are multiple WHEN NOT MATCHED clauses, then they are evaluated 
in the order they are specified (that is, the order of the clauses matters). All 
WHEN NOT MATCHED clauses, except the last one, must have conditions.
+    * To insert all the columns of the target carbondata table with the 
corresponding columns of the source dataset, use INSERT *. This is equivalent 
to INSERT (col1 [, col2 ...]) VALUES (source.col1 [, source.col2 ...]) for all 
the columns of the target carbondata table. Therefore, this action assumes that 
the source table has the same columns as those in the target table, otherwise 
the query will throw an error.
+* `not_matched_action` can be INSERT *  | INSERT (column1 [, column2 ...]) 
VALUES (value1 [, value2 ...])
 
 ##### Example code to implement cdc/scd scenario
 
-Please refer example class 
[MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala)
 to understand and implement scd and cdc scenarios.
+Please refer example class 
[MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala)
 to understand and implement scd and cdc scenarios using api.
+Please refer example class 
[DataMergeIntoExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala)
 to understand and implement scd and cdc scenarios using sql.
diff --git 
a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala
 
b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala
new file mode 100644
index 0000000..e3dc2da
--- /dev/null
+++ 
b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+object DataMergeIntoExample {
+
+  def main(args: Array[String]) {
+    val spark = ExampleUtils.createSparkSession("DataManagementExample")
+    deleteExampleBody(spark)
+    deleteWithExpressionExample(spark)
+    updateExampleBody(spark)
+    updateWithExpressionExample(spark)
+    updateSpecificColWithExpressionExample(spark)
+    insertExampleBody(spark)
+    insertWithExpressionExample(spark)
+    insertSpecificColWithExpressionExample(spark)
+    spark.close()
+  }
+
+  def initTable(spark: SparkSession): Unit = {
+    spark.sql("DROP TABLE IF EXISTS A")
+    spark.sql("DROP TABLE IF EXISTS B")
+
+    spark.sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS A(
+         |   id Int,
+         |   price Int,
+         |   state String
+         | )
+         | STORED AS carbondata
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS B(
+         |   id Int,
+         |   price Int,
+         |   state String
+         | )
+         | STORED AS carbondata
+       """.stripMargin)
+
+    spark.sql(s"""INSERT INTO A VALUES (1,100,"MA")""")
+    spark.sql(s"""INSERT INTO A VALUES (2,200,"NY")""")
+    spark.sql(s"""INSERT INTO A VALUES (3,300,"NH")""")
+    spark.sql(s"""INSERT INTO A VALUES (4,400,"FL")""")
+
+    spark.sql(s"""INSERT INTO B VALUES (1,1,"MA (updated)")""")
+    spark.sql(s"""INSERT INTO B VALUES (2,3,"NY (updated)")""")
+    spark.sql(s"""INSERT INTO B VALUES (3,3,"CA (updated)")""")
+    spark.sql(s"""INSERT INTO B VALUES (5,5,"TX (updated)")""")
+    spark.sql(s"""INSERT INTO B VALUES (7,7,"LO (updated)")""")
+  }
+
+  def dropTables(spark: SparkSession): Unit = {
+    spark.sql("DROP TABLE IF EXISTS A")
+    spark.sql("DROP TABLE IF EXISTS B")
+  }
+
+  def deleteExampleBody(spark: SparkSession): Unit = {
+    dropTables(spark)
+    initTable(spark)
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED THEN DELETE"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+
+  def deleteWithExpressionExample(spark: SparkSession): Unit = {
+    dropTables(spark)
+    initTable(spark)
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND B.ID=2 
THEN DELETE"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+
+  def updateExampleBody(spark: SparkSession): Unit = {
+    dropTables(spark)
+    initTable(spark)
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED THEN UPDATE 
SET *"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+
+  def updateWithExpressionExample(spark: SparkSession): Unit = {
+    dropTables(spark)
+    initTable(spark)
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 
THEN UPDATE SET *"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+
+  def updateSpecificColWithExpressionExample(spark: SparkSession): Unit = {
+    dropTables(spark)
+    initTable(spark)
+    // In this example, it will only update the state
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 
THEN UPDATE SET " +
+                  "STATE=B.STATE"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+
+  def updateSpecificMultiColWithExpressionExample(spark: SparkSession): Unit = 
{
+    dropTables(spark)
+    initTable(spark)
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 
THEN UPDATE SET A" +
+                  ".STATE=B.STATE, A.PRICE=B.PRICE"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+
+  def insertExampleBody(spark: SparkSession): Unit = {
+    dropTables(spark)
+    initTable(spark)
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED THEN 
INSERT *"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+
+  def insertWithExpressionExample(spark: SparkSession): Unit = {
+    dropTables(spark)
+    initTable(spark)
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED AND 
B.ID=7 THEN INSERT *"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+
+  def insertSpecificColWithExpressionExample(spark: SparkSession): Unit = {
+    dropTables(spark)
+    initTable(spark)
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED AND 
B.ID=7 THEN INSERT (A" +
+                  ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE, 'test-string')"
+    spark.sql(sqlText)
+    spark.sql(s"""SELECT * FROM A""").show()
+    dropTables(spark)
+  }
+}
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 2a591e7..6dd3ed4 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -84,6 +84,11 @@
   <dependencies>
     <!-- carbon -->
     <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr4-runtime</artifactId>
+      <version>${antlr4.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-hive</artifactId>
       <version>${project.version}</version>
@@ -528,6 +533,22 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.antlr</groupId>
+        <artifactId>antlr4-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>antlr4</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <visitor>true</visitor>
+          <sourceDirectory>../spark/src/main/antlr4</sourceDirectory>
+          <treatWarningsAsErrors>true</treatWarningsAsErrors>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
diff --git 
a/integration/spark/src/main/antlr4/org/apache/spark/sql/parser/CarbonSqlBase.g4
 
b/integration/spark/src/main/antlr4/org/apache/spark/sql/parser/CarbonSqlBase.g4
new file mode 100644
index 0000000..4efb340
--- /dev/null
+++ 
b/integration/spark/src/main/antlr4/org/apache/spark/sql/parser/CarbonSqlBase.g4
@@ -0,0 +1,646 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Code ported from Apache Spark
+ * 
spark/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+ */
+grammar CarbonSqlBase;
+
+@parser::members {
+  /**
+   * When false, INTERSECT is given the greater precedence over the other set
+   * operations (UNION, EXCEPT and MINUS) as per the SQL standard.
+   */
+  public boolean legacy_setops_precedence_enbled = false;
+
+  /**
+   * When false, a literal with an exponent would be converted into
+   * double type rather than decimal type.
+   */
+  public boolean legacy_exponent_literal_as_decimal_enabled = false;
+
+  /**
+   * When true, the behavior of keywords follows ANSI SQL standard.
+   */
+  public boolean SQL_standard_keyword_behavior = false;
+}
+
+@lexer::members {
+  /**
+   * Verify whether current token is a valid decimal token (which contains 
dot).
+   * Returns true if the character that follows the token is not a digit or 
letter or underscore.
+   *
+   * For example:
+   * For char stream "2.3", "2." is not a valid decimal token, because it is 
followed by digit '3'.
+   * For char stream "2.3_", "2.3" is not a valid decimal token, because it is 
followed by '_'.
+   * For char stream "2.3W", "2.3" is not a valid decimal token, because it is 
followed by 'W'.
+   * For char stream "12.0D 34.E2+0.12 "  12.0D is a valid decimal token 
because it is followed
+   * by a space. 34.E2 is a valid decimal token because it is followed by 
symbol '+'
+   * which is not a digit or letter or underscore.
+   */
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= 
'9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * This method will be called when we see '/*' and try to match it as a 
bracketed comment.
+   * If the next character is '+', it should be parsed as hint later, and we 
cannot match
+   * it as a bracketed comment.
+   *
+   * Returns true if the next character is '+'.
+   */
+  public boolean isHint() {
+    int nextChar = _input.LA(1);
+    if (nextChar == '+') {
+      return true;
+    } else {
+      return false;
+    }
+  }
+}
+
+singleStatement
+    : statement ';'* EOF
+    ;
+
+singleExpression
+    : namedExpression EOF
+    ;
+
+singleTableIdentifier
+    : tableIdentifier EOF
+    ;
+
+singleMultipartIdentifier
+    : multipartIdentifier EOF
+    ;
+
+singleFunctionIdentifier
+    : functionIdentifier EOF
+    ;
+
+statement
+    : query                                                            
#statementDefault
+    | ctes? dmlStatementNoWith                                         
#dmlStatement
+    ;
+
+query
+    : ctes? queryTerm queryOrganization
+    ;
+
+ctes
+    : WITH namedQuery (',' namedQuery)*
+    ;
+
+namedQuery
+    : name=errorCapturingIdentifier (columnAliases=identifierList)? AS? '(' 
query ')'
+    ;
+
+constantList
+    : '(' constant (',' constant)* ')'
+    ;
+
+nestedConstantList
+    : '(' constantList (',' constantList)* ')'
+    ;
+
+resource
+    : identifier STRING
+    ;
+
+dmlStatementNoWith
+    : DELETE FROM multipartIdentifier tableAlias whereClause?                  
    #deleteFromTable
+    | UPDATE multipartIdentifier tableAlias setClause whereClause?             
    #updateTable
+    | MERGE INTO target=multipartIdentifier targetAlias=tableAlias
+        USING (source=multipartIdentifier |
+          '(' sourceQuery=query')') sourceAlias=tableAlias
+        ON mergeCondition=booleanExpression
+        matchedClause*
+        notMatchedClause*                                                      
    #mergeIntoTable
+    ;
+
+mergeInto
+    : MERGE INTO target=multipartIdentifier targetAlias=tableAlias
+              USING (source=multipartIdentifier |
+                '(' sourceQuery=query')') sourceAlias=tableAlias
+              ON mergeCondition=booleanExpression
+              matchedClause*
+              notMatchedClause*
+    ;
+
+queryOrganization
+    : (LIMIT (limit=expression))?
+    ;
+
+queryTerm
+    : queryPrimary                                                             
          #queryTermDefault
+    ;
+
+queryPrimary
+    : querySpecification                                                    
#queryPrimaryDefault
+    | '(' query ')'                                                         
#subquery
+    ;
+
+fromStatementBody
+    : selectClause
+      whereClause?
+      havingClause?
+      queryOrganization
+    ;
+
+querySpecification
+    : selectClause
+      fromClause?
+      whereClause?
+      havingClause?
+    ;
+
+selectClause
+    : SELECT (hints+=hint)* setQuantifier? namedExpressionSeq
+    ;
+
+setClause
+    : SET assignmentList
+    ;
+
+matchedClause
+    : WHEN MATCHED (AND matchedCond=booleanExpression)? THEN matchedAction
+    ;
+notMatchedClause
+    : WHEN NOT MATCHED (AND notMatchedCond=booleanExpression)? THEN 
notMatchedAction
+    ;
+
+matchedAction
+    : DELETE
+    | UPDATE SET ASTERISK
+    | UPDATE SET assignmentList
+    ;
+
+notMatchedAction
+    : INSERT ASTERISK
+    | INSERT '(' columns=multipartIdentifierList ')'
+        VALUES '(' expression (',' expression)* ')'
+    ;
+
+assignmentList
+    : assignment (',' assignment)*
+    ;
+
+assignment
+    : key=multipartIdentifier EQ value=expression
+    ;
+
+whereClause
+    : WHERE booleanExpression
+    ;
+
+havingClause
+    : HAVING booleanExpression
+    ;
+
+hint
+    : '/*+' hintStatements+=hintStatement (','? 
hintStatements+=hintStatement)* '*/'
+    ;
+
+hintStatement
+    : hintName=identifier
+    | hintName=identifier '(' parameters+=primaryExpression (',' 
parameters+=primaryExpression)* ')'
+    ;
+
+fromClause
+    : FROM relation (',' relation)*
+    ;
+
+setQuantifier
+    : DISTINCT
+    ;
+
+relation
+    : relationPrimary
+    ;
+
+identifierList
+    : '(' identifierSeq ')'
+    ;
+
+identifierSeq
+    : ident+=errorCapturingIdentifier (',' ident+=errorCapturingIdentifier)*
+    ;
+
+relationPrimary
+    : multipartIdentifier  #tableName
+    | inlineTable                             #inlineTableDefault2
+    | functionTable                           #tableValuedFunction
+    ;
+
+inlineTable
+    : VALUES expression (',' expression)* tableAlias
+    ;
+
+functionTable
+    : funcName=errorCapturingIdentifier '(' (expression (',' expression)*)? 
')' tableAlias
+    ;
+
+tableAlias
+    : (AS? strictIdentifier identifierList?)?
+    ;
+
+multipartIdentifierList
+    : multipartIdentifier (',' multipartIdentifier)*
+    ;
+
+multipartIdentifier
+    : parts+=errorCapturingIdentifier ('.' parts+=errorCapturingIdentifier)*
+    ;
+
+tableIdentifier
+    : (db=errorCapturingIdentifier '.')? table=errorCapturingIdentifier
+    ;
+
+functionIdentifier
+    : (db=errorCapturingIdentifier '.')? function=errorCapturingIdentifier
+    ;
+
+namedExpression
+    : expression (AS? (name=errorCapturingIdentifier | identifierList))?
+    ;
+
+namedExpressionSeq
+    : namedExpression (',' namedExpression)*
+    ;
+
+expression
+    : booleanExpression
+    ;
+
+booleanExpression
+    : NOT booleanExpression                                        #logicalNot
+    | valueExpression predicate?                                   #predicated
+    | left=booleanExpression operator=AND right=booleanExpression  
#logicalBinary
+    | left=booleanExpression operator=OR right=booleanExpression   
#logicalBinary
+    ;
+
+predicate
+    : NOT? kind=BETWEEN lower=valueExpression AND upper=valueExpression
+    | NOT? kind=IN '(' expression (',' expression)* ')'
+    | NOT? kind=IN '(' query ')'
+    | IS NOT? kind=NULL
+    | IS NOT? kind=(TRUE | FALSE)
+    | IS NOT? kind=DISTINCT FROM right=valueExpression
+    ;
+
+valueExpression
+    : primaryExpression                                                        
              #valueExpressionDefault
+    | operator=(MINUS | PLUS | TILDE) valueExpression                          
              #arithmeticUnary
+    | left=valueExpression operator=(ASTERISK | SLASH | PERCENT | DIV) 
right=valueExpression #arithmeticBinary
+    | left=valueExpression operator=(PLUS | MINUS | CONCAT_PIPE) 
right=valueExpression       #arithmeticBinary
+    | left=valueExpression operator=AMPERSAND right=valueExpression            
              #arithmeticBinary
+    | left=valueExpression operator=HAT right=valueExpression                  
              #arithmeticBinary
+    | left=valueExpression operator=PIPE right=valueExpression                 
              #arithmeticBinary
+    | left=valueExpression comparisonOperator right=valueExpression            
              #comparison
+    ;
+
+primaryExpression
+    : name=(CURRENT_DATE | CURRENT_TIMESTAMP)                                  
                #currentDatetime
+    | constant                                                                 
                #constantDefault
+    | ASTERISK                                                                 
                #star
+    | qualifiedName '.' ASTERISK                                               
                #star
+    | '(' namedExpression (',' namedExpression)+ ')'                           
                #rowConstructor
+    | '(' query ')'                                                            
                #subqueryExpression
+    | identifier '->' expression                                               
                #lambda
+    | '(' identifier (',' identifier)+ ')' '->' expression                     
                #lambda
+    | value=primaryExpression '[' index=valueExpression ']'                    
                #subscript
+    | identifier                                                               
                #columnReference
+    | base=primaryExpression '.' fieldName=identifier                          
                #dereference
+    | '(' expression ')'                                                       
                #parenthesizedExpression
+    ;
+
+constant
+    : NULL                                                                     
                #nullLiteral
+    | identifier STRING                                                        
                #typeConstructor
+    | number                                                                   
                #numericLiteral
+    | booleanValue                                                             
                #booleanLiteral
+    | STRING+                                                                  
                #stringLiteral
+    ;
+
+comparisonOperator
+    : EQ | NEQ | NEQJ | LT | LTE | GT | GTE | NSEQ
+    ;
+
+booleanValue
+    : TRUE | FALSE
+    ;
+
+qualifiedName
+    : identifier ('.' identifier)*
+    ;
+
+errorCapturingIdentifier
+    : identifier errorCapturingIdentifierExtra
+    ;
+
+// extra left-factoring grammar
+errorCapturingIdentifierExtra
+    : (MINUS identifier)+    #errorIdent
+    |                        #realIdent
+    ;
+
+identifier
+    : strictIdentifier
+    | {!SQL_standard_keyword_behavior}? strictNonReserved
+    ;
+
+strictIdentifier
+    : IDENTIFIER              #unquotedIdentifier
+    | quotedIdentifier        #quotedIdentifierAlternative
+    | {SQL_standard_keyword_behavior}? ansiNonReserved #unquotedIdentifier
+    | {!SQL_standard_keyword_behavior}? nonReserved    #unquotedIdentifier
+    ;
+
+quotedIdentifier
+    : BACKQUOTED_IDENTIFIER
+    ;
+
+number
+    : {!legacy_exponent_literal_as_decimal_enabled}? MINUS? EXPONENT_VALUE 
#exponentLiteral
+    | {!legacy_exponent_literal_as_decimal_enabled}? MINUS? DECIMAL_VALUE  
#decimalLiteral
+    | {legacy_exponent_literal_as_decimal_enabled}? MINUS? (EXPONENT_VALUE | 
DECIMAL_VALUE) #legacyDecimalLiteral
+    | MINUS? INTEGER_VALUE            #integerLiteral
+    | MINUS? BIGINT_LITERAL           #bigIntLiteral
+    | MINUS? SMALLINT_LITERAL         #smallIntLiteral
+    | MINUS? TINYINT_LITERAL          #tinyIntLiteral
+    | MINUS? DOUBLE_LITERAL           #doubleLiteral
+    | MINUS? FLOAT_LITERAL            #floatLiteral
+    | MINUS? BIGDECIMAL_LITERAL       #bigDecimalLiteral
+    ;
+
+ansiNonReserved
+//--ANSI-NON-RESERVED-START
+    : BETWEEN
+    | CURRENT
+    | DELETE
+    | DIV
+    | IF
+    | INSERT
+    | LIKE
+    | LIMIT
+    | MATCHED
+    | MERGE
+    | NO
+    | NULLS
+    | OF
+    | SET
+    | SETMINUS
+    | SETS
+    | SORT
+    | SORTED
+    | TRUE
+    | UPDATE
+    | VALUES
+//--ANSI-NON-RESERVED-END
+    ;
+
+strictNonReserved
+    : JOIN
+    | LEFT
+    | ON
+    | RIGHT
+    | SEMI
+    | SETMINUS
+    | UNION
+    | USING
+    ;
+
+nonReserved
+    : AND
+    | AS
+    | BETWEEN
+    | CURRENT
+    | CURRENT_DATE
+    | CURRENT_TIME
+    | CURRENT_TIMESTAMP
+    | CURRENT_USER
+    | DELETE
+    | DISTINCT
+    | DIV
+    | ELSE
+    | END
+    | FALSE
+    | FROM
+    | IN
+    | INSERT
+    | INTO
+    | IS
+    | LIKE
+    | LIMIT
+    | MATCHED
+    | MERGE
+    | NO
+    | NOT
+    | NULL
+    | NULLS
+    | OF
+    | OR
+    | ORDER
+    | SELECT
+    | SET
+    | SETS
+    | SORT
+    | SORTED
+    | THEN
+    | TIME
+    | TO
+    | TRUE
+    | UPDATE
+    | VALUES
+    | WHEN
+    | WHERE
+//--DEFAULT-NON-RESERVED-END
+    ;
+
+//============================
+// Start of the keywords list
+//============================
+AND: 'AND';
+AS: 'AS';
+BETWEEN: 'BETWEEN';
+CURRENT: 'CURRENT';
+CURRENT_DATE: 'CURRENT_DATE';
+CURRENT_TIME: 'CURRENT_TIME';
+CURRENT_TIMESTAMP: 'CURRENT_TIMESTAMP';
+CURRENT_USER: 'CURRENT_USER';
+DELETE: 'DELETE';
+DISTINCT: 'DISTINCT';
+DIV: 'DIV';
+ELSE: 'ELSE';
+END: 'END';
+FALSE: 'FALSE';
+FROM: 'FROM';
+HAVING: 'HAVING';
+IF: 'IF';
+IN: 'IN';
+INSERT: 'INSERT';
+INTO: 'INTO';
+IS: 'IS';
+JOIN: 'JOIN';
+LEFT: 'LEFT';
+LIKE: 'LIKE';
+LIMIT: 'LIMIT';
+MATCHED: 'MATCHED';
+MERGE: 'MERGE';
+NO: 'NO';
+NOT: 'NOT' | '!';
+NULL: 'NULL';
+NULLS: 'NULLS';
+OF: 'OF';
+ON: 'ON';
+OR: 'OR';
+ORDER: 'ORDER';
+OUTER: 'OUTER';
+RIGHT: 'RIGHT';
+SELECT: 'SELECT';
+SEMI: 'SEMI';
+SET: 'SET';
+SETMINUS: 'MINUS';
+SETS: 'SETS';
+SORT: 'SORT';
+SORTED: 'SORTED';
+THEN: 'THEN';
+TIME: 'TIME';
+TO: 'TO';
+TRUE: 'TRUE';
+UNION: 'UNION';
+UPDATE: 'UPDATE';
+USING: 'USING';
+VALUES: 'VALUES';
+WHEN: 'WHEN';
+WHERE: 'WHERE';
+WITH: 'WITH';
+
+EQ  : '=' | '==';
+NSEQ: '<=>';
+NEQ : '<>';
+NEQJ: '!=';
+LT  : '<';
+LTE : '<=' | '!>';
+GT  : '>';
+GTE : '>=' | '!<';
+
+PLUS: '+';
+MINUS: '-';
+ASTERISK: '*';
+SLASH: '/';
+PERCENT: '%';
+TILDE: '~';
+AMPERSAND: '&';
+PIPE: '|';
+CONCAT_PIPE: '||';
+HAT: '^';
+
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+BIGINT_LITERAL
+    : DIGIT+ 'L'
+    ;
+
+SMALLINT_LITERAL
+    : DIGIT+ 'S'
+    ;
+
+TINYINT_LITERAL
+    : DIGIT+ 'Y'
+    ;
+
+INTEGER_VALUE
+    : DIGIT+
+    ;
+
+EXPONENT_VALUE
+    : DIGIT+ EXPONENT
+    | DECIMAL_DIGITS EXPONENT {isValidDecimal()}?
+    ;
+
+DECIMAL_VALUE
+    : DECIMAL_DIGITS {isValidDecimal()}?
+    ;
+
+FLOAT_LITERAL
+    : DIGIT+ EXPONENT? 'F'
+    | DECIMAL_DIGITS EXPONENT? 'F' {isValidDecimal()}?
+    ;
+
+DOUBLE_LITERAL
+    : DIGIT+ EXPONENT? 'D'
+    | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
+    ;
+
+BIGDECIMAL_LITERAL
+    : DIGIT+ EXPONENT? 'BD'
+    | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
+    ;
+
+IDENTIFIER
+    : (LETTER | DIGIT | '_')+
+    ;
+
+BACKQUOTED_IDENTIFIER
+    : '`' ( ~'`' | '``' )* '`'
+    ;
+
+fragment DECIMAL_DIGITS
+    : DIGIT+ '.' DIGIT*
+    | '.' DIGIT+
+    ;
+
+fragment EXPONENT
+    : 'E' [+-]? DIGIT+
+    ;
+
+fragment DIGIT
+    : [0-9]
+    ;
+
+fragment LETTER
+    : [A-Z]
+    ;
+
+SIMPLE_COMMENT
+    : '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
+    ;
+
+BRACKETED_COMMENT
+    : '/*' {!isHint()}? (BRACKETED_COMMENT|.)*? '*/' -> channel(HIDDEN)
+    ;
+
+WS
+    : [ \r\n\t]+ -> channel(HIDDEN)
+    ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+    : .
+    ;
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/CarbonAntlrSqlVisitor.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/CarbonAntlrSqlVisitor.java
new file mode 100644
index 0000000..69a5f98
--- /dev/null
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/CarbonAntlrSqlVisitor.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.execution.command.mutation.merge.DeleteAction;
+import org.apache.spark.sql.execution.command.mutation.merge.InsertAction;
+import org.apache.spark.sql.execution.command.mutation.merge.MergeAction;
+import org.apache.spark.sql.execution.command.mutation.merge.UpdateAction;
+import org.apache.spark.sql.merge.model.CarbonJoinExpression;
+import org.apache.spark.sql.merge.model.CarbonMergeIntoModel;
+import org.apache.spark.sql.merge.model.ColumnModel;
+import org.apache.spark.sql.merge.model.TableModel;
+import org.apache.spark.sql.parser.CarbonSqlBaseParser;
+import org.apache.spark.util.SparkUtil;
+
/**
 * Walks the parse tree produced by the generated CarbonSqlBaseParser for a
 * MERGE INTO statement and folds it into a {@link CarbonMergeIntoModel}.
 * Sub-expressions embedded in the SQL text (assignment values, clause
 * predicates, the join condition) are re-parsed through the wrapped Spark
 * parser so the model carries Catalyst {@link Expression} objects.
 *
 * NOTE(review): the visit methods address children positionally via
 * {@code ctx.getChild(n)}, so this class is tightly coupled to the exact
 * shape of the rules in CarbonSqlBase.g4 — a grammar change can silently
 * break the indices used here.
 */
public class CarbonAntlrSqlVisitor {

  /** Spark's SQL parser, reused to turn expression text into Catalyst expressions. */
  private final ParserInterface sparkParser;

  public CarbonAntlrSqlVisitor(ParserInterface sparkParser) {
    this.sparkParser = sparkParser;
  }

  /**
   * Returns the alias of a table reference, or null when no alias was given.
   * Child 0 is the AS keyword, child 1 the alias identifier — per the
   * tableAlias rule of the grammar.
   */
  public String visitTableAlias(CarbonSqlBaseParser.TableAliasContext ctx) {
    if (null == ctx.children) {
      return null;
    }
    String res = ctx.getChild(1).getText();
    return res;
  }

  /**
   * Builds an {@link UpdateAction} from the "UPDATE SET assignmentList" part
   * of a matched clause: every assignment child contributes one
   * target-column -> value-expression pair.
   *
   * @throws MalformedCarbonCommandException when a right-hand side cannot be
   *         parsed as a Spark expression
   */
  public MergeAction visitCarbonAssignmentList(CarbonSqlBaseParser.AssignmentListContext ctx)
          throws MalformedCarbonCommandException {
    //  UPDATE SET assignmentList
    Map<Column, Column> map = new HashMap<>();
    for (int currIdx = 0; currIdx < ctx.getChildCount(); currIdx++) {
      if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.AssignmentContext) {
        // Assume every assignment just passes a value: child 0 is the assigned
        // column, child 1 the '=', child 2 the value expression.
        String left = ctx.getChild(currIdx).getChild(0).getText();
        // Strip a leading table/alias qualifier: "t.col" -> "col".
        if (left.split("\\.").length > 1) {
          left = left.split("\\.")[1];
        }
        String right = ctx.getChild(currIdx).getChild(2).getText();
        Column rightColumn = null;
        try {
          // The right side may be a literal or an arbitrary expression, not
          // necessarily a column, so delegate to Spark's expression parser.
          Expression expression = sparkParser.parseExpression(right);
          rightColumn = new Column(expression);
        } catch (Exception e) {
          throw new MalformedCarbonCommandException("Parse failed: " + e.getMessage());
        }
        map.put(new Column(left), rightColumn);
      }
    }
    return new UpdateAction(SparkUtil.convertMap(map), false);
  }

  /**
   * Converts a matchedAction subtree into a {@link MergeAction}:
   * a single child means DELETE; otherwise UPDATE SET with either an explicit
   * assignment list or a star (map filled in later by the command).
   */
  public MergeAction visitCarbonMatchedAction(CarbonSqlBaseParser.MatchedActionContext ctx)
          throws MalformedCarbonCommandException {
    int childCount = ctx.getChildCount();
    if (childCount == 1) {
      // WHEN MATCHED ... THEN DELETE
      return new DeleteAction();
    } else {
      if (ctx.getChild(ctx.getChildCount() - 1) instanceof
              CarbonSqlBaseParser.AssignmentListContext) {
        // UPDATE SET assignmentList
        return visitCarbonAssignmentList(
            (CarbonSqlBaseParser.AssignmentListContext) ctx.getChild(ctx.getChildCount() - 1));
      } else {
        // UPDATE SET * — the concrete column map is expanded later.
        return new UpdateAction(null, true);
      }
    }
  }

  /**
   * Converts a notMatchedAction subtree into an {@link InsertAction}.
   * Two or fewer children means "INSERT *"; explicit column/value lists are
   * handled by visitMergeIntoCarbonTable, so the map here is always null.
   */
  public InsertAction visitCarbonNotMatchedAction(CarbonSqlBaseParser.NotMatchedActionContext ctx) {
    if (ctx.getChildCount() <= 2) {
      // INSERT *
      return InsertAction.apply(null, true);
    } else {
      return InsertAction.apply(null, false);
    }
  }

  /**
   * Locates the notMatchedAction child of a WHEN NOT MATCHED clause (skipping
   * the keywords / optional predicate before it) and converts it.
   */
  public MergeAction visitCarbonNotMatchedClause(CarbonSqlBaseParser.NotMatchedClauseContext ctx) {
    int currIdx = 0;
    for (; currIdx < ctx.getChildCount(); currIdx++) {
      if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.NotMatchedActionContext) {
        break;
      }
    }
    return visitCarbonNotMatchedAction(
        (CarbonSqlBaseParser.NotMatchedActionContext) ctx.getChild(currIdx));
  }

  /**
   * Locates the matchedAction child of a WHEN MATCHED clause and converts it.
   * NOTE(review): if no action child exists, currIdx runs past the last child
   * and the cast below would fail — presumably the grammar guarantees an
   * action is present; confirm.
   */
  public MergeAction visitCarbonMatchedClause(CarbonSqlBaseParser.MatchedClauseContext ctx)
          throws MalformedCarbonCommandException {
    // The clause has several children (keywords, optional predicate, action);
    // scan forward until the action node is found.
    int currIdx = 0;
    for (; currIdx < ctx.getChildCount(); currIdx++) {
      if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.MatchedActionContext) {
        break;
      }
    }
    // Throw Exception in case of no Matched Action
    return visitCarbonMatchedAction(
            (CarbonSqlBaseParser.MatchedActionContext) ctx.getChild(currIdx));
  }

  /**
   * A matched clause with more than 4 children carries an extra
   * "AND booleanExpression" predicate. NOTE(review): the threshold mirrors the
   * child layout of the matchedClause grammar rule — re-check if it changes.
   */
  public boolean containsWhenMatchedPredicateExpression(int childCount) {
    return childCount > 4;
  }

  /**
   * Same idea for WHEN NOT MATCHED clauses, whose rule has one more fixed
   * child, hence the threshold of 5.
   */
  public boolean containsWhenNotMatchedPredicateExpression(int childCount) {
    return childCount > 5;
  }

  /**
   * Entry point: converts the whole mergeInto parse tree into a
   * {@link CarbonMergeIntoModel}. Produces two parallel lists — a (possibly
   * null) predicate and an action for each WHEN clause, in statement order.
   *
   * @throws MalformedCarbonCommandException when the base parser recorded an
   *         error, an embedded expression fails to parse, or an explicit
   *         INSERT column list does not match its value list in size
   */
  public CarbonMergeIntoModel visitMergeIntoCarbonTable(CarbonSqlBaseParser.MergeIntoContext ctx)
          throws MalformedCarbonCommandException {
    // Surface any syntax error recorded by the generated parser.
    if (ctx.exception != null) {
      throw new MalformedCarbonCommandException("Parse failed!");
    }
    TableModel targetTable = visitMultipartIdentifier(ctx.target);
    TableModel sourceTable = visitMultipartIdentifier(ctx.source);

    // With the two table models in hand, the command layer can later resolve
    // the actual CarbonTable instances.

    // Walk every child of the MERGE statement, collecting the join condition
    // plus one (predicate, action) pair per WHEN clause.
    int size = ctx.getChildCount();
    int currIdx = 0;
    Expression joinExpression = null;
    List<Expression> mergeExpressions = new ArrayList<>();
    List<MergeAction> mergeActions = new ArrayList<>();

    // mergeExpressions and mergeActions are parallel lists: index i of one
    // corresponds to index i of the other.
    while (currIdx < size) {
      if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.PredicatedContext) {
        // The ON join condition.
        ctx.getChild(currIdx).getChildCount();  // no-op; left over from debugging
        joinExpression = this.visitCarbonPredicated(
                (CarbonSqlBaseParser.PredicatedContext) ctx.getChild(currIdx));
      } else if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.MatchedClauseContext) {
        // WHEN MATCHED [AND predicate] THEN ...
        Expression whenMatchedExpression = null;
        // Parse the optional AND predicate, if present.
        try {
          if (this.containsWhenMatchedPredicateExpression(ctx.getChild(currIdx).getChildCount())) {
            whenMatchedExpression = sparkParser.parseExpression(
                ((CarbonSqlBaseParser.MatchedClauseContext) ctx.getChild(currIdx))
                    .booleanExpression().getText());
          }
        } catch (ParseException e) {
          throw new MalformedCarbonCommandException("Parse failed: " + e.getMessage());
        }
        mergeExpressions.add(whenMatchedExpression);
        // The action is always the last child of the clause.
        mergeActions.add(visitCarbonMatchedAction(
            (CarbonSqlBaseParser.MatchedActionContext) ctx.getChild(currIdx)
                .getChild(ctx.getChild(currIdx).getChildCount() - 1)));
      } else if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.NotMatchedClauseContext) {
        // WHEN NOT MATCHED [AND predicate] THEN INSERT ...
        Expression whenNotMatchedExpression = null;
        // Parse the optional AND predicate, if present.
        try {
          if (this
              .containsWhenNotMatchedPredicateExpression(ctx.getChild(currIdx).getChildCount())) {
            whenNotMatchedExpression = sparkParser.parseExpression(
                ((CarbonSqlBaseParser.NotMatchedClauseContext) ctx.getChild(currIdx))
                    .booleanExpression().getText());
          }
        } catch (ParseException e) {
          throw new MalformedCarbonCommandException("Parse failed: " + e.getMessage());
        }
        mergeExpressions.add(whenNotMatchedExpression);
        CarbonSqlBaseParser.NotMatchedActionContext notMatchedActionContext =
                (CarbonSqlBaseParser.NotMatchedActionContext) ctx.getChild(currIdx)
                        .getChild(ctx.getChild(currIdx).getChildCount() - 1);
        if (notMatchedActionContext.getChildCount() <= 2) {
          // INSERT * — column map expanded later by the command.
          mergeActions.add(InsertAction.apply(null, true));
        } else if (notMatchedActionContext.ASTERISK() == null) {
          // Explicit INSERT (cols...) VALUES (exprs...): sizes must line up.
          if (notMatchedActionContext.columns.multipartIdentifier().size() !=
                  notMatchedActionContext.expression().size()) {
            throw new MalformedCarbonCommandException("Parse failed: size of columns " +
                    "is not equal to size of expression in not matched action.");
          }
          Map<Column, Column> insertMap = new HashMap<>();
          for (int i = 0; i < notMatchedActionContext.columns.multipartIdentifier().size(); i++) {
            String left = visitMultipartIdentifier(
                            notMatchedActionContext.columns.multipartIdentifier().get(i), "")
                            .getColName();
            String right = notMatchedActionContext.expression().get(i).getText();
            // The right side may be a literal or expression rather than a
            // table column, so run it through Spark's expression parser.
            Column rightColumn = null;
            try {
              Expression expression = sparkParser.parseExpression(right);
              rightColumn = new Column(expression);
            } catch (Exception ex) {
              throw new MalformedCarbonCommandException("Parse failed: " + ex.getMessage());
            }
            insertMap.put(new Column(left), rightColumn);
          }
          mergeActions.add(InsertAction.apply(SparkUtil.convertMap(insertMap), false));
        } else {
          mergeActions.add(InsertAction.apply(null, false));
        }
      }
      currIdx++;
    }
    return new CarbonMergeIntoModel(targetTable, sourceTable, joinExpression,
            mergeExpressions, mergeActions);
  }

  /**
   * Extracts "t1.c1 op t2.c2" from a comparison node as a
   * {@link CarbonJoinExpression}. Assumes both operands are table.column
   * dereferences — TODO(review): confirm other operand shapes are rejected
   * upstream, otherwise the child indexing below fails.
   */
  public CarbonJoinExpression visitComparison(CarbonSqlBaseParser.ComparisonContext ctx) {
    // Pull table and column names from both sides of the operator.
    ctx.getText();  // no-op; left over from debugging, has no side effect
    String t1Name = ctx.left.getChild(0).getChild(0).getText();
    String c1Name = ctx.left.getChild(0).getChild(2).getText();
    String t2Name = ctx.right.getChild(0).getChild(0).getText();
    String c2Name = ctx.right.getChild(0).getChild(2).getText();
    return new CarbonJoinExpression(t1Name, c1Name, t2Name, c2Name);
  }

  /**
   * Parses the comparison's text into a Catalyst expression via the Spark
   * parser. The unused {@code x} parameter only disambiguates this overload.
   * NOTE(review): a ParseException is merely printed and null returned, so
   * callers must tolerate a null join expression.
   */
  public Expression visitComparison(CarbonSqlBaseParser.ComparisonContext ctx, String x) {
    Expression expression = null;
    try {
      expression = sparkParser.parseExpression(ctx.getText());
    } catch (ParseException e) {
      e.printStackTrace();
    }
    return expression;
  }

  /** Unwraps a predicated node (child 0 is the comparison) into a join model. */
  public CarbonJoinExpression visitPredicated(CarbonSqlBaseParser.PredicatedContext ctx) {
    return visitComparison((CarbonSqlBaseParser.ComparisonContext) ctx.getChild(0));
  }

  /** Unwraps a predicated node into a Catalyst expression (may be null, see above). */
  public Expression visitCarbonPredicated(CarbonSqlBaseParser.PredicatedContext ctx) {
    return visitComparison((CarbonSqlBaseParser.ComparisonContext) ctx.getChild(0), "");
  }

  /**
   * Splits a dereference node ("table.column", 3 children) into a
   * {@link ColumnModel}; any other child count yields an empty model.
   */
  public ColumnModel visitDereference(CarbonSqlBaseParser.DereferenceContext ctx) {
    int count = ctx.getChildCount();
    ColumnModel col = new ColumnModel();
    if (count == 3) {
      String tableName = ctx.getChild(0).getText();
      String colName = ctx.getChild(2).getText();
      col = new ColumnModel(tableName, colName);
    }
    return col;
  }

  /**
   * Interprets a multipart identifier as a table reference:
   * "db.table" or plain "table". More than two parts are ignored —
   * TODO(review): confirm catalog-qualified names cannot reach here.
   */
  public TableModel visitMultipartIdentifier(CarbonSqlBaseParser.MultipartIdentifierContext ctx) {
    TableModel table = new TableModel();
    List<CarbonSqlBaseParser.ErrorCapturingIdentifierContext> parts = ctx.parts;
    if (parts.size() == 2) {
      table.setDatabase(parts.get(0).getText());
      table.setTable(parts.get(1).getText());
    }
    if (parts.size() == 1) {
      table.setTable(parts.get(0).getText());
    }
    return table;
  }

  /**
   * Interprets a multipart identifier as a column reference: "table.col" or
   * plain "col". The unused {@code x} parameter only disambiguates this
   * overload from the table-returning one.
   */
  public ColumnModel visitMultipartIdentifier(CarbonSqlBaseParser.MultipartIdentifierContext ctx,
                                              String x) {
    ColumnModel column = new ColumnModel();
    List<CarbonSqlBaseParser.ErrorCapturingIdentifierContext> parts = ctx.parts;
    if (parts.size() == 2) {
      column.setTable(parts.get(0).getText());
      column.setColName(parts.get(1).getText());
    }
    if (parts.size() == 1) {
      column.setColName(parts.get(0).getText());
    }
    return column;
  }

  /** Returns the raw text of an unquoted identifier node. */
  public String visitUnquotedIdentifier(CarbonSqlBaseParser.UnquotedIdentifierContext ctx) {
    String res = ctx.getChild(0).getText();
    return res;
  }

  /** Returns the operator text (e.g. "=", ">=") of a comparison-operator node. */
  public String visitComparisonOperator(CarbonSqlBaseParser.ComparisonOperatorContext ctx) {
    String res = ctx.getChild(0).getText();
    return res;
  }

  /** Returns the first token of a table identifier node. */
  public String visitTableIdentifier(CarbonSqlBaseParser.TableIdentifierContext ctx) {
    return ctx.getChild(0).getText();
  }
}
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/CarbonMergeIntoSQLCommand.scala
 
b/integration/spark/src/main/java/org/apache/spark/sql/CarbonMergeIntoSQLCommand.scala
new file mode 100644
index 0000000..67611fd
--- /dev/null
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/CarbonMergeIntoSQLCommand.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.command.AtomicRunnableCommand
+import org.apache.spark.sql.execution.command.mutation.merge._
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.merge.model.{CarbonMergeIntoModel, TableModel}
+import org.apache.spark.util.SparkUtil._
+import org.apache.spark.util.TableAPIUtil
+
/**
 * Runnable command backing the MERGE INTO SQL syntax. It converts the parsed
 * [[CarbonMergeIntoModel]] into the pre-existing dataset-level merge API
 * ([[CarbonMergeDataSetCommand]]) and executes it.
 */
case class CarbonMergeIntoSQLCommand(mergeInto: CarbonMergeIntoModel)
  extends AtomicRunnableCommand {

  // MERGE INTO has no metadata phase; all work happens in processData.
  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    Seq.empty
  }

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    val sourceTable: TableModel = mergeInto.getSource
    val targetTable: TableModel = mergeInto.getTarget
    val mergeCondition: Expression = mergeInto.getMergeCondition
    // mergeExpression(i) is the optional predicate (may be null) paired with
    // mergeActions(i) — both lists come from the ANTLR visitor in clause order.
    val mergeExpression: Seq[Expression] = convertExpressionList(mergeInto.getMergeExpressions)
    val mergeActions: Seq[MergeAction] = convertMergeActionList(mergeInto.getMergeActions)

    // Validate that both the source and the target table exist.
    val sourceDatabaseName =
      CarbonEnv.getDatabaseName(Option(sourceTable.getDatabase))(sparkSession)
    val sourceTableName = sourceTable.getTable
    val targetDatabaseName =
      CarbonEnv.getDatabaseName(Option(targetTable.getDatabase))(sparkSession)
    val targetTableName = targetTable.getTable
    TableAPIUtil.validateTableExists(sparkSession,
      sourceDatabaseName,
      sourceTableName)
    TableAPIUtil.validateTableExists(sparkSession,
      targetDatabaseName,
      targetTableName)

    // Full datasets of both tables; the join condition is applied later by
    // CarbonMergeDataSetCommand, not here.
    val srcDf = sparkSession.sql(s"""SELECT * FROM ${sourceDatabaseName}.${sourceTableName}""")
    val tgDf = sparkSession.sql(s"""SELECT * FROM ${targetDatabaseName}.${targetTableName}""")

    var matches = scala.collection.mutable.ArrayBuffer[MergeMatch]()
    val mergeExpLength: Int = mergeExpression.length

    // Pair each match condition with its action to build the MergeMatch list.
    for (x <- 0 until mergeExpLength) {
      val currExpression: Expression = mergeExpression.apply(x)
      val currAction: MergeAction = mergeActions.apply(x)
      // Expand "*" actions into an explicit target-column -> source-column map.
      // DeleteAction removes the whole row, so it needs no map.
      // NOTE(review): the i-th target column is mapped to the i-th source
      // column, i.e. both tables are assumed to share column order and the
      // target has at least as many columns as the source — confirm.
      currAction match {
        case action: UpdateAction =>
          if (action.isStar) {
            val srcCols = srcDf.columns
            val tgCols = tgDf.columns
            action.updateMap = Map[Column, Column]()
            for (i <- srcCols.indices) {
              action.updateMap
                .+=(col(tgCols.apply(i)) ->
                    col(mergeInto.getSource.getTable + "." + srcCols.apply(i)))
            }
          }
        case action: InsertAction =>
          if (action.isStar) {
            val srcCols = srcDf.columns
            val tgCols = tgDf.columns
            action.insertMap = Map[Column, Column]()
            for (i <- srcCols.indices) {
              action.insertMap
                .+=(col(tgCols.apply(i)) ->
                    col(mergeInto.getSource.getTable + "." + srcCols.apply(i)))
            }
          }
        case _ =>
      }
      if (currExpression == null) {
        // Unconditional clause: the action type alone decides whether it is a
        // WHEN MATCHED (delete/update) or WHEN NOT MATCHED (insert) match.
        if (currAction.isInstanceOf[DeleteAction] || currAction.isInstanceOf[UpdateAction]) {
          matches ++= Seq(WhenMatched().addAction(currAction))
        } else {
          matches ++= Seq(WhenNotMatched().addAction(currAction))
        }
      } else {
        // Conditional clause: wrap the predicate as the WhenMatched /
        // WhenNotMatched filter expression.
        val carbonMergeExpression: Option[Column] = Option(Column(currExpression))
        if (currAction.isInstanceOf[DeleteAction] || currAction.isInstanceOf[UpdateAction]) {
          matches ++= Seq(WhenMatched(carbonMergeExpression).addAction(currAction))
        } else {
          matches ++= Seq(WhenNotMatched(carbonMergeExpression).addAction(currAction))
        }
      }
    }
    val joinExpression = Column(mergeCondition)
    val mergeDataSetMatches: MergeDataSetMatches = MergeDataSetMatches(joinExpression,
      matches.toList)

    // Delegate actual execution to the dataset-level merge implementation.
    CarbonMergeDataSetCommand(tgDf, srcDf, mergeDataSetMatches).run(sparkSession)
  }

  override protected def opName: String = "MERGE SQL COMMAND"
}
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/merge/model/CarbonJoinExpression.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/CarbonJoinExpression.java
new file mode 100644
index 0000000..e3e913a
--- /dev/null
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/CarbonJoinExpression.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.merge.model;
+
/**
 * Plain value holder describing one equi-join pair extracted from the ON
 * clause of a MERGE INTO statement: a source table/column matched against a
 * target table/column.
 */
public class CarbonJoinExpression {

  String srcTable;
  String srcCol;
  String tgTable;
  String tgCol;

  /**
   * @param srcTable name of the source table
   * @param srcCol   join column in the source table
   * @param tgTable  name of the target table
   * @param tgCol    join column in the target table
   */
  public CarbonJoinExpression(String srcTable, String srcCol, String tgTable, String tgCol) {
    this.srcTable = srcTable;
    this.srcCol = srcCol;
    this.tgTable = tgTable;
    this.tgCol = tgCol;
  }

  public String getSrcTable() {
    return srcTable;
  }

  public String getSrcCol() {
    return srcCol;
  }

  public String getTgTable() {
    return tgTable;
  }

  public String getTgCol() {
    return tgCol;
  }

  public void setSrcTable(String srcTable) {
    this.srcTable = srcTable;
  }

  public void setSrcCol(String srcCol) {
    this.srcCol = srcCol;
  }

  public void setTgTable(String tgTable) {
    this.tgTable = tgTable;
  }

  public void setTgCol(String tgCol) {
    this.tgCol = tgCol;
  }
}
+
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/merge/model/CarbonMergeIntoModel.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/CarbonMergeIntoModel.java
new file mode 100644
index 0000000..bf88079
--- /dev/null
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/CarbonMergeIntoModel.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.merge.model;
+
+import java.util.List;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.execution.command.mutation.merge.MergeAction;
+
+public class CarbonMergeIntoModel {
+  TableModel target;
+  TableModel source;
+  Expression mergeCondition;
+  List<Expression> mergeExpressions;
+  List<MergeAction> mergeActions;
+
+  public CarbonMergeIntoModel(TableModel target, TableModel source, Expression 
mergeCondition,
+                              List<Expression> mergeExpressions, 
List<MergeAction> mergeActions) {
+    this.target = target;
+    this.source = source;
+    this.mergeCondition = mergeCondition;
+    this.mergeExpressions = mergeExpressions;
+    this.mergeActions = mergeActions;
+  }
+
+  public TableModel getTarget() {
+    return target;
+  }
+
+  public void setTarget(TableModel target) {
+    this.target = target;
+  }
+
+  public TableModel getSource() {
+    return source;
+  }
+
+  public void setSource(TableModel source) {
+    this.source = source;
+  }
+
+  public Expression getMergeCondition() {
+    return mergeCondition;
+  }
+
+  public void setMergeCondition(Expression mergeCondition) {
+    this.mergeCondition = mergeCondition;
+  }
+
+  public List<Expression> getMergeExpressions() {
+    return mergeExpressions;
+  }
+
+  public void setMergeExpressions(List<Expression> mergeExpressions) {
+    this.mergeExpressions = mergeExpressions;
+  }
+
+  public List<MergeAction> getMergeActions() {
+    return mergeActions;
+  }
+
+  public void setMergeActions(List<MergeAction> mergeActions) {
+    this.mergeActions = mergeActions;
+  }
+}
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/merge/model/ColumnModel.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/ColumnModel.java
new file mode 100644
index 0000000..256bd5d
--- /dev/null
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/ColumnModel.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.merge.model;
+
/** Names a single column, optionally qualified with the table it belongs to. */
public class ColumnModel {

  private String table;
  private String colName;

  /** Creates an empty model; fields are populated via the setters. */
  public ColumnModel() {
  }

  public ColumnModel(String table, String colName) {
    this.table = table;
    this.colName = colName;
  }

  public String getTable() {
    return table;
  }

  public String getColName() {
    return colName;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setColName(String colName) {
    this.colName = colName;
  }
}
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/merge/model/TableModel.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/TableModel.java
new file mode 100644
index 0000000..1fd8905
--- /dev/null
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/TableModel.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.merge.model;
+
/**
 * Names a table reference from a MERGE INTO statement: optional database,
 * table name, and optional alias.
 */
public class TableModel {

  String database;
  String table;
  String alias;

  /** Creates an empty model; fields are populated via the setters. */
  public TableModel() {
  }

  public TableModel(String database, String table, String alias) {
    this.database = database;
    this.table = table;
    this.alias = alias;
  }

  public String getDatabase() {
    return database;
  }

  public String getTable() {
    return table;
  }

  public String getAlias() {
    return alias;
  }

  public void setDatabase(String database) {
    this.database = database;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setAlias(String alias) {
    this.alias = alias;
  }
}
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
index 04ed2c7..b995a75 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.execution.command.mutation.merge
 
 import java.sql.{Date, Timestamp}
@@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
  */
 case class MergeProjection(
     @transient tableCols: Seq[String],
-    @transient statusCol : String,
+    @transient statusCol: String,
     @transient ds: Dataset[Row],
     @transient rltn: CarbonDatasourceHadoopRelation,
     @transient sparkSession: SparkSession,
@@ -58,8 +59,8 @@ case class MergeProjection(
   private def generateProjection: (Projection, Array[Expression]) = {
     val existingDsOutput = rltn.carbonRelation.schema.toAttributes
     val colsMap = mergeAction match {
-      case UpdateAction(updateMap) => updateMap
-      case InsertAction(insertMap) => insertMap
+      case UpdateAction(updateMap, isStar: Boolean) => updateMap
+      case InsertAction(insertMap, isStar: Boolean) => insertMap
       case _ => null
     }
     if (colsMap != null) {
@@ -81,9 +82,9 @@ case class MergeProjection(
       if (output.contains(null)) {
         throw new CarbonMergeDataSetException(s"Not all columns are mapped")
       }
-      (new InterpretedMutableProjection(output++Seq(
+      (new InterpretedMutableProjection(output ++ Seq(
         ds.queryExecution.analyzed.resolveQuoted(statusCol,
-        sparkSession.sessionState.analyzer.resolver).get),
+          sparkSession.sessionState.analyzer.resolver).get),
         ds.queryExecution.analyzed.output), expectOutput)
     } else {
       (null, null)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
index cba30ef..55d3b4a 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.execution.command.mutation.merge
 
 import scala.collection.mutable.ArrayBuffer
@@ -71,9 +72,11 @@ case class WhenNotMatchedAndExistsOnlyOnTarget(expression: 
Option[Column] = None
   override def getExp: Option[Column] = expression
 }
 
-case class UpdateAction(updateMap: Map[Column, Column]) extends MergeAction
+case class UpdateAction(var updateMap: Map[Column, Column], isStar: Boolean = 
false)
+  extends MergeAction
 
-case class InsertAction(insertMap: Map[Column, Column]) extends MergeAction
+case class InsertAction(var insertMap: Map[Column, Column], isStar: Boolean = 
false)
+  extends MergeAction
 
 /**
  * It inserts the history data into history table
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonAntlrParser.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonAntlrParser.scala
new file mode 100644
index 0000000..4c3dd5c
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonAntlrParser.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
+import org.apache.spark.sql.{CarbonAntlrSqlVisitor, CarbonMergeIntoSQLCommand}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.execution.command.AtomicRunnableCommand
+
+class CarbonAntlrParser(sparkParser: ParserInterface) {
+
+  def parse(sqlText: String): AtomicRunnableCommand = {
+
+    if (!sqlText.trim.toLowerCase.startsWith("merge")) {
+      throw new UnsupportedOperationException(
+        "Antlr SQL Parser will only deal with Merge Into SQL Command")
+    }
+    val visitor = new CarbonAntlrSqlVisitor(sparkParser)
+    val lexer = new CarbonSqlBaseLexer(CharStreams.fromString(sqlText))
+    val tokenStream = new CommonTokenStream(lexer)
+    val parser = new CarbonSqlBaseParser(tokenStream)
+    val mergeInto = visitor.visitMergeIntoCarbonTable(parser.mergeInto)
+    // In this place check the mergeInto Map for update *
+    CarbonMergeIntoSQLCommand(mergeInto)
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
index a072b26..a57ca36 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
@@ -37,6 +37,7 @@ class CarbonExtensionSqlParser(
 ) extends SparkSqlParser(conf) {
 
   val parser = new CarbonExtensionSpark2SqlParser
+  val antlrParser = new CarbonAntlrParser(this)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
     parser.synchronized {
@@ -44,28 +45,36 @@ class CarbonExtensionSqlParser(
     }
     CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession)
     try {
-      val plan = parser.parse(sqlText)
-      plan
+      parser.parse(sqlText)
     } catch {
       case ce: MalformedCarbonCommandException =>
         throw ce
-      case ex: Throwable =>
+      case ct: Throwable =>
         try {
-          val parsedPlan = initialParser.parsePlan(sqlText)
-          CarbonScalaUtil.cleanParserThreadLocals
-          parsedPlan
+          antlrParser.parse(sqlText)
         } catch {
-          case mce: MalformedCarbonCommandException =>
-            throw mce
-          case e: Throwable =>
-            e.printStackTrace(System.err)
-            CarbonScalaUtil.cleanParserThreadLocals
-            CarbonException.analysisException(
-              s"""== Parser1: ${parser.getClass.getName} ==
-                 |${ex.getMessage}
-                 |== Parser2: ${initialParser.getClass.getName} ==
-                 |${e.getMessage}
+          case ce: MalformedCarbonCommandException =>
+            throw ce
+          case at: Throwable =>
+            try {
+              val parsedPlan = initialParser.parsePlan(sqlText)
+              CarbonScalaUtil.cleanParserThreadLocals
+              parsedPlan
+            } catch {
+              case mce: MalformedCarbonCommandException =>
+                throw mce
+              case st: Throwable =>
+                st.printStackTrace(System.err)
+                CarbonScalaUtil.cleanParserThreadLocals
+                CarbonException.analysisException(
+                  s"""== Spark Parser: ${initialParser.getClass.getName} ==
+                     |${st.getMessage}
+                     |== Carbon Parser: ${ parser.getClass.getName } ==
+                     |${ct.getMessage}
+                     |== Antlr Parser: ${antlrParser.getClass.getName} ==
+                     |${at.getMessage}
                """.stripMargin.trim)
+            }
         }
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/SparkUtil.scala
index e92cf5e..7c7aac5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/SparkUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -17,16 +17,32 @@
 
 package org.apache.spark.util
 
+import scala.collection.JavaConverters.{mapAsScalaMapConverter, _}
+
 import org.apache.spark.{SPARK_VERSION, TaskContext}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
-import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType}
+import org.apache.spark.sql.execution.command.mutation.merge.MergeAction
+import org.apache.spark.sql.types._
 
 /*
  * this object use to handle file splits
  */
 object SparkUtil {
 
+  def convertMap(map: java.util.Map[Column, Column]): Map[Column, Column] = {
+    map.asScala.toMap
+  }
+
+  def convertExpressionList(list: java.util.List[Expression]): List[Expression] = {
+    list.asScala.toList
+  }
+
+  def convertMergeActionList(list: java.util.List[MergeAction]): List[MergeAction] = {
+    list.asScala.toList
+  }
+
   def setTaskContext(context: TaskContext): Unit = {
     val localThreadContext = TaskContext.get()
     if (localThreadContext == null) {
@@ -68,7 +84,7 @@ object SparkUtil {
     }
   }
 
-  def isPrimitiveType(datatype : DataType): Boolean = {
+  def isPrimitiveType(datatype: DataType): Boolean = {
     datatype match {
       case StringType => true
       case ByteType => true
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/MergeIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/MergeIntoCarbonTableTestCase.scala
new file mode 100644
index 0000000..69a1909
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/MergeIntoCarbonTableTestCase.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.iud
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
+class MergeIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterEach {
+  var df: DataFrame = _
+
+  override def beforeEach {
+    dropTable()
+    buildTestData()
+  }
+
+  private def buildTestData(): Unit = {
+    createTable()
+  }
+
+  private def createTable(): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS A(
+         |   id Int,
+         |   price Int,
+         |   state String
+         | )
+         | STORED AS carbondata
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS B(
+         |   id Int,
+         |   price Int,
+         |   state String
+         | )
+         | STORED AS carbondata
+       """.stripMargin)
+
+    sql(s"""INSERT INTO A VALUES (1,100,"MA")""")
+    sql(s"""INSERT INTO A VALUES (2,200,"NY")""")
+    sql(s"""INSERT INTO A VALUES (3,300,"NH")""")
+    sql(s"""INSERT INTO A VALUES (4,400,"FL")""")
+
+    sql(s"""INSERT INTO B VALUES (1,1,"MA (updated)")""")
+    sql(s"""INSERT INTO B VALUES (2,3,"NY (updated)")""")
+    sql(s"""INSERT INTO B VALUES (3,3,"CA (updated)")""")
+    sql(s"""INSERT INTO B VALUES (5,5,"TX (updated)")""")
+    sql(s"""INSERT INTO B VALUES (7,7,"LO (updated)")""")
+  }
+
+  private def dropTable() = {
+    sql("DROP TABLE IF EXISTS A")
+    sql("DROP TABLE IF EXISTS B")
+  }
+
+  test("test merge into delete") {
+    sql(
+      """MERGE INTO A
+        |USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED THEN DELETE""".stripMargin)
+
+    checkAnswer(sql("select * from A"), Seq(Row(4, 400, "FL")))
+  }
+
+  test("test merge into delete and update") {
+    sql(
+      """MERGE INTO A
+        |USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED AND A.ID=2 THEN DELETE
+        |WHEN MATCHED AND A.ID=1 THEN UPDATE SET *""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 1, "MA (updated)"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL")))
+  }
+
+  test("test merge into update with expression") {
+    sql(
+      """MERGE INTO A
+        |USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED AND A.ID=2 THEN DELETE
+        |WHEN MATCHED AND A.ID=1 THEN UPDATE SET  A.id=9, A.price=100, A.state='hahaha'
+        |""".stripMargin)
+
+    sql("select * from A").show(100, false)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(9, 100, "hahaha"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL")))
+  }
+
+  test("test merge into delete and insert") {
+    sql(
+      """MERGE INTO A
+        |USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED AND A.ID=2 THEN DELETE
+        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL"),
+        Row(5, 5, "TX (updated)"),
+        Row(7, 7, "LO (updated)")))
+  }
+
+  test("test merge into delete and update and insert") {
+    sql(
+      """MERGE INTO A
+        |USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED AND A.ID=2 THEN DELETE
+        |WHEN MATCHED AND A.ID=1 THEN UPDATE  SET *
+        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 1, "MA (updated)"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL"),
+        Row(5, 5, "TX (updated)"),
+        Row(7, 7, "LO (updated)")))
+  }
+
+  test("test merge into update and insert") {
+    sql(
+      """MERGE INTO A
+        |USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED AND A.ID=1 THEN UPDATE  SET *
+        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 1, "MA (updated)"),
+        Row(2, 200, "NY"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL"),
+        Row(5, 5, "TX (updated)"),
+        Row(7, 7, "LO (updated)")))
+  }
+
+  test("test merge into delete with condition") {
+    sql(
+      """MERGE INTO A
+        |USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED AND B.ID=2 THEN DELETE""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"), Row(3, 300, "NH"), Row(4, 400, "FL")))
+  }
+
+  test("test merge into update all cols") {
+    sql(
+      """MERGE INTO A USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED THEN UPDATE SET *""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 1, "MA (updated)"),
+        Row(2, 3, "NY (updated)"),
+        Row(3, 3, "CA (updated)"),
+        Row(4, 400, "FL")))
+  }
+
+  test("test merge into update all cols with condition") {
+    sql(
+      """MERGE INTO A USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED AND A.ID=2 THEN UPDATE SET *""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"), Row(2, 3, "NY (updated)"), Row(3, 300, "NH"), Row(4, 400, "FL")))
+  }
+
+  test("test merge into update all cols with multiple condition") {
+    sql(
+      """MERGE INTO A USING B
+        |ON A.ID=B.ID
+        |WHEN MATCHED AND A.ID=2 THEN UPDATE SET *
+        |WHEN MATCHED AND A.ID=3 THEN UPDATE SET *""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"),
+        Row(2, 3, "NY (updated)"),
+        Row(3, 3, "CA (updated)"),
+        Row(4, 400, "FL")))
+  }
+
+  test("test merge into insert all cols") {
+    sql(
+      """MERGE INTO A USING B
+        |ON A.ID=B.ID
+        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"),
+        Row(2, 200, "NY"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL"),
+        Row(5, 5, "TX (updated)"),
+        Row(7, 7, "LO (updated)")))
+  }
+
+  test("test merge into insert all cols with condition") {
+    sql(
+      """MERGE INTO A USING B
+        |ON A.ID=B.ID
+        |WHEN NOT MATCHED AND B.ID=7 THEN INSERT *""".stripMargin)
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"),
+        Row(2, 200, "NY"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL"),
+        Row(7, 7, "LO (updated)")))
+  }
+
+  test("test merge into insert all cols with multiple condition") {
+    sql(
+      """MERGE INTO A USING B
+        |ON A.ID=B.ID
+        |WHEN NOT MATCHED AND B.ID=5 THEN INSERT *
+        |WHEN NOT MATCHED AND B.ID=7 THEN INSERT *""".stripMargin)
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"),
+        Row(2, 200, "NY"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL"),
+        Row(5, 5, "TX (updated)"),
+        Row(7, 7, "LO (updated)")))
+  }
+
+  test("test merge into insert with literal") {
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED " +
+      "AND B.ID=7 THEN INSERT (A" +
+      ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE, 'test-string')"
+    sql(sqlText)
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"),
+        Row(2, 200, "NY"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL"),
+        Row(7, 7, "test-string")))
+  }
+
+  test("test merge into insert with expression") {
+    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED " +
+      "AND B.ID=7 THEN INSERT (A" +
+      ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE + 10, B.state)"
+    sql(sqlText)
+    checkAnswer(sql("select * from A"),
+      Seq(Row(1, 100, "MA"),
+        Row(2, 200, "NY"),
+        Row(3, 300, "NH"),
+        Row(4, 400, "FL"),
+        Row(7, 17, "LO (updated)")))
+  }
+
+  test("test merge into not exist table exception") {
+    val exceptionCaught = intercept[MalformedCarbonCommandException] {
+      val sqlText = "MERGE INTO A USING C ON A.ID=C.ID WHEN NOT MATCHED " +
+        "AND C.ID=7 THEN INSERT (A" +
+        ".ID,A.PRICE, A.state) VALUES (C.ID,C.PRICE + 10, C.state)"
+      sql(sqlText)
+    }
+    assert(exceptionCaught.getMessage.contains("table default.C not found"))
+  }
+
+  test("test merge into parse exception for size of action columns and expression") {
+    val exceptionCaught = intercept[MalformedCarbonCommandException] {
+      val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED " +
+        "AND B.ID=7 THEN INSERT (A" +
+        ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE + 10)"
+      sql(sqlText)
+    }
+    assert(exceptionCaught.getMessage.contains("Parse failed: " +
+      "size of columns is not equal to size of expression in not matched action"))
+  }
+
+  test("test merge into parse exception for tree node parsing") {
+    val exceptionCaught = intercept[MalformedCarbonCommandException] {
+      val sqlText = "MERGE INTO A USING  ON A.ID=B.ID WHEN NOT MATCHED " +
+        "AND B.ID=7 THEN INSERT (A" +
+        ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE + 10, B.state)"
+      sql(sqlText)
+    }
+    assert(exceptionCaught.getMessage.contains("Parse failed"))
+  }
+}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index d12b2d4..793f8ff 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
- * Test Class for join query with orderby and limit
+ * Test Class for carbon merge api
  */
 
 class MergeTestCase extends QueryTest with BeforeAndAfterAll {
diff --git a/pom.xml b/pom.xml
index cd21b4d..c22836e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,6 +131,7 @@
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <spark.version>2.3.4</spark.version>
     <spark.binary.version>2.3</spark.binary.version>
+    <antlr4.version>4.8</antlr4.version>
     <spark.deps.scope>compile</spark.deps.scope>
     <scala.deps.scope>compile</scala.deps.scope>
     <dev.path>${basedir}/dev</dev.path>
@@ -522,7 +523,11 @@
           </execution>
         </executions>
       </plugin>
-
+      <plugin>
+        <groupId>org.antlr</groupId>
+        <artifactId>antlr4-maven-plugin</artifactId>
+        <version>${antlr4.version}</version>
+      </plugin>
     </plugins>
   </build>
 

Reply via email to