This is an automated email from the ASF dual-hosted git repository. qiangcai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new e019806 [CARBONDATA-4065] Support MERGE INTO SQL Command e019806 is described below commit e019806af5c27bd7d75ce144105570a21369013b Author: BrooksLi <jianxili2...@outlook.com> AuthorDate: Thu Nov 5 14:42:35 2020 +0800 [CARBONDATA-4065] Support MERGE INTO SQL Command Why is this PR needed? In order to support MERGE INTO SQL Command in Carbondata The previous Scala Parser having trouble to parse the complicated Merge Into SQL Command What changes were proposed in this PR? Add an ANTLR parser, and support parse MERGE INTO SQL Command to DataSet Command Does this PR introduce any user interface change? Yes. The PR introduces the MERGE INTO SQL Command. Is any new testcase added? Yes This closes #4032 Co-authored-by: Zhangshunyu <zhangshunyu1...@126.com> --- dev/findbugs-exclude.xml | 22 +- docs/scd-and-cdc-guide.md | 38 +- .../carbondata/examples/DataMergeIntoExample.scala | 164 ++++++ integration/spark/pom.xml | 21 + .../org/apache/spark/sql/parser/CarbonSqlBase.g4 | 646 +++++++++++++++++++++ .../apache/spark/sql/CarbonAntlrSqlVisitor.java | 323 +++++++++++ .../spark/sql/CarbonMergeIntoSQLCommand.scala | 119 ++++ .../sql/merge/model/CarbonJoinExpression.java | 66 +++ .../sql/merge/model/CarbonMergeIntoModel.java | 80 +++ .../apache/spark/sql/merge/model/ColumnModel.java | 49 ++ .../apache/spark/sql/merge/model/TableModel.java | 59 ++ .../command/mutation/merge/MergeProjection.scala | 11 +- .../command/mutation/merge/interfaces.scala | 7 +- .../spark/sql/parser/CarbonAntlrParser.scala | 41 ++ .../sql/parser/CarbonExtensionSqlParser.scala | 41 +- .../scala/org/apache/spark/util/SparkUtil.scala | 22 +- .../iud/MergeIntoCarbonTableTestCase.scala | 314 ++++++++++ .../spark/testsuite/merge/MergeTestCase.scala | 2 +- pom.xml | 7 +- 19 files changed, 1997 insertions(+), 35 deletions(-) diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml index 060ea09..33dd252 100644 --- 
a/dev/findbugs-exclude.xml +++ b/dev/findbugs-exclude.xml @@ -21,11 +21,11 @@ </Match> <Match> - <Source name="~.*\.scala" /> + <Source name="~.*\.scala"/> </Match> <Match> - <Source name="~.*Test\.java" /> + <Source name="~.*Test\.java"/> </Match> <!-- This method creates stream but the caller methods are responsible for closing the stream --> @@ -95,7 +95,7 @@ <Match> <Class name="org.apache.carbondata.core.scan.aggregator.impl.BitSet"/> <Or> - <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> + <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> </Or> </Match> <Match> @@ -126,6 +126,18 @@ <Class name="org.apache.carbondata.events.OperationContext"/> <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> </Match> - <Match> <Class name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/> <Field name="indexExprWrapper"/> <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> </Match> - <Match> <Class name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/> <Field name="validSegments"/> <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> </Match> + <Match> + <Class name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/> + <Field name="indexExprWrapper"/> + <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> + </Match> + <Match> + <Class name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/> + <Field name="validSegments"/> + <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> + </Match> + + <Match> + <Package name="org.apache.spark.sql.parser"/> + </Match> </FindBugsFilter> \ No newline at end of file diff --git a/docs/scd-and-cdc-guide.md b/docs/scd-and-cdc-guide.md index 892958b..38a2973 100644 --- a/docs/scd-and-cdc-guide.md +++ b/docs/scd-and-cdc-guide.md @@ -55,8 +55,42 @@ Below is the detailed description of the `merge` API operation. * `whenNotMatched` clause can have only the `insertExpr` action. The new row is generated based on the specified column and corresponding expressions. 
Users do not need to specify all the columns in the target table. For unspecified target columns, NULL is inserted. * `whenNotMatchedAndExistsOnlyOnTarget` clause is executed when row does not match source and exists only in target. This clause can have only delete action. -**NOTE:** SQL syntax for merge is not yet supported. +#### MERGE SQL + +The SQL below merges a set of updates, insertions, and deletions based on a source table +into a target carbondata table. + +``` + MERGE INTO target_table_identifier + USING source_table_identifier + ON <merge_condition> + [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ] + [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ] + [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ] +``` + +#### MERGE SQL Operation Semantics +Below is the detailed description of the `merge` SQL operation. +* `table_identifier` a table name, optionally qualified with a database name +* `merge_condition` how the rows from one relation are combined with the rows of another relation. An expression with a return type of Boolean. +* `WHEN MATCHED` clauses are executed when a source row matches a target table row based on the match condition. +The clauses can have at most one UPDATE and one DELETE action. These clauses have the following semantics. + * The UPDATE action in merge only updates the specified columns of the matched target row. + * The DELETE action will delete the matched row. + * WHEN MATCHED clauses can have at most one UPDATE and one DELETE action. The UPDATE action in merge only updates the specified columns of the matched target row. The DELETE action will delete the matched row. + * Each WHEN MATCHED clause can have an optional condition. If this clause condition exists, the UPDATE or DELETE action is executed for any matching source-target row pair only when the clause condition is true. 
+ * If there are multiple WHEN MATCHED clauses, then they are evaluated in the order they are specified (that is, the order of the clauses matters). All WHEN MATCHED clauses, except the last one, must have conditions. + * If both WHEN MATCHED clauses have conditions and neither of the conditions is true for a matching source-target row pair, then the matched target row is left unchanged. + * To update all the columns of the target carbondata table with the corresponding columns of the source dataset, use UPDATE SET *. This is equivalent to UPDATE SET col1 = source.col1 [, col2 = source.col2 ...] for all the columns of the target carbondata table. Therefore, this action assumes that the source table has the same columns as those in the target table, otherwise the query will throw an analysis error. +* `matched_action` can be DELETE | UPDATE SET * | UPDATE SET column1 = value1 [, column2 = value2 ...] +* `WHEN NOT MATCHED` clause is executed when a source row does not match any target row based on the match condition. These clauses have the following semantics. + * WHEN NOT MATCHED clauses can only have the INSERT action. The new row is generated based on the specified column and corresponding expressions. All the columns in the target table do not need to be specified. For unspecified target columns, NULL is inserted. + * Each WHEN NOT MATCHED clause can have an optional condition. If the clause condition is present, a source row is inserted only if that condition is true for that row. Otherwise, the source row is ignored. + * If there are multiple WHEN NOT MATCHED clauses, then they are evaluated in the order they are specified (that is, the order of the clauses matters). All WHEN NOT MATCHED clauses, except the last one, must have conditions. + * To insert all the columns of the target carbondata table with the corresponding columns of the source dataset, use INSERT *. 
This is equivalent to INSERT (col1 [, col2 ...]) VALUES (source.col1 [, source.col2 ...]) for all the columns of the target carbondata table. Therefore, this action assumes that the source table has the same columns as those in the target table, otherwise the query will throw an error. +* `not_matched_action` can be INSERT * | INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...]) ##### Example code to implement cdc/scd scenario -Please refer example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement scd and cdc scenarios. +Please refer to the example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement scd and cdc scenarios using the API. +Please refer to the example class [DataMergeIntoExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala) to understand and implement scd and cdc scenarios using SQL. diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala new file mode 100644 index 0000000..e3dc2da --- /dev/null +++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.examples.util.ExampleUtils + +object DataMergeIntoExample { + + def main(args: Array[String]) { + val spark = ExampleUtils.createSparkSession("DataManagementExample") + deleteExampleBody(spark) + deleteWithExpressionExample(spark) + updateExampleBody(spark) + updateWithExpressionExample(spark) + updateSpecificColWithExpressionExample(spark) + insertExampleBody(spark) + insertWithExpressionExample(spark) + insertSpecificColWithExpressionExample(spark) + spark.close() + } + + def initTable(spark: SparkSession): Unit = { + spark.sql("DROP TABLE IF EXISTS A") + spark.sql("DROP TABLE IF EXISTS B") + + spark.sql( + s""" + | CREATE TABLE IF NOT EXISTS A( + | id Int, + | price Int, + | state String + | ) + | STORED AS carbondata + """.stripMargin) + + spark.sql( + s""" + | CREATE TABLE IF NOT EXISTS B( + | id Int, + | price Int, + | state String + | ) + | STORED AS carbondata + """.stripMargin) + + spark.sql(s"""INSERT INTO A VALUES (1,100,"MA")""") + spark.sql(s"""INSERT INTO A VALUES (2,200,"NY")""") + spark.sql(s"""INSERT INTO A VALUES (3,300,"NH")""") + spark.sql(s"""INSERT INTO A VALUES (4,400,"FL")""") + + spark.sql(s"""INSERT INTO B VALUES (1,1,"MA (updated)")""") + spark.sql(s"""INSERT INTO B VALUES (2,3,"NY (updated)")""") + spark.sql(s"""INSERT INTO B VALUES (3,3,"CA (updated)")""") + spark.sql(s"""INSERT INTO B VALUES (5,5,"TX (updated)")""") + spark.sql(s"""INSERT INTO B VALUES (7,7,"LO (updated)")""") + } + + def 
dropTables(spark: SparkSession): Unit = { + spark.sql("DROP TABLE IF EXISTS A") + spark.sql("DROP TABLE IF EXISTS B") + } + + def deleteExampleBody(spark: SparkSession): Unit = { + dropTables(spark) + initTable(spark) + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED THEN DELETE" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } + + def deleteWithExpressionExample(spark: SparkSession): Unit = { + dropTables(spark) + initTable(spark) + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND B.ID=2 THEN DELETE" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } + + def updateExampleBody(spark: SparkSession): Unit = { + dropTables(spark) + initTable(spark) + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED THEN UPDATE SET *" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } + + def updateWithExpressionExample(spark: SparkSession): Unit = { + dropTables(spark) + initTable(spark) + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 THEN UPDATE SET *" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } + + def updateSpecificColWithExpressionExample(spark: SparkSession): Unit = { + dropTables(spark) + initTable(spark) + // In this example, it will only update the state + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 THEN UPDATE SET " + + "STATE=B.STATE" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } + + def updateSpecificMultiColWithExpressionExample(spark: SparkSession): Unit = { + dropTables(spark) + initTable(spark) + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 THEN UPDATE SET A" + + ".STATE=B.STATE, A.PRICE=B.PRICE" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } + + def insertExampleBody(spark: SparkSession): Unit = { + 
dropTables(spark) + initTable(spark) + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED THEN INSERT *" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } + + def insertWithExpressionExample(spark: SparkSession): Unit = { + dropTables(spark) + initTable(spark) + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED AND B.ID=7 THEN INSERT *" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } + + def insertSpecificColWithExpressionExample(spark: SparkSession): Unit = { + dropTables(spark) + initTable(spark) + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED AND B.ID=7 THEN INSERT (A" + + ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE, 'test-string')" + spark.sql(sqlText) + spark.sql(s"""SELECT * FROM A""").show() + dropTables(spark) + } +} diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml index 2a591e7..6dd3ed4 100644 --- a/integration/spark/pom.xml +++ b/integration/spark/pom.xml @@ -84,6 +84,11 @@ <dependencies> <!-- carbon --> <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + <version>${antlr4.version}</version> + </dependency> + <dependency> <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-hive</artifactId> <version>${project.version}</version> @@ -528,6 +533,22 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr4-maven-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>antlr4</goal> + </goals> + </execution> + </executions> + <configuration> + <visitor>true</visitor> + <sourceDirectory>../spark/src/main/antlr4</sourceDirectory> + <treatWarningsAsErrors>true</treatWarningsAsErrors> + </configuration> + </plugin> </plugins> </build> diff --git a/integration/spark/src/main/antlr4/org/apache/spark/sql/parser/CarbonSqlBase.g4 b/integration/spark/src/main/antlr4/org/apache/spark/sql/parser/CarbonSqlBase.g4 new file mode 
100644 index 0000000..4efb340 --- /dev/null +++ b/integration/spark/src/main/antlr4/org/apache/spark/sql/parser/CarbonSqlBase.g4 @@ -0,0 +1,646 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Code ported from Apache Spark + * spark/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 + */ +grammar CarbonSqlBase; + +@parser::members { + /** + * When false, INTERSECT is given the greater precedence over the other set + * operations (UNION, EXCEPT and MINUS) as per the SQL standard. + */ + public boolean legacy_setops_precedence_enbled = false; + + /** + * When false, a literal with an exponent would be converted into + * double type rather than decimal type. + */ + public boolean legacy_exponent_literal_as_decimal_enabled = false; + + /** + * When true, the behavior of keywords follows ANSI SQL standard. + */ + public boolean SQL_standard_keyword_behavior = false; +} + +@lexer::members { + /** + * Verify whether current token is a valid decimal token (which contains dot). + * Returns true if the character that follows the token is not a digit or letter or underscore. + * + * For example: + * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'. + * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'. + * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'. 
+ * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is followed + * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+' + * which is not a digit or letter or underscore. + */ + public boolean isValidDecimal() { + int nextChar = _input.LA(1); + if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' || + nextChar == '_') { + return false; + } else { + return true; + } + } + + /** + * This method will be called when we see '/*' and try to match it as a bracketed comment. + * If the next character is '+', it should be parsed as hint later, and we cannot match + * it as a bracketed comment. + * + * Returns true if the next character is '+'. + */ + public boolean isHint() { + int nextChar = _input.LA(1); + if (nextChar == '+') { + return true; + } else { + return false; + } + } +} + +singleStatement + : statement ';'* EOF + ; + +singleExpression + : namedExpression EOF + ; + +singleTableIdentifier + : tableIdentifier EOF + ; + +singleMultipartIdentifier + : multipartIdentifier EOF + ; + +singleFunctionIdentifier + : functionIdentifier EOF + ; + +statement + : query #statementDefault + | ctes? dmlStatementNoWith #dmlStatement + ; + +query + : ctes? queryTerm queryOrganization + ; + +ctes + : WITH namedQuery (',' namedQuery)* + ; + +namedQuery + : name=errorCapturingIdentifier (columnAliases=identifierList)? AS? '(' query ')' + ; + +constantList + : '(' constant (',' constant)* ')' + ; + +nestedConstantList + : '(' constantList (',' constantList)* ')' + ; + +resource + : identifier STRING + ; + +dmlStatementNoWith + : DELETE FROM multipartIdentifier tableAlias whereClause? #deleteFromTable + | UPDATE multipartIdentifier tableAlias setClause whereClause? 
#updateTable + | MERGE INTO target=multipartIdentifier targetAlias=tableAlias + USING (source=multipartIdentifier | + '(' sourceQuery=query')') sourceAlias=tableAlias + ON mergeCondition=booleanExpression + matchedClause* + notMatchedClause* #mergeIntoTable + ; + +mergeInto + : MERGE INTO target=multipartIdentifier targetAlias=tableAlias + USING (source=multipartIdentifier | + '(' sourceQuery=query')') sourceAlias=tableAlias + ON mergeCondition=booleanExpression + matchedClause* + notMatchedClause* + ; + +queryOrganization + : (LIMIT (limit=expression))? + ; + +queryTerm + : queryPrimary #queryTermDefault + ; + +queryPrimary + : querySpecification #queryPrimaryDefault + | '(' query ')' #subquery + ; + +fromStatementBody + : selectClause + whereClause? + havingClause? + queryOrganization + ; + +querySpecification + : selectClause + fromClause? + whereClause? + havingClause? + ; + +selectClause + : SELECT (hints+=hint)* setQuantifier? namedExpressionSeq + ; + +setClause + : SET assignmentList + ; + +matchedClause + : WHEN MATCHED (AND matchedCond=booleanExpression)? THEN matchedAction + ; +notMatchedClause + : WHEN NOT MATCHED (AND notMatchedCond=booleanExpression)? THEN notMatchedAction + ; + +matchedAction + : DELETE + | UPDATE SET ASTERISK + | UPDATE SET assignmentList + ; + +notMatchedAction + : INSERT ASTERISK + | INSERT '(' columns=multipartIdentifierList ')' + VALUES '(' expression (',' expression)* ')' + ; + +assignmentList + : assignment (',' assignment)* + ; + +assignment + : key=multipartIdentifier EQ value=expression + ; + +whereClause + : WHERE booleanExpression + ; + +havingClause + : HAVING booleanExpression + ; + +hint + : '/*+' hintStatements+=hintStatement (','? 
hintStatements+=hintStatement)* '*/' + ; + +hintStatement + : hintName=identifier + | hintName=identifier '(' parameters+=primaryExpression (',' parameters+=primaryExpression)* ')' + ; + +fromClause + : FROM relation (',' relation)* + ; + +setQuantifier + : DISTINCT + ; + +relation + : relationPrimary + ; + +identifierList + : '(' identifierSeq ')' + ; + +identifierSeq + : ident+=errorCapturingIdentifier (',' ident+=errorCapturingIdentifier)* + ; + +relationPrimary + : multipartIdentifier #tableName + | inlineTable #inlineTableDefault2 + | functionTable #tableValuedFunction + ; + +inlineTable + : VALUES expression (',' expression)* tableAlias + ; + +functionTable + : funcName=errorCapturingIdentifier '(' (expression (',' expression)*)? ')' tableAlias + ; + +tableAlias + : (AS? strictIdentifier identifierList?)? + ; + +multipartIdentifierList + : multipartIdentifier (',' multipartIdentifier)* + ; + +multipartIdentifier + : parts+=errorCapturingIdentifier ('.' parts+=errorCapturingIdentifier)* + ; + +tableIdentifier + : (db=errorCapturingIdentifier '.')? table=errorCapturingIdentifier + ; + +functionIdentifier + : (db=errorCapturingIdentifier '.')? function=errorCapturingIdentifier + ; + +namedExpression + : expression (AS? (name=errorCapturingIdentifier | identifierList))? + ; + +namedExpressionSeq + : namedExpression (',' namedExpression)* + ; + +expression + : booleanExpression + ; + +booleanExpression + : NOT booleanExpression #logicalNot + | valueExpression predicate? #predicated + | left=booleanExpression operator=AND right=booleanExpression #logicalBinary + | left=booleanExpression operator=OR right=booleanExpression #logicalBinary + ; + +predicate + : NOT? kind=BETWEEN lower=valueExpression AND upper=valueExpression + | NOT? kind=IN '(' expression (',' expression)* ')' + | NOT? kind=IN '(' query ')' + | IS NOT? kind=NULL + | IS NOT? kind=(TRUE | FALSE) + | IS NOT? 
kind=DISTINCT FROM right=valueExpression + ; + +valueExpression + : primaryExpression #valueExpressionDefault + | operator=(MINUS | PLUS | TILDE) valueExpression #arithmeticUnary + | left=valueExpression operator=(ASTERISK | SLASH | PERCENT | DIV) right=valueExpression #arithmeticBinary + | left=valueExpression operator=(PLUS | MINUS | CONCAT_PIPE) right=valueExpression #arithmeticBinary + | left=valueExpression operator=AMPERSAND right=valueExpression #arithmeticBinary + | left=valueExpression operator=HAT right=valueExpression #arithmeticBinary + | left=valueExpression operator=PIPE right=valueExpression #arithmeticBinary + | left=valueExpression comparisonOperator right=valueExpression #comparison + ; + +primaryExpression + : name=(CURRENT_DATE | CURRENT_TIMESTAMP) #currentDatetime + | constant #constantDefault + | ASTERISK #star + | qualifiedName '.' ASTERISK #star + | '(' namedExpression (',' namedExpression)+ ')' #rowConstructor + | '(' query ')' #subqueryExpression + | identifier '->' expression #lambda + | '(' identifier (',' identifier)+ ')' '->' expression #lambda + | value=primaryExpression '[' index=valueExpression ']' #subscript + | identifier #columnReference + | base=primaryExpression '.' fieldName=identifier #dereference + | '(' expression ')' #parenthesizedExpression + ; + +constant + : NULL #nullLiteral + | identifier STRING #typeConstructor + | number #numericLiteral + | booleanValue #booleanLiteral + | STRING+ #stringLiteral + ; + +comparisonOperator + : EQ | NEQ | NEQJ | LT | LTE | GT | GTE | NSEQ + ; + +booleanValue + : TRUE | FALSE + ; + +qualifiedName + : identifier ('.' identifier)* + ; + +errorCapturingIdentifier + : identifier errorCapturingIdentifierExtra + ; + +// extra left-factoring grammar +errorCapturingIdentifierExtra + : (MINUS identifier)+ #errorIdent + | #realIdent + ; + +identifier + : strictIdentifier + | {!SQL_standard_keyword_behavior}? 
strictNonReserved + ; + +strictIdentifier + : IDENTIFIER #unquotedIdentifier + | quotedIdentifier #quotedIdentifierAlternative + | {SQL_standard_keyword_behavior}? ansiNonReserved #unquotedIdentifier + | {!SQL_standard_keyword_behavior}? nonReserved #unquotedIdentifier + ; + +quotedIdentifier + : BACKQUOTED_IDENTIFIER + ; + +number + : {!legacy_exponent_literal_as_decimal_enabled}? MINUS? EXPONENT_VALUE #exponentLiteral + | {!legacy_exponent_literal_as_decimal_enabled}? MINUS? DECIMAL_VALUE #decimalLiteral + | {legacy_exponent_literal_as_decimal_enabled}? MINUS? (EXPONENT_VALUE | DECIMAL_VALUE) #legacyDecimalLiteral + | MINUS? INTEGER_VALUE #integerLiteral + | MINUS? BIGINT_LITERAL #bigIntLiteral + | MINUS? SMALLINT_LITERAL #smallIntLiteral + | MINUS? TINYINT_LITERAL #tinyIntLiteral + | MINUS? DOUBLE_LITERAL #doubleLiteral + | MINUS? FLOAT_LITERAL #floatLiteral + | MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral + ; + +ansiNonReserved +//--ANSI-NON-RESERVED-START + : BETWEEN + | CURRENT + | DELETE + | DIV + | IF + | INSERT + | LIKE + | LIMIT + | MATCHED + | MERGE + | NO + | NULLS + | OF + | SET + | SETMINUS + | SETS + | SORT + | SORTED + | TRUE + | UPDATE + | VALUES +//--ANSI-NON-RESERVED-END + ; + +strictNonReserved + : JOIN + | LEFT + | ON + | RIGHT + | SEMI + | SETMINUS + | UNION + | USING + ; + +nonReserved + : AND + | AS + | BETWEEN + | CURRENT + | CURRENT_DATE + | CURRENT_TIME + | CURRENT_TIMESTAMP + | CURRENT_USER + | DELETE + | DISTINCT + | DIV + | ELSE + | END + | FALSE + | FROM + | IN + | INSERT + | INTO + | IS + | LIKE + | LIMIT + | MATCHED + | MERGE + | NO + | NOT + | NULL + | NULLS + | OF + | OR + | ORDER + | SELECT + | SET + | SETS + | SORT + | SORTED + | THEN + | TIME + | TO + | TRUE + | UPDATE + | VALUES + | WHEN + | WHERE +//--DEFAULT-NON-RESERVED-END + ; + +//============================ +// Start of the keywords list +//============================ +AND: 'AND'; +AS: 'AS'; +BETWEEN: 'BETWEEN'; +CURRENT: 'CURRENT'; +CURRENT_DATE: 'CURRENT_DATE'; 
+CURRENT_TIME: 'CURRENT_TIME'; +CURRENT_TIMESTAMP: 'CURRENT_TIMESTAMP'; +CURRENT_USER: 'CURRENT_USER'; +DELETE: 'DELETE'; +DISTINCT: 'DISTINCT'; +DIV: 'DIV'; +ELSE: 'ELSE'; +END: 'END'; +FALSE: 'FALSE'; +FROM: 'FROM'; +HAVING: 'HAVING'; +IF: 'IF'; +IN: 'IN'; +INSERT: 'INSERT'; +INTO: 'INTO'; +IS: 'IS'; +JOIN: 'JOIN'; +LEFT: 'LEFT'; +LIKE: 'LIKE'; +LIMIT: 'LIMIT'; +MATCHED: 'MATCHED'; +MERGE: 'MERGE'; +NO: 'NO'; +NOT: 'NOT' | '!'; +NULL: 'NULL'; +NULLS: 'NULLS'; +OF: 'OF'; +ON: 'ON'; +OR: 'OR'; +ORDER: 'ORDER'; +OUTER: 'OUTER'; +RIGHT: 'RIGHT'; +SELECT: 'SELECT'; +SEMI: 'SEMI'; +SET: 'SET'; +SETMINUS: 'MINUS'; +SETS: 'SETS'; +SORT: 'SORT'; +SORTED: 'SORTED'; +THEN: 'THEN'; +TIME: 'TIME'; +TO: 'TO'; +TRUE: 'TRUE'; +UNION: 'UNION'; +UPDATE: 'UPDATE'; +USING: 'USING'; +VALUES: 'VALUES'; +WHEN: 'WHEN'; +WHERE: 'WHERE'; +WITH: 'WITH'; + +EQ : '=' | '=='; +NSEQ: '<=>'; +NEQ : '<>'; +NEQJ: '!='; +LT : '<'; +LTE : '<=' | '!>'; +GT : '>'; +GTE : '>=' | '!<'; + +PLUS: '+'; +MINUS: '-'; +ASTERISK: '*'; +SLASH: '/'; +PERCENT: '%'; +TILDE: '~'; +AMPERSAND: '&'; +PIPE: '|'; +CONCAT_PIPE: '||'; +HAT: '^'; + +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +BIGINT_LITERAL + : DIGIT+ 'L' + ; + +SMALLINT_LITERAL + : DIGIT+ 'S' + ; + +TINYINT_LITERAL + : DIGIT+ 'Y' + ; + +INTEGER_VALUE + : DIGIT+ + ; + +EXPONENT_VALUE + : DIGIT+ EXPONENT + | DECIMAL_DIGITS EXPONENT {isValidDecimal()}? + ; + +DECIMAL_VALUE + : DECIMAL_DIGITS {isValidDecimal()}? + ; + +FLOAT_LITERAL + : DIGIT+ EXPONENT? 'F' + | DECIMAL_DIGITS EXPONENT? 'F' {isValidDecimal()}? + ; + +DOUBLE_LITERAL + : DIGIT+ EXPONENT? 'D' + | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}? + ; + +BIGDECIMAL_LITERAL + : DIGIT+ EXPONENT? 'BD' + | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}? + ; + +IDENTIFIER + : (LETTER | DIGIT | '_')+ + ; + +BACKQUOTED_IDENTIFIER + : '`' ( ~'`' | '``' )* '`' + ; + +fragment DECIMAL_DIGITS + : DIGIT+ '.' DIGIT* + | '.' 
DIGIT+ + ; + +fragment EXPONENT + : 'E' [+-]? DIGIT+ + ; + +fragment DIGIT + : [0-9] + ; + +fragment LETTER + : [A-Z] + ; + +SIMPLE_COMMENT + : '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN) + ; + +BRACKETED_COMMENT + : '/*' {!isHint()}? (BRACKETED_COMMENT|.)*? '*/' -> channel(HIDDEN) + ; + +WS + : [ \r\n\t]+ -> channel(HIDDEN) + ; + +// Catch-all for anything we can't recognize. +// We use this to be able to ignore and recover all the text +// when splitting statements with DelimiterLexer +UNRECOGNIZED + : . + ; diff --git a/integration/spark/src/main/java/org/apache/spark/sql/CarbonAntlrSqlVisitor.java b/integration/spark/src/main/java/org/apache/spark/sql/CarbonAntlrSqlVisitor.java new file mode 100644 index 0000000..69a5f98 --- /dev/null +++ b/integration/spark/src/main/java/org/apache/spark/sql/CarbonAntlrSqlVisitor.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException; + +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.parser.ParseException; +import org.apache.spark.sql.catalyst.parser.ParserInterface; +import org.apache.spark.sql.execution.command.mutation.merge.DeleteAction; +import org.apache.spark.sql.execution.command.mutation.merge.InsertAction; +import org.apache.spark.sql.execution.command.mutation.merge.MergeAction; +import org.apache.spark.sql.execution.command.mutation.merge.UpdateAction; +import org.apache.spark.sql.merge.model.CarbonJoinExpression; +import org.apache.spark.sql.merge.model.CarbonMergeIntoModel; +import org.apache.spark.sql.merge.model.ColumnModel; +import org.apache.spark.sql.merge.model.TableModel; +import org.apache.spark.sql.parser.CarbonSqlBaseParser; +import org.apache.spark.util.SparkUtil; + +public class CarbonAntlrSqlVisitor { + + private final ParserInterface sparkParser; + + public CarbonAntlrSqlVisitor(ParserInterface sparkParser) { + this.sparkParser = sparkParser; + } + + public String visitTableAlias(CarbonSqlBaseParser.TableAliasContext ctx) { + if (null == ctx.children) { + return null; + } + String res = ctx.getChild(1).getText(); + return res; + } + + public MergeAction visitCarbonAssignmentList(CarbonSqlBaseParser.AssignmentListContext ctx) + throws MalformedCarbonCommandException { + // UPDATE SET assignmentList + Map<Column, Column> map = new HashMap<>(); + for (int currIdx = 0; currIdx < ctx.getChildCount(); currIdx++) { + if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.AssignmentContext) { + //Assume the actions are all use to pass value + String left = ctx.getChild(currIdx).getChild(0).getText(); + if (left.split("\\.").length > 1) { + left = left.split("\\.")[1]; + } + String right = 
ctx.getChild(currIdx).getChild(2).getText(); + Column rightColumn = null; + try { + Expression expression = sparkParser.parseExpression(right); + rightColumn = new Column(expression); + } catch (Exception e) { + throw new MalformedCarbonCommandException("Parse failed: " + e.getMessage()); + } + map.put(new Column(left), rightColumn); + } + } + return new UpdateAction(SparkUtil.convertMap(map), false); + } + + public MergeAction visitCarbonMatchedAction(CarbonSqlBaseParser.MatchedActionContext ctx) + throws MalformedCarbonCommandException { + int childCount = ctx.getChildCount(); + if (childCount == 1) { + // when matched ** delete + return new DeleteAction(); + } else { + if (ctx.getChild(ctx.getChildCount() - 1) instanceof + CarbonSqlBaseParser.AssignmentListContext) { + //UPDATE SET assignmentList + return visitCarbonAssignmentList( + (CarbonSqlBaseParser.AssignmentListContext) ctx.getChild(ctx.getChildCount() - 1)); + } else { + //UPDATE SET * + return new UpdateAction(null, true); + } + } + } + + public InsertAction visitCarbonNotMatchedAction(CarbonSqlBaseParser.NotMatchedActionContext ctx) { + if (ctx.getChildCount() <= 2) { + //INSERT * + return InsertAction.apply(null, true); + } else { + return InsertAction.apply(null, false); + } + } + + public MergeAction visitCarbonNotMatchedClause(CarbonSqlBaseParser.NotMatchedClauseContext ctx) { + int currIdx = 0; + for (; currIdx < ctx.getChildCount(); currIdx++) { + if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.NotMatchedActionContext) { + break; + } + } + return visitCarbonNotMatchedAction( + (CarbonSqlBaseParser.NotMatchedActionContext) ctx.getChild(currIdx)); + } + + public MergeAction visitCarbonMatchedClause(CarbonSqlBaseParser.MatchedClauseContext ctx) + throws MalformedCarbonCommandException { + //There will be lots of childs at ctx, + // we need to find the predicate + int currIdx = 0; + for (; currIdx < ctx.getChildCount(); currIdx++) { + if (ctx.getChild(currIdx) instanceof 
CarbonSqlBaseParser.MatchedActionContext) { + break; + } + } + // Throw Exception in case of no Matched Action + return visitCarbonMatchedAction( + (CarbonSqlBaseParser.MatchedActionContext) ctx.getChild(currIdx)); + } + + public boolean containsWhenMatchedPredicateExpression(int childCount) { + return childCount > 4; + } + + public boolean containsWhenNotMatchedPredicateExpression(int childCount) { + return childCount > 5; + } + + public CarbonMergeIntoModel visitMergeIntoCarbonTable(CarbonSqlBaseParser.MergeIntoContext ctx) + throws MalformedCarbonCommandException { + // handle the exception msg from base parser + if (ctx.exception != null) { + throw new MalformedCarbonCommandException("Parse failed!"); + } + TableModel targetTable = visitMultipartIdentifier(ctx.target); + TableModel sourceTable = visitMultipartIdentifier(ctx.source); + + //Once get these two table, + //We can try to get CarbonTable + + //Build a matched clause list to store the when matched and when not matched clause + int size = ctx.getChildCount(); + int currIdx = 0; + Expression joinExpression = null; + List<Expression> mergeExpressions = new ArrayList<>(); + List<MergeAction> mergeActions = new ArrayList<>(); + + // There should be two List to store the result retrieve from + // when matched / when not matched context + while (currIdx < size) { + if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.PredicatedContext) { + //This branch will visit the Join Expression + ctx.getChild(currIdx).getChildCount(); + joinExpression = this.visitCarbonPredicated( + (CarbonSqlBaseParser.PredicatedContext) ctx.getChild(currIdx)); + } else if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.MatchedClauseContext) { + //This branch will deal with the Matched Clause + Expression whenMatchedExpression = null; + //Get the whenMatched expression + try { + if (this.containsWhenMatchedPredicateExpression(ctx.getChild(currIdx).getChildCount())) { + whenMatchedExpression = sparkParser.parseExpression( + 
((CarbonSqlBaseParser.MatchedClauseContext) ctx.getChild(currIdx)) + .booleanExpression().getText()); + } + } catch (ParseException e) { + throw new MalformedCarbonCommandException("Parse failed: " + e.getMessage()); + } + mergeExpressions.add(whenMatchedExpression); + mergeActions.add(visitCarbonMatchedAction( + (CarbonSqlBaseParser.MatchedActionContext) ctx.getChild(currIdx) + .getChild(ctx.getChild(currIdx).getChildCount() - 1))); + } else if (ctx.getChild(currIdx) instanceof CarbonSqlBaseParser.NotMatchedClauseContext) { + //This branch will deal with the Matched Clause + Expression whenNotMatchedExpression = null; + //Get the whenMatched expression + try { + if (this + .containsWhenNotMatchedPredicateExpression(ctx.getChild(currIdx).getChildCount())) { + whenNotMatchedExpression = sparkParser.parseExpression( + ((CarbonSqlBaseParser.NotMatchedClauseContext) ctx.getChild(currIdx)) + .booleanExpression().getText()); + } + } catch (ParseException e) { + throw new MalformedCarbonCommandException("Parse failed: " + e.getMessage()); + } + mergeExpressions.add(whenNotMatchedExpression); + CarbonSqlBaseParser.NotMatchedActionContext notMatchedActionContext = + (CarbonSqlBaseParser.NotMatchedActionContext) ctx.getChild(currIdx) + .getChild(ctx.getChild(currIdx).getChildCount() - 1); + if (notMatchedActionContext.getChildCount() <= 2) { + mergeActions.add(InsertAction.apply(null, true)); + } else if (notMatchedActionContext.ASTERISK() == null) { + if (notMatchedActionContext.columns.multipartIdentifier().size() != + notMatchedActionContext.expression().size()) { + throw new MalformedCarbonCommandException("Parse failed: size of columns " + + "is not equal to size of expression in not matched action."); + } + Map<Column, Column> insertMap = new HashMap<>(); + for (int i = 0; i < notMatchedActionContext.columns.multipartIdentifier().size(); i++) { + String left = visitMultipartIdentifier( + notMatchedActionContext.columns.multipartIdentifier().get(i), "") + .getColName(); 
+ String right = notMatchedActionContext.expression().get(i).getText(); + // some times the right side is literal or expression, not table column + // so we need to check the left side is a column or expression + Column rightColumn = null; + try { + Expression expression = sparkParser.parseExpression(right); + rightColumn = new Column(expression); + } catch (Exception ex) { + throw new MalformedCarbonCommandException("Parse failed: " + ex.getMessage()); + } + insertMap.put(new Column(left), rightColumn); + } + mergeActions.add(InsertAction.apply(SparkUtil.convertMap(insertMap), false)); + } else { + mergeActions.add(InsertAction.apply(null, false)); + } + } + currIdx++; + } + return new CarbonMergeIntoModel(targetTable, sourceTable, joinExpression, + mergeExpressions, mergeActions); + } + + public CarbonJoinExpression visitComparison(CarbonSqlBaseParser.ComparisonContext ctx) { + // we need to get left Expression and Right Expression + // Even get the table name and col name + ctx.getText(); + String t1Name = ctx.left.getChild(0).getChild(0).getText(); + String c1Name = ctx.left.getChild(0).getChild(2).getText(); + String t2Name = ctx.right.getChild(0).getChild(0).getText(); + String c2Name = ctx.right.getChild(0).getChild(2).getText(); + return new CarbonJoinExpression(t1Name, c1Name, t2Name, c2Name); + } + + public Expression visitComparison(CarbonSqlBaseParser.ComparisonContext ctx, String x) { + Expression expression = null; + try { + expression = sparkParser.parseExpression(ctx.getText()); + } catch (ParseException e) { + e.printStackTrace(); + } + return expression; + } + + public CarbonJoinExpression visitPredicated(CarbonSqlBaseParser.PredicatedContext ctx) { + return visitComparison((CarbonSqlBaseParser.ComparisonContext) ctx.getChild(0)); + } + + public Expression visitCarbonPredicated(CarbonSqlBaseParser.PredicatedContext ctx) { + return visitComparison((CarbonSqlBaseParser.ComparisonContext) ctx.getChild(0), ""); + } + + public ColumnModel 
visitDereference(CarbonSqlBaseParser.DereferenceContext ctx) { + // In this part, it will return two colunm name + int count = ctx.getChildCount(); + ColumnModel col = new ColumnModel(); + if (count == 3) { + String tableName = ctx.getChild(0).getText(); + String colName = ctx.getChild(2).getText(); + col = new ColumnModel(tableName, colName); + } + return col; + } + + public TableModel visitMultipartIdentifier(CarbonSqlBaseParser.MultipartIdentifierContext ctx) { + TableModel table = new TableModel(); + List<CarbonSqlBaseParser.ErrorCapturingIdentifierContext> parts = ctx.parts; + if (parts.size() == 2) { + table.setDatabase(parts.get(0).getText()); + table.setTable(parts.get(1).getText()); + } + if (parts.size() == 1) { + table.setTable(parts.get(0).getText()); + } + return table; + } + + public ColumnModel visitMultipartIdentifier(CarbonSqlBaseParser.MultipartIdentifierContext ctx, + String x) { + ColumnModel column = new ColumnModel(); + List<CarbonSqlBaseParser.ErrorCapturingIdentifierContext> parts = ctx.parts; + if (parts.size() == 2) { + column.setTable(parts.get(0).getText()); + column.setColName(parts.get(1).getText()); + } + if (parts.size() == 1) { + column.setColName(parts.get(0).getText()); + } + return column; + } + + public String visitUnquotedIdentifier(CarbonSqlBaseParser.UnquotedIdentifierContext ctx) { + String res = ctx.getChild(0).getText(); + return res; + } + + public String visitComparisonOperator(CarbonSqlBaseParser.ComparisonOperatorContext ctx) { + String res = ctx.getChild(0).getText(); + return res; + } + + public String visitTableIdentifier(CarbonSqlBaseParser.TableIdentifierContext ctx) { + return ctx.getChild(0).getText(); + } +} diff --git a/integration/spark/src/main/java/org/apache/spark/sql/CarbonMergeIntoSQLCommand.scala b/integration/spark/src/main/java/org/apache/spark/sql/CarbonMergeIntoSQLCommand.scala new file mode 100644 index 0000000..67611fd --- /dev/null +++ 
/**
 * Runs a parsed MERGE INTO statement by translating the merge model into the dataset based
 * [[CarbonMergeDataSetCommand]].
 *
 * @param mergeInto model holding target/source tables, the join condition and the
 *                  index-aligned lists of clause predicates and clause actions
 */
case class CarbonMergeIntoSQLCommand(mergeInto: CarbonMergeIntoModel)
  extends AtomicRunnableCommand {

  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    Seq.empty
  }

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    val sourceTable: TableModel = mergeInto.getSource
    val targetTable: TableModel = mergeInto.getTarget
    val mergeCondition: Expression = mergeInto.getMergeCondition
    val mergeExpressions: Seq[Expression] = convertExpressionList(mergeInto.getMergeExpressions)
    val mergeActions: Seq[MergeAction] = convertMergeActionList(mergeInto.getMergeActions)

    // validate the tables
    val sourceDatabaseName =
      CarbonEnv.getDatabaseName(Option(sourceTable.getDatabase))(sparkSession)
    val sourceTableName = sourceTable.getTable
    val targetDatabaseName =
      CarbonEnv.getDatabaseName(Option(targetTable.getDatabase))(sparkSession)
    val targetTableName = targetTable.getTable
    TableAPIUtil.validateTableExists(sparkSession, sourceDatabaseName, sourceTableName)
    TableAPIUtil.validateTableExists(sparkSession, targetDatabaseName, targetTableName)

    val srcDf = sparkSession.sql(s"""SELECT * FROM ${sourceDatabaseName}.${sourceTableName}""")
    val tgDf = sparkSession.sql(s"""SELECT * FROM ${targetDatabaseName}.${targetTableName}""")

    // Pair every clause predicate with its action and build the corresponding MergeMatch
    val matches: List[MergeMatch] = mergeExpressions.zip(mergeActions).map {
      case (clauseExpression, clauseAction) =>
        // Star actions carry no mapping yet; expand them against the actual table columns.
        // A delete removes the whole row, so it needs no mapping.
        clauseAction match {
          case update: UpdateAction if update.isStar =>
            update.updateMap = starMapping(tgDf.columns, srcDf.columns, sourceTableName)
          case insert: InsertAction if insert.isStar =>
            insert.insertMap = starMapping(tgDf.columns, srcDf.columns, sourceTableName)
          case _ =>
        }
        // a null predicate means the clause has no additional condition
        val clauseCondition: Option[Column] = Option(clauseExpression).map(Column(_))
        // delete and update belong to WHEN MATCHED, everything else to WHEN NOT MATCHED
        val mergeMatch: MergeMatch = clauseAction match {
          case _: DeleteAction | _: UpdateAction =>
            WhenMatched(clauseCondition).addAction(clauseAction)
          case _ =>
            WhenNotMatched(clauseCondition).addAction(clauseAction)
        }
        mergeMatch
    }.toList

    val joinExpression = Column(mergeCondition)
    val mergeDataSetMatches: MergeDataSetMatches = MergeDataSetMatches(joinExpression, matches)

    CarbonMergeDataSetCommand(tgDf, srcDf, mergeDataSetMatches).run(sparkSession)
  }

  override protected def opName: String = "MERGE SQL COMMAND"

  /**
   * Maps target columns to source columns by position for `UPDATE SET *` / `INSERT *`.
   * Source columns are qualified with the source table name.
   *
   * @throws IllegalArgumentException when the source has more columns than the target, which
   *                                  previously failed with an array index error
   */
  private def starMapping(
      targetColumns: Array[String],
      sourceColumns: Array[String],
      sourceName: String): Map[Column, Column] = {
    require(sourceColumns.length <= targetColumns.length,
      s"Source table has ${ sourceColumns.length } columns but target table has only " +
      s"${ targetColumns.length }; cannot expand * in merge action")
    targetColumns.zip(sourceColumns).map { case (targetColumn, sourceColumn) =>
      col(targetColumn) -> col(sourceName + "." + sourceColumn)
    }.toMap
  }
}
/**
 * Holds the table and column names found on the two sides of a join comparison.
 */
public class CarbonJoinExpression {

  // private like the fields of the sibling ColumnModel; access goes through the accessors
  private String srcTable;
  private String srcCol;
  private String tgTable;
  private String tgCol;

  /**
   * @param srcTable table name of the first operand
   * @param srcCol   column name of the first operand
   * @param tgTable  table name of the second operand
   * @param tgCol    column name of the second operand
   */
  public CarbonJoinExpression(String srcTable, String srcCol, String tgTable, String tgCol) {
    this.srcTable = srcTable;
    this.srcCol = srcCol;
    this.tgTable = tgTable;
    this.tgCol = tgCol;
  }

  public String getSrcTable() {
    return srcTable;
  }

  public void setSrcTable(String srcTable) {
    this.srcTable = srcTable;
  }

  public String getSrcCol() {
    return srcCol;
  }

  public void setSrcCol(String srcCol) {
    this.srcCol = srcCol;
  }

  public String getTgTable() {
    return tgTable;
  }

  public void setTgTable(String tgTable) {
    this.tgTable = tgTable;
  }

  public String getTgCol() {
    return tgCol;
  }

  public void setTgCol(String tgCol) {
    this.tgCol = tgCol;
  }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.merge.model; + +import java.util.List; + +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.execution.command.mutation.merge.MergeAction; + +public class CarbonMergeIntoModel { + TableModel target; + TableModel source; + Expression mergeCondition; + List<Expression> mergeExpressions; + List<MergeAction> mergeActions; + + public CarbonMergeIntoModel(TableModel target, TableModel source, Expression mergeCondition, + List<Expression> mergeExpressions, List<MergeAction> mergeActions) { + this.target = target; + this.source = source; + this.mergeCondition = mergeCondition; + this.mergeExpressions = mergeExpressions; + this.mergeActions = mergeActions; + } + + public TableModel getTarget() { + return target; + } + + public void setTarget(TableModel target) { + this.target = target; + } + + public TableModel getSource() { + return source; + } + + public void setSource(TableModel source) { + this.source = source; + } + + public Expression getMergeCondition() { + return mergeCondition; + } + + public void setMergeCondition(Expression mergeCondition) { + this.mergeCondition = mergeCondition; + } + + public List<Expression> getMergeExpressions() { + return mergeExpressions; + } + + public void setMergeExpressions(List<Expression> mergeExpressions) { + this.mergeExpressions = mergeExpressions; + } + + public List<MergeAction> 
getMergeActions() { + return mergeActions; + } + + public void setMergeActions(List<MergeAction> mergeActions) { + this.mergeActions = mergeActions; + } +} diff --git a/integration/spark/src/main/java/org/apache/spark/sql/merge/model/ColumnModel.java b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/ColumnModel.java new file mode 100644 index 0000000..256bd5d --- /dev/null +++ b/integration/spark/src/main/java/org/apache/spark/sql/merge/model/ColumnModel.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Mutable holder for a column reference: an optional table name plus the column name.
 */
public class ColumnModel {

  private String table;
  private String colName;

  /** Creates a column reference with neither table nor column name set. */
  public ColumnModel() {
    this(null, null);
  }

  /**
   * @param table   name of the table qualifying the column
   * @param colName name of the column
   */
  public ColumnModel(String table, String colName) {
    this.table = table;
    this.colName = colName;
  }

  public String getTable() {
    return this.table;
  }

  public String getColName() {
    return this.colName;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setColName(String colName) {
    this.colName = colName;
  }
}
/**
 * Mutable holder for a table reference: optional database, table name and optional alias.
 */
public class TableModel {

  // private like the fields of the sibling ColumnModel; access goes through the accessors
  private String database;
  private String table;
  private String alias;

  /** Creates a table reference with no field set. */
  public TableModel() {

  }

  /**
   * @param database name of the database containing the table
   * @param table    name of the table
   * @param alias    alias given to the table
   */
  public TableModel(String database, String table, String alias) {
    this.database = database;
    this.table = table;
    this.alias = alias;
  }

  public String getAlias() {
    return alias;
  }

  public void setAlias(String alias) {
    this.alias = alias;
  }

  public String getDatabase() {
    return database;
  }

  public void setDatabase(String database) {
    this.database = database;
  }

  public String getTable() {
    return table;
  }

  public void setTable(String table) {
    this.table = table;
  }
}
*/ + package org.apache.spark.sql.execution.command.mutation.merge import java.sql.{Date, Timestamp} @@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils */ case class MergeProjection( @transient tableCols: Seq[String], - @transient statusCol : String, + @transient statusCol: String, @transient ds: Dataset[Row], @transient rltn: CarbonDatasourceHadoopRelation, @transient sparkSession: SparkSession, @@ -58,8 +59,8 @@ case class MergeProjection( private def generateProjection: (Projection, Array[Expression]) = { val existingDsOutput = rltn.carbonRelation.schema.toAttributes val colsMap = mergeAction match { - case UpdateAction(updateMap) => updateMap - case InsertAction(insertMap) => insertMap + case UpdateAction(updateMap, isStar: Boolean) => updateMap + case InsertAction(insertMap, isStar: Boolean) => insertMap case _ => null } if (colsMap != null) { @@ -81,9 +82,9 @@ case class MergeProjection( if (output.contains(null)) { throw new CarbonMergeDataSetException(s"Not all columns are mapped") } - (new InterpretedMutableProjection(output++Seq( + (new InterpretedMutableProjection(output ++ Seq( ds.queryExecution.analyzed.resolveQuoted(statusCol, - sparkSession.sessionState.analyzer.resolver).get), + sparkSession.sessionState.analyzer.resolver).get), ds.queryExecution.analyzed.output), expectOutput) } else { (null, null) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala index cba30ef..55d3b4a 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.spark.sql.execution.command.mutation.merge import scala.collection.mutable.ArrayBuffer @@ -71,9 +72,11 @@ case class WhenNotMatchedAndExistsOnlyOnTarget(expression: Option[Column] = None override def getExp: Option[Column] = expression } -case class UpdateAction(updateMap: Map[Column, Column]) extends MergeAction +case class UpdateAction(var updateMap: Map[Column, Column], isStar: Boolean = false) + extends MergeAction -case class InsertAction(insertMap: Map[Column, Column]) extends MergeAction +case class InsertAction(var insertMap: Map[Column, Column], isStar: Boolean = false) + extends MergeAction /** * It inserts the history data into history table diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonAntlrParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonAntlrParser.scala new file mode 100644 index 0000000..4c3dd5c --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonAntlrParser.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Entry point of the ANTLR based parser, which only handles MERGE INTO statements.
 *
 * @param sparkParser Spark parser handed to the visitor for parsing expressions
 */
class CarbonAntlrParser(sparkParser: ParserInterface) {

  /**
   * Parses a MERGE INTO statement into a runnable command.
   *
   * Throws UnsupportedOperationException when the trimmed, lower-cased text does not start
   * with "merge".
   */
  def parse(sqlText: String): AtomicRunnableCommand = {
    val isMergeStatement = sqlText.trim.toLowerCase.startsWith("merge")
    if (!isMergeStatement) {
      throw new UnsupportedOperationException(
        "Antlr SQL Parser will only deal with Merge Into SQL Command")
    }
    val sqlVisitor = new CarbonAntlrSqlVisitor(sparkParser)
    val tokens = new CommonTokenStream(new CarbonSqlBaseLexer(CharStreams.fromString(sqlText)))
    val mergeIntoContext = new CarbonSqlBaseParser(tokens).mergeInto
    // star actions are expanded later, when the command runs against the actual tables
    CarbonMergeIntoSQLCommand(sqlVisitor.visitMergeIntoCarbonTable(mergeIntoContext))
  }
}
parsedPlan = initialParser.parsePlan(sqlText) - CarbonScalaUtil.cleanParserThreadLocals - parsedPlan + antlrParser.parse(sqlText) } catch { - case mce: MalformedCarbonCommandException => - throw mce - case e: Throwable => - e.printStackTrace(System.err) - CarbonScalaUtil.cleanParserThreadLocals - CarbonException.analysisException( - s"""== Parser1: ${parser.getClass.getName} == - |${ex.getMessage} - |== Parser2: ${initialParser.getClass.getName} == - |${e.getMessage} + case ce: MalformedCarbonCommandException => + throw ce + case at: Throwable => + try { + val parsedPlan = initialParser.parsePlan(sqlText) + CarbonScalaUtil.cleanParserThreadLocals + parsedPlan + } catch { + case mce: MalformedCarbonCommandException => + throw mce + case st: Throwable => + st.printStackTrace(System.err) + CarbonScalaUtil.cleanParserThreadLocals + CarbonException.analysisException( + s"""== Spark Parser: ${initialParser.getClass.getName} == + |${st.getMessage} + |== Carbon Parser: ${ parser.getClass.getName } == + |${ct.getMessage} + |== Antlr Parser: ${antlrParser.getClass.getName} == + |${at.getMessage} """.stripMargin.trim) + } } } } diff --git a/integration/spark/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/SparkUtil.scala index e92cf5e..7c7aac5 100644 --- a/integration/spark/src/main/scala/org/apache/spark/util/SparkUtil.scala +++ b/integration/spark/src/main/scala/org/apache/spark/util/SparkUtil.scala @@ -17,16 +17,32 @@ package org.apache.spark.util +import scala.collection.JavaConverters.{mapAsScalaMapConverter, _} + import org.apache.spark.{SPARK_VERSION, TaskContext} -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY -import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, 
IntegerType, LongType, ShortType, StringType, TimestampType} +import org.apache.spark.sql.execution.command.mutation.merge.MergeAction +import org.apache.spark.sql.types._ /* * this object use to handle file splits */ object SparkUtil { + def convertMap(map: java.util.Map[Column, Column]): Map[Column, Column] = { + map.asScala.toMap + } + + def convertExpressionList(list: java.util.List[Expression]): List[Expression] = { + list.asScala.toList + } + + def convertMergeActionList(list: java.util.List[MergeAction]): List[MergeAction] = { + list.asScala.toList + } + def setTaskContext(context: TaskContext): Unit = { val localThreadContext = TaskContext.get() if (localThreadContext == null) { @@ -68,7 +84,7 @@ object SparkUtil { } } - def isPrimitiveType(datatype : DataType): Boolean = { + def isPrimitiveType(datatype: DataType): Boolean = { datatype match { case StringType => true case ByteType => true diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/MergeIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/MergeIntoCarbonTableTestCase.scala new file mode 100644 index 0000000..69a1909 --- /dev/null +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/MergeIntoCarbonTableTestCase.scala @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.iud + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterEach + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException + +class MergeIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterEach { + var df: DataFrame = _ + + override def beforeEach { + dropTable() + buildTestData() + } + + private def buildTestData(): Unit = { + createTable() + } + + private def createTable(): Unit = { + sql( + s""" + | CREATE TABLE IF NOT EXISTS A( + | id Int, + | price Int, + | state String + | ) + | STORED AS carbondata + """.stripMargin) + + sql( + s""" + | CREATE TABLE IF NOT EXISTS B( + | id Int, + | price Int, + | state String + | ) + | STORED AS carbondata + """.stripMargin) + + sql(s"""INSERT INTO A VALUES (1,100,"MA")""") + sql(s"""INSERT INTO A VALUES (2,200,"NY")""") + sql(s"""INSERT INTO A VALUES (3,300,"NH")""") + sql(s"""INSERT INTO A VALUES (4,400,"FL")""") + + sql(s"""INSERT INTO B VALUES (1,1,"MA (updated)")""") + sql(s"""INSERT INTO B VALUES (2,3,"NY (updated)")""") + sql(s"""INSERT INTO B VALUES (3,3,"CA (updated)")""") + sql(s"""INSERT INTO B VALUES (5,5,"TX (updated)")""") + sql(s"""INSERT INTO B VALUES (7,7,"LO (updated)")""") + } + + private def dropTable() = { + sql("DROP TABLE IF EXISTS A") + sql("DROP TABLE IF EXISTS B") + } + + test("test merge into delete") { + sql( + """MERGE INTO A + |USING B + |ON A.ID=B.ID + |WHEN MATCHED THEN DELETE""".stripMargin) + + 
checkAnswer(sql("select * from A"), Seq(Row(4, 400, "FL"))) + } + + test("test merge into delete and update") { + sql( + """MERGE INTO A + |USING B + |ON A.ID=B.ID + |WHEN MATCHED AND A.ID=2 THEN DELETE + |WHEN MATCHED AND A.ID=1 THEN UPDATE SET *""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 1, "MA (updated)"), + Row(3, 300, "NH"), + Row(4, 400, "FL"))) + } + + test("test merge into update with expression") { + sql( + """MERGE INTO A + |USING B + |ON A.ID=B.ID + |WHEN MATCHED AND A.ID=2 THEN DELETE + |WHEN MATCHED AND A.ID=1 THEN UPDATE SET A.id=9, A.price=100, A.state='hahaha' + |""".stripMargin) + + sql("select * from A").show(100, false) + + checkAnswer(sql("select * from A"), + Seq(Row(9, 100, "hahaha"), + Row(3, 300, "NH"), + Row(4, 400, "FL"))) + } + + test("test merge into delete and insert") { + sql( + """MERGE INTO A + |USING B + |ON A.ID=B.ID + |WHEN MATCHED AND A.ID=2 THEN DELETE + |WHEN NOT MATCHED THEN INSERT *""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), + Row(3, 300, "NH"), + Row(4, 400, "FL"), + Row(5, 5, "TX (updated)"), + Row(7, 7, "LO (updated)"))) + } + + test("test merge into delete and update and insert") { + sql( + """MERGE INTO A + |USING B + |ON A.ID=B.ID + |WHEN MATCHED AND A.ID=2 THEN DELETE + |WHEN MATCHED AND A.ID=1 THEN UPDATE SET * + |WHEN NOT MATCHED THEN INSERT *""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 1, "MA (updated)"), + Row(3, 300, "NH"), + Row(4, 400, "FL"), + Row(5, 5, "TX (updated)"), + Row(7, 7, "LO (updated)"))) + } + + test("test merge into update and insert") { + sql( + """MERGE INTO A + |USING B + |ON A.ID=B.ID + |WHEN MATCHED AND A.ID=1 THEN UPDATE SET * + |WHEN NOT MATCHED THEN INSERT *""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 1, "MA (updated)"), + Row(2, 200, "NY"), + Row(3, 300, "NH"), + Row(4, 400, "FL"), + Row(5, 5, "TX (updated)"), + Row(7, 7, "LO (updated)"))) + } + + test("test merge into delete 
with condition") { + sql( + """MERGE INTO A + |USING B + |ON A.ID=B.ID + |WHEN MATCHED AND B.ID=2 THEN DELETE""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), Row(3, 300, "NH"), Row(4, 400, "FL"))) + } + + test("test merge into update all cols") { + sql( + """MERGE INTO A USING B + |ON A.ID=B.ID + |WHEN MATCHED THEN UPDATE SET *""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 1, "MA (updated)"), + Row(2, 3, "NY (updated)"), + Row(3, 3, "CA (updated)"), + Row(4, 400, "FL"))) + } + + test("test merge into update all cols with condition") { + sql( + """MERGE INTO A USING B + |ON A.ID=B.ID + |WHEN MATCHED AND A.ID=2 THEN UPDATE SET *""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), Row(2, 3, "NY (updated)"), Row(3, 300, "NH"), Row(4, 400, "FL"))) + } + + test("test merge into update all cols with multiple condition") { + sql( + """MERGE INTO A USING B + |ON A.ID=B.ID + |WHEN MATCHED AND A.ID=2 THEN UPDATE SET * + |WHEN MATCHED AND A.ID=3 THEN UPDATE SET *""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), + Row(2, 3, "NY (updated)"), + Row(3, 3, "CA (updated)"), + Row(4, 400, "FL"))) + } + + test("test merge into insert all cols") { + sql( + """MERGE INTO A USING B + |ON A.ID=B.ID + |WHEN NOT MATCHED THEN INSERT *""".stripMargin) + + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), + Row(2, 200, "NY"), + Row(3, 300, "NH"), + Row(4, 400, "FL"), + Row(5, 5, "TX (updated)"), + Row(7, 7, "LO (updated)"))) + } + + test("test merge into insert all cols with condition") { + sql( + """MERGE INTO A USING B + |ON A.ID=B.ID + |WHEN NOT MATCHED AND B.ID=7 THEN INSERT *""".stripMargin) + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), + Row(2, 200, "NY"), + Row(3, 300, "NH"), + Row(4, 400, "FL"), + Row(7, 7, "LO (updated)"))) + } + + test("test merge into insert all cols with multiple condition") { + sql( + """MERGE INTO A USING B + |ON 
A.ID=B.ID + |WHEN NOT MATCHED AND B.ID=5 THEN INSERT * + |WHEN NOT MATCHED AND B.ID=7 THEN INSERT *""".stripMargin) + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), + Row(2, 200, "NY"), + Row(3, 300, "NH"), + Row(4, 400, "FL"), + Row(5, 5, "TX (updated)"), + Row(7, 7, "LO (updated)"))) + } + + test("test merge into insert with literal") { + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED " + + "AND B.ID=7 THEN INSERT (A" + + ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE, 'test-string')" + sql(sqlText) + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), + Row(2, 200, "NY"), + Row(3, 300, "NH"), + Row(4, 400, "FL"), + Row(7, 7, "test-string"))) + } + + test("test merge into insert with expression") { + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED " + + "AND B.ID=7 THEN INSERT (A" + + ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE + 10, B.state)" + sql(sqlText) + checkAnswer(sql("select * from A"), + Seq(Row(1, 100, "MA"), + Row(2, 200, "NY"), + Row(3, 300, "NH"), + Row(4, 400, "FL"), + Row(7, 17, "LO (updated)"))) + } + + test("test merge into not exist table exception") { + val exceptionCaught = intercept[MalformedCarbonCommandException] { + val sqlText = "MERGE INTO A USING C ON A.ID=C.ID WHEN NOT MATCHED " + + "AND C.ID=7 THEN INSERT (A" + + ".ID,A.PRICE, A.state) VALUES (C.ID,C.PRICE + 10, C.state)" + sql(sqlText) + } + assert(exceptionCaught.getMessage.contains("table default.C not found")) + } + + test("test merge into parse exception for size of action columns and expression") { + val exceptionCaught = intercept[MalformedCarbonCommandException] { + val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED " + + "AND B.ID=7 THEN INSERT (A" + + ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE + 10)" + sql(sqlText) + } + assert(exceptionCaught.getMessage.contains("Parse failed: " + + "size of columns is not equal to size of expression in not matched action")) + } + + test("test merge into parse 
exception for tree node parsing") { + val exceptionCaught = intercept[MalformedCarbonCommandException] { + val sqlText = "MERGE INTO A USING ON A.ID=B.ID WHEN NOT MATCHED " + + "AND B.ID=7 THEN INSERT (A" + + ".ID,A.PRICE, A.state) VALUES (B.ID,B.PRICE + 10, B.state)" + sql(sqlText) + } + assert(exceptionCaught.getMessage.contains("Parse failed")) + } +} diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala index d12b2d4..793f8ff 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala @@ -40,7 +40,7 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath /** - * Test Class for join query with orderby and limit + * Test Class for carbon merge api */ class MergeTestCase extends QueryTest with BeforeAndAfterAll { diff --git a/pom.xml b/pom.xml index cd21b4d..c22836e 100644 --- a/pom.xml +++ b/pom.xml @@ -131,6 +131,7 @@ <hadoop.deps.scope>compile</hadoop.deps.scope> <spark.version>2.3.4</spark.version> <spark.binary.version>2.3</spark.binary.version> + <antlr4.version>4.8</antlr4.version> <spark.deps.scope>compile</spark.deps.scope> <scala.deps.scope>compile</scala.deps.scope> <dev.path>${basedir}/dev</dev.path> @@ -522,7 +523,11 @@ </execution> </executions> </plugin> - + <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr4-maven-plugin</artifactId> + <version>${antlr4.version}</version> + </plugin> </plugins> </build>