This is an automated email from the ASF dual-hosted git repository.

jiajunxie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/main by this push:
     new f3f5e7eeac [CALCITE-5836] Implement Rel2Sql for MERGE
f3f5e7eeac is described below

commit f3f5e7eeacea2216123f9e9c3094a0926096b923
Author: macroguo <macro...@tencent.com>
AuthorDate: Thu Aug 24 21:06:23 2023 +0800

    [CALCITE-5836] Implement Rel2Sql for MERGE
    
    Co-authored-by: Greg Hart <greg.h...@protegrity.com>
---
 .../java/org/apache/calcite/plan/RelOptUtil.java   |   1 +
 .../apache/calcite/prepare/CalcitePrepareImpl.java |   2 +
 .../calcite/rel/rel2sql/RelToSqlConverter.java     |  58 ++++++++-
 .../calcite/rel/rel2sql/RelToSqlConverterTest.java | 130 +++++++++++++++++++++
 .../org/apache/calcite/test/JdbcAdapterTest.java   |  41 +++++++
 5 files changed, 231 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java 
b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
index b9aa94d981..6ec10cf951 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
@@ -2138,6 +2138,7 @@ public abstract class RelOptUtil {
     case INSERT:
     case DELETE:
     case UPDATE:
+    case MERGE:
       return typeFactory.createStructType(
           PairList.of(AvaticaConnection.ROWCOUNT_COLUMN_NAME,
               typeFactory.createSqlType(SqlTypeName.BIGINT)));
diff --git 
a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java 
b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
index bb0447d1a9..d78e08cded 100644
--- a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
@@ -591,6 +591,7 @@ public class CalcitePrepareImpl implements CalcitePrepare {
     case INSERT:
     case DELETE:
     case UPDATE:
+    case MERGE:
       return Meta.StatementType.IS_DML;
     default:
       return Meta.StatementType.SELECT;
@@ -667,6 +668,7 @@ public class CalcitePrepareImpl implements CalcitePrepare {
       case INSERT:
       case DELETE:
       case UPDATE:
+      case MERGE:
       case EXPLAIN:
         // FIXME: getValidatedNodeType is wrong for DML
         x = RelOptUtil.createDmlRowType(sqlNode.getKind(), typeFactory);
diff --git 
a/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java 
b/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java
index 2462b89d55..5b59d2b9ee 100644
--- a/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java
+++ b/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java
@@ -72,6 +72,7 @@ import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlMerge;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlSampleSpec;
@@ -1084,7 +1085,62 @@ public class RelToSqlConverter extends SqlImplementor
 
       return result(sqlDelete, input.clauses, modify, null);
     }
-    case MERGE:
+    case MERGE: {
+      final Result input = visitInput(modify, 0);
+      final SqlSelect select = input.asSelect();
+      // When querying with both the `WHEN MATCHED THEN UPDATE` and
+      // `WHEN NOT MATCHED THEN INSERT` clauses, the selectList consists of 
three parts:
+      // the insert expression, the target table reference, and the update 
expression.
+      // When querying with the `WHEN MATCHED THEN UPDATE` clause, the 
selectList will not
+      // include the update expression.
+      // However, when querying with the `WHEN NOT MATCHED THEN INSERT` clause,
+      // the expression list will only contain the insert expression.
+      final SqlNodeList selectList = 
SqlUtil.stripListAs(select.getSelectList());
+      final SqlJoin join =  requireNonNull((SqlJoin) select.getFrom());
+      final SqlNode condition = requireNonNull(join.getCondition());
+      final SqlNode source = join.getLeft();
+
+      SqlUpdate update = null;
+      final List<String> updateColumnList =
+          requireNonNull(modify.getUpdateColumnList(),
+              () -> "modify.getUpdateColumnList() is null for " + modify);
+      final int nUpdateFiled = updateColumnList.size();
+      if (nUpdateFiled != 0) {
+        final SqlNodeList expressionList =
+            Util.last(selectList, nUpdateFiled).stream()
+                .collect(SqlNodeList.toList());
+        update =
+            new SqlUpdate(POS, sqlTargetTable,
+                identifierList(updateColumnList),
+                expressionList,
+                condition, null, null);
+      }
+
+      final RelDataType targetRowType = modify.getTable().getRowType();
+      final int nTargetFiled = targetRowType.getFieldCount();
+      final int nInsertFiled = nUpdateFiled == 0
+          ? selectList.size() : selectList.size() - nTargetFiled - 
nUpdateFiled;
+      SqlInsert insert = null;
+      if (nInsertFiled != 0) {
+        final SqlNodeList expressionList =
+            Util.first(selectList, nInsertFiled).stream()
+                .collect(SqlNodeList.toList());
+        final SqlNode valuesCall =
+            SqlStdOperatorTable.VALUES.createCall(expressionList);
+        final SqlNodeList columnList = targetRowType.getFieldNames().stream()
+            .map(f -> new SqlIdentifier(f, POS))
+            .collect(SqlNodeList.toList());
+        insert = new SqlInsert(POS, SqlNodeList.EMPTY, sqlTargetTable, 
valuesCall, columnList);
+      }
+
+      final SqlNode target = join.getRight();
+      final SqlNode targetTableAlias = target.getKind() == SqlKind.AS
+          ? ((SqlCall) target).operand(1) : null;
+      final SqlMerge merge =
+          new SqlMerge(POS, sqlTargetTable, condition, source, update, insert, 
null,
+              (SqlIdentifier) targetTableAlias);
+      return result(merge, input.clauses, modify, null);
+    }
     default:
       throw new AssertionError("not implemented: " + modify);
     }
diff --git 
a/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java 
b/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java
index 50e9808a84..f78751841e 100644
--- 
a/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java
+++ 
b/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java
@@ -7043,6 +7043,136 @@ class RelToSqlConverterTest {
         .ok(expected);
   }
 
+  @Test void testMerge() {
+    final String sql1 = "merge into \"DEPT\" as \"t\"\n"
+        + "using \"DEPT\" as \"s\"\n"
+        + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n"
+        + "when matched then\n"
+        + "update set \"DNAME\" = \"s\".\"DNAME\"\n"
+        + "when not matched then\n"
+        + "insert (DEPTNO, DNAME, LOC)\n"
+        + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), 
upper(\"s\".\"LOC\"))";
+    final String expected1 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n"
+        + "USING \"SCOTT\".\"DEPT\"\n"
+        + "ON \"DEPT\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n"
+        + "WHEN MATCHED THEN UPDATE SET \"DNAME\" = \"DEPT\".\"DNAME\"\n"
+        + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") "
+        + "VALUES CAST(\"DEPT\".\"DEPTNO\" + 1 AS TINYINT),\n"
+        + "LOWER(\"DEPT\".\"DNAME\"),\n"
+        + "UPPER(\"DEPT\".\"LOC\")";
+    sql(sql1)
+        .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT)
+        .ok(expected1);
+
+    // without insert columns
+    final String sql2 = "merge into \"DEPT\" as \"t\"\n"
+        + "using \"DEPT\" as \"s\"\n"
+        + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n"
+        + "when matched then\n"
+        + "update set \"DNAME\" = \"s\".\"DNAME\"\n"
+        + "when not matched then insert\n"
+        + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), 
upper(\"s\".\"LOC\"))";
+    final String expected2 = expected1;
+    sql(sql2)
+        .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT)
+        .ok(expected2);
+
+    // reorder insert columns
+    final String sql3 = "merge into \"DEPT\" as \"t\"\n"
+        + "using \"DEPT\" as \"s\"\n"
+        + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n"
+        + "when matched then\n"
+        + "update set \"DNAME\" = \"s\".\"DNAME\"\n"
+        + "when not matched then\n"
+        + "insert (DEPTNO, LOC, DNAME)\n"
+        + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), 'abc')";
+    final String expected3 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n"
+        + "USING \"SCOTT\".\"DEPT\"\n"
+        + "ON \"DEPT\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n"
+        + "WHEN MATCHED THEN UPDATE SET \"DNAME\" = \"DEPT\".\"DNAME\"\n"
+        + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") "
+        + "VALUES CAST(\"DEPT\".\"DEPTNO\" + 1 AS TINYINT),\n"
+        + "'abc',\n"
+        + "LOWER(\"DEPT\".\"DNAME\")";
+    sql(sql3)
+        .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT)
+        .ok(expected3);
+
+    // without WHEN NOT MATCHED THEN
+    final String sql4 = "merge into \"DEPT\" as \"t\"\n"
+        + "using \"DEPT\" as \"s\"\n"
+        + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n"
+        + "when matched then\n"
+        + "update set \"DNAME\" = \"s\".\"DNAME\"";
+    final String expected4 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n"
+        + "USING \"SCOTT\".\"DEPT\"\n"
+        + "ON \"DEPT\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n"
+        + "WHEN MATCHED THEN UPDATE SET \"DNAME\" = \"DEPT\".\"DNAME\"";
+    sql(sql4)
+        .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT)
+        .ok(expected4);
+
+    // without WHEN MATCHED THEN
+    final String sql5 = "merge into \"DEPT\" as \"t\"\n"
+        + "using \"DEPT\" as \"s\"\n"
+        + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n"
+        + "when not matched then\n"
+        + "insert (DEPTNO, DNAME, LOC)\n"
+        + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), 
upper(\"s\".\"LOC\"))";
+    final String expected5 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n"
+        + "USING \"SCOTT\".\"DEPT\"\n"
+        + "ON \"DEPT\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n"
+        + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") "
+        + "VALUES CAST(\"DEPT\".\"DEPTNO\" + 1 AS TINYINT),\n"
+        + "LOWER(\"DEPT\".\"DNAME\"),\n"
+        + "UPPER(\"DEPT\".\"LOC\")";
+    sql(sql5)
+        .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT)
+        .ok(expected5);
+
+    // using query
+    final String sql6 = "merge into \"DEPT\" as \"t\"\n"
+        + "using (select * from \"DEPT\" where \"DEPTNO\" <> 5) as \"s\"\n"
+        + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n"
+        + "when not matched then\n"
+        + "insert (DEPTNO, DNAME, LOC)\n"
+        + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), 
upper(\"s\".\"LOC\"))";
+    final String expected6 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n"
+        + "USING (SELECT *\n"
+        + "FROM \"SCOTT\".\"DEPT\"\n"
+        + "WHERE CAST(\"DEPTNO\" AS INTEGER) <> 5) AS \"t0\"\n"
+        + "ON \"t0\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n"
+        + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") "
+        + "VALUES CAST(\"t0\".\"DEPTNO\" + 1 AS TINYINT),\n"
+        + "LOWER(\"t0\".\"DNAME\"),\n"
+        + "UPPER(\"t0\".\"LOC\")";
+    sql(sql6)
+        .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT)
+        .ok(expected6);
+
+    final String sql7 = "merge into \"DEPT\" as \"t\"\n"
+        + "using (select * from (values (1, 'name', 'loc'))) as \"s\"(\"a\", 
\"b\", \"c\")\n"
+        + "on \"t\".\"DEPTNO\" = \"s\".\"a\"\n"
+        + "when matched then\n"
+        + "update set \"DNAME\" = 'abc'"
+        + "when not matched then\n"
+        + "insert (DEPTNO, DNAME, LOC)\n"
+        + "values (\"s\".\"a\" + 1, lower(\"s\".\"b\"), upper(\"s\".\"c\"))";
+    final String expected7 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"t1\"\n"
+        + "USING (SELECT *\n"
+        + "FROM (VALUES (1, 'name', 'loc')) "
+        + "AS \"t\" (\"EXPR$0\", \"EXPR$1\", \"EXPR$2\")) AS \"t0\"\n"
+        + "ON \"t0\".\"EXPR$0\" = \"t1\".\"DEPTNO0\"\n"
+        + "WHEN MATCHED THEN UPDATE SET \"DNAME\" = 'abc'\n"
+        + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") "
+        + "VALUES CAST(\"t0\".\"EXPR$0\" + 1 AS TINYINT),\n"
+        + "LOWER(\"t0\".\"EXPR$1\"),\n"
+        + "UPPER(\"t0\".\"EXPR$2\")";
+    sql(sql7)
+        .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT)
+        .ok(expected7);
+  }
+
   /** Test case for
    * <a 
href="https://issues.apache.org/jira/browse/CALCITE-5265";>[CALCITE-5265]
    * JDBC adapter sometimes adds unnecessary parentheses around SELECT in 
INSERT</a>. */
diff --git a/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java 
b/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
index 564fe6dab9..ea57e5d945 100644
--- a/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
@@ -1106,6 +1106,47 @@ class JdbcAdapterTest {
         .returns("C=null\nC=null\nC=null\nC=null\nC=null\nC=null\nC=null\n");
   }
 
+  @Test void testMerge() throws Exception {
+    final String sql = "merge into \"foodmart\".\"expense_fact\"\n"
+        + "using (values(666, 42)) as vals(store_id, amount)\n"
+        + "on \"expense_fact\".\"store_id\" = vals.store_id\n"
+        + "when matched then update\n"
+        + "set \"amount\" = vals.amount\n"
+        + "when not matched then insert\n"
+        + "values (vals.store_id, 666, TIMESTAMP '1997-01-01 00:00:00', 666, 
'666', 666,"
+        + " vals.amount)";
+    final String explain = "PLAN=JdbcToEnumerableConverter\n"
+        + "  JdbcTableModify(table=[[foodmart, expense_fact]], 
operation=[MERGE],"
+        + " updateColumnList=[[amount]], flattened=[false])\n"
+        + "    JdbcProject(STORE_ID=[$0], $f1=[666], $f2=[1997-01-01 
00:00:00], $f3=[666],"
+        + " $f4=['666'], $f5=[666], AMOUNT=[CAST($1):DECIMAL(10, 4) NOT NULL], 
store_id=[$2],"
+        + " account_id=[$3], exp_date=[$4], time_id=[$5], category_id=[$6], 
currency_id=[$7],"
+        + " amount=[$8], AMOUNT0=[$1])\n"
+        + "      JdbcJoin(condition=[=($2, $0)], joinType=[left])\n"
+        + "        JdbcValues(tuples=[[{ 666, 42 }]])\n"
+        + "        JdbcTableScan(table=[[foodmart, expense_fact]])\n";
+    final String jdbcSql = "MERGE INTO \"foodmart\".\"expense_fact\"\n"
+        + "USING (VALUES (666, 42)) AS \"t\" (\"STORE_ID\", \"AMOUNT\")\n"
+        + "ON \"t\".\"STORE_ID\" = \"expense_fact\".\"store_id\"\n"
+        + "WHEN MATCHED THEN UPDATE SET \"amount\" = \"t\".\"AMOUNT\"\n"
+        + "WHEN NOT MATCHED THEN INSERT (\"store_id\", \"account_id\", 
\"exp_date\", \"time_id\", "
+        + "\"category_id\", \"currency_id\", \"amount\") VALUES 
\"t\".\"STORE_ID\",\n"
+        + "666,\nTIMESTAMP '1997-01-01 00:00:00',\n666,\n'666',\n666,\n"
+        + "CAST(\"t\".\"AMOUNT\" AS DECIMAL(10, 4))";
+    final AssertThat that =
+        CalciteAssert.model(FoodmartSchema.FOODMART_MODEL)
+            .enable(CalciteAssert.DB == DatabaseInstance.HSQLDB);
+    that.doWithConnection(connection -> {
+      try (LockWrapper ignore = exclusiveCleanDb(connection)) {
+        that.query(sql)
+            .explainContains(explain)
+            .planUpdateHasSql(jdbcSql, 1);
+      } catch (SQLException e) {
+        throw TestUtil.rethrow(e);
+      }
+    });
+  }
+
   /** Acquires a lock, and releases it when closed. */
   static class LockWrapper implements AutoCloseable {
     private final Lock lock;

Reply via email to