This is an automated email from the ASF dual-hosted git repository. jiajunxie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/main by this push: new f3f5e7eeac [CALCITE-5836] Implement Rel2Sql for MERGE f3f5e7eeac is described below commit f3f5e7eeacea2216123f9e9c3094a0926096b923 Author: macroguo <macro...@tencent.com> AuthorDate: Thu Aug 24 21:06:23 2023 +0800 [CALCITE-5836] Implement Rel2Sql for MERGE Co-authored-by: Greg Hart <greg.h...@protegrity.com> --- .../java/org/apache/calcite/plan/RelOptUtil.java | 1 + .../apache/calcite/prepare/CalcitePrepareImpl.java | 2 + .../calcite/rel/rel2sql/RelToSqlConverter.java | 58 ++++++++- .../calcite/rel/rel2sql/RelToSqlConverterTest.java | 130 +++++++++++++++++++++ .../org/apache/calcite/test/JdbcAdapterTest.java | 41 +++++++ 5 files changed, 231 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java index b9aa94d981..6ec10cf951 100644 --- a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java +++ b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java @@ -2138,6 +2138,7 @@ public abstract class RelOptUtil { case INSERT: case DELETE: case UPDATE: + case MERGE: return typeFactory.createStructType( PairList.of(AvaticaConnection.ROWCOUNT_COLUMN_NAME, typeFactory.createSqlType(SqlTypeName.BIGINT))); diff --git a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java index bb0447d1a9..d78e08cded 100644 --- a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java +++ b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java @@ -591,6 +591,7 @@ public class CalcitePrepareImpl implements CalcitePrepare { case INSERT: case DELETE: case UPDATE: + case MERGE: return Meta.StatementType.IS_DML; default: return Meta.StatementType.SELECT; @@ -667,6 +668,7 @@ public class CalcitePrepareImpl implements CalcitePrepare { case INSERT: case DELETE: case UPDATE: + case MERGE: case EXPLAIN: 
// FIXME: getValidatedNodeType is wrong for DML x = RelOptUtil.createDmlRowType(sqlNode.getKind(), typeFactory); diff --git a/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java b/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java index 2462b89d55..5b59d2b9ee 100644 --- a/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java +++ b/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java @@ -72,6 +72,7 @@ import org.apache.calcite.sql.SqlJoin; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlMatchRecognize; +import org.apache.calcite.sql.SqlMerge; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlSampleSpec; @@ -1084,7 +1085,62 @@ public class RelToSqlConverter extends SqlImplementor return result(sqlDelete, input.clauses, modify, null); } - case MERGE: + case MERGE: { + final Result input = visitInput(modify, 0); + final SqlSelect select = input.asSelect(); + // When querying with both the `WHEN MATCHED THEN UPDATE` and + // `WHEN NOT MATCHED THEN INSERT` clauses, the selectList consists of three parts: + // the insert expression, the target table reference, and the update expression. + // When querying with the `WHEN MATCHED THEN UPDATE` clause, the selectList will not + // include the insert expression. + // However, when querying with the `WHEN NOT MATCHED THEN INSERT` clause, + // the expression list will only contain the insert expression. 
+ final SqlNodeList selectList = SqlUtil.stripListAs(select.getSelectList()); + final SqlJoin join = requireNonNull((SqlJoin) select.getFrom()); + final SqlNode condition = requireNonNull(join.getCondition()); + final SqlNode source = join.getLeft(); + + SqlUpdate update = null; + final List<String> updateColumnList = + requireNonNull(modify.getUpdateColumnList(), + () -> "modify.getUpdateColumnList() is null for " + modify); + final int nUpdateFiled = updateColumnList.size(); + if (nUpdateFiled != 0) { + final SqlNodeList expressionList = + Util.last(selectList, nUpdateFiled).stream() + .collect(SqlNodeList.toList()); + update = + new SqlUpdate(POS, sqlTargetTable, + identifierList(updateColumnList), + expressionList, + condition, null, null); + } + + final RelDataType targetRowType = modify.getTable().getRowType(); + final int nTargetFiled = targetRowType.getFieldCount(); + final int nInsertFiled = nUpdateFiled == 0 + ? selectList.size() : selectList.size() - nTargetFiled - nUpdateFiled; + SqlInsert insert = null; + if (nInsertFiled != 0) { + final SqlNodeList expressionList = + Util.first(selectList, nInsertFiled).stream() + .collect(SqlNodeList.toList()); + final SqlNode valuesCall = + SqlStdOperatorTable.VALUES.createCall(expressionList); + final SqlNodeList columnList = targetRowType.getFieldNames().stream() + .map(f -> new SqlIdentifier(f, POS)) + .collect(SqlNodeList.toList()); + insert = new SqlInsert(POS, SqlNodeList.EMPTY, sqlTargetTable, valuesCall, columnList); + } + + final SqlNode target = join.getRight(); + final SqlNode targetTableAlias = target.getKind() == SqlKind.AS + ? 
((SqlCall) target).operand(1) : null; + final SqlMerge merge = + new SqlMerge(POS, sqlTargetTable, condition, source, update, insert, null, + (SqlIdentifier) targetTableAlias); + return result(merge, input.clauses, modify, null); + } default: throw new AssertionError("not implemented: " + modify); } diff --git a/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java b/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java index 50e9808a84..f78751841e 100644 --- a/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java +++ b/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java @@ -7043,6 +7043,136 @@ class RelToSqlConverterTest { .ok(expected); } + @Test void testMerge() { + final String sql1 = "merge into \"DEPT\" as \"t\"\n" + + "using \"DEPT\" as \"s\"\n" + + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n" + + "when matched then\n" + + "update set \"DNAME\" = \"s\".\"DNAME\"\n" + + "when not matched then\n" + + "insert (DEPTNO, DNAME, LOC)\n" + + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), upper(\"s\".\"LOC\"))"; + final String expected1 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n" + + "USING \"SCOTT\".\"DEPT\"\n" + + "ON \"DEPT\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n" + + "WHEN MATCHED THEN UPDATE SET \"DNAME\" = \"DEPT\".\"DNAME\"\n" + + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") " + + "VALUES CAST(\"DEPT\".\"DEPTNO\" + 1 AS TINYINT),\n" + + "LOWER(\"DEPT\".\"DNAME\"),\n" + + "UPPER(\"DEPT\".\"LOC\")"; + sql(sql1) + .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT) + .ok(expected1); + + // without insert columns + final String sql2 = "merge into \"DEPT\" as \"t\"\n" + + "using \"DEPT\" as \"s\"\n" + + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n" + + "when matched then\n" + + "update set \"DNAME\" = \"s\".\"DNAME\"\n" + + "when not matched then insert\n" + + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), upper(\"s\".\"LOC\"))"; + final String 
expected2 = expected1; + sql(sql2) + .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT) + .ok(expected2); + + // reorder insert columns + final String sql3 = "merge into \"DEPT\" as \"t\"\n" + + "using \"DEPT\" as \"s\"\n" + + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n" + + "when matched then\n" + + "update set \"DNAME\" = \"s\".\"DNAME\"\n" + + "when not matched then\n" + + "insert (DEPTNO, LOC, DNAME)\n" + + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), 'abc')"; + final String expected3 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n" + + "USING \"SCOTT\".\"DEPT\"\n" + + "ON \"DEPT\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n" + + "WHEN MATCHED THEN UPDATE SET \"DNAME\" = \"DEPT\".\"DNAME\"\n" + + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") " + + "VALUES CAST(\"DEPT\".\"DEPTNO\" + 1 AS TINYINT),\n" + + "'abc',\n" + + "LOWER(\"DEPT\".\"DNAME\")"; + sql(sql3) + .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT) + .ok(expected3); + + // without WHEN NOT MATCHED THEN + final String sql4 = "merge into \"DEPT\" as \"t\"\n" + + "using \"DEPT\" as \"s\"\n" + + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n" + + "when matched then\n" + + "update set \"DNAME\" = \"s\".\"DNAME\""; + final String expected4 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n" + + "USING \"SCOTT\".\"DEPT\"\n" + + "ON \"DEPT\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n" + + "WHEN MATCHED THEN UPDATE SET \"DNAME\" = \"DEPT\".\"DNAME\""; + sql(sql4) + .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT) + .ok(expected4); + + // without WHEN MATCHED THEN + final String sql5 = "merge into \"DEPT\" as \"t\"\n" + + "using \"DEPT\" as \"s\"\n" + + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n" + + "when not matched then\n" + + "insert (DEPTNO, DNAME, LOC)\n" + + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), upper(\"s\".\"LOC\"))"; + final String expected5 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n" + + "USING \"SCOTT\".\"DEPT\"\n" + + "ON \"DEPT\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n" + + "WHEN NOT MATCHED 
THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") " + + "VALUES CAST(\"DEPT\".\"DEPTNO\" + 1 AS TINYINT),\n" + + "LOWER(\"DEPT\".\"DNAME\"),\n" + + "UPPER(\"DEPT\".\"LOC\")"; + sql(sql5) + .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT) + .ok(expected5); + + // using query + final String sql6 = "merge into \"DEPT\" as \"t\"\n" + + "using (select * from \"DEPT\" where \"DEPTNO\" <> 5) as \"s\"\n" + + "on \"t\".\"DEPTNO\" = \"s\".\"DEPTNO\"\n" + + "when not matched then\n" + + "insert (DEPTNO, DNAME, LOC)\n" + + "values (\"s\".\"DEPTNO\" + 1, lower(\"s\".\"DNAME\"), upper(\"s\".\"LOC\"))"; + final String expected6 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"DEPT0\"\n" + + "USING (SELECT *\n" + + "FROM \"SCOTT\".\"DEPT\"\n" + + "WHERE CAST(\"DEPTNO\" AS INTEGER) <> 5) AS \"t0\"\n" + + "ON \"t0\".\"DEPTNO\" = \"DEPT0\".\"DEPTNO\"\n" + + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") " + + "VALUES CAST(\"t0\".\"DEPTNO\" + 1 AS TINYINT),\n" + + "LOWER(\"t0\".\"DNAME\"),\n" + + "UPPER(\"t0\".\"LOC\")"; + sql(sql6) + .schema(CalciteAssert.SchemaSpec.JDBC_SCOTT) + .ok(expected6); + + final String sql7 = "merge into \"DEPT\" as \"t\"\n" + + "using (select * from (values (1, 'name', 'loc'))) as \"s\"(\"a\", \"b\", \"c\")\n" + + "on \"t\".\"DEPTNO\" = \"s\".\"a\"\n" + + "when matched then\n" + + "update set \"DNAME\" = 'abc'" + + "when not matched then\n" + + "insert (DEPTNO, DNAME, LOC)\n" + + "values (\"s\".\"a\" + 1, lower(\"s\".\"b\"), upper(\"s\".\"c\"))"; + final String expected7 = "MERGE INTO \"SCOTT\".\"DEPT\" AS \"t1\"\n" + + "USING (SELECT *\n" + + "FROM (VALUES (1, 'name', 'loc')) " + + "AS \"t\" (\"EXPR$0\", \"EXPR$1\", \"EXPR$2\")) AS \"t0\"\n" + + "ON \"t0\".\"EXPR$0\" = \"t1\".\"DEPTNO0\"\n" + + "WHEN MATCHED THEN UPDATE SET \"DNAME\" = 'abc'\n" + + "WHEN NOT MATCHED THEN INSERT (\"DEPTNO\", \"DNAME\", \"LOC\") " + + "VALUES CAST(\"t0\".\"EXPR$0\" + 1 AS TINYINT),\n" + + "LOWER(\"t0\".\"EXPR$1\"),\n" + + "UPPER(\"t0\".\"EXPR$2\")"; + sql(sql7) + 
.schema(CalciteAssert.SchemaSpec.JDBC_SCOTT) + .ok(expected7); + } + /** Test case for * <a href="https://issues.apache.org/jira/browse/CALCITE-5265">[CALCITE-5265] * JDBC adapter sometimes adds unnecessary parentheses around SELECT in INSERT</a>. */ diff --git a/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java b/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java index 564fe6dab9..ea57e5d945 100644 --- a/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java +++ b/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java @@ -1106,6 +1106,47 @@ class JdbcAdapterTest { .returns("C=null\nC=null\nC=null\nC=null\nC=null\nC=null\nC=null\n"); } + @Test void testMerge() throws Exception { + final String sql = "merge into \"foodmart\".\"expense_fact\"\n" + + "using (values(666, 42)) as vals(store_id, amount)\n" + + "on \"expense_fact\".\"store_id\" = vals.store_id\n" + + "when matched then update\n" + + "set \"amount\" = vals.amount\n" + + "when not matched then insert\n" + + "values (vals.store_id, 666, TIMESTAMP '1997-01-01 00:00:00', 666, '666', 666," + + " vals.amount)"; + final String explain = "PLAN=JdbcToEnumerableConverter\n" + + " JdbcTableModify(table=[[foodmart, expense_fact]], operation=[MERGE]," + + " updateColumnList=[[amount]], flattened=[false])\n" + + " JdbcProject(STORE_ID=[$0], $f1=[666], $f2=[1997-01-01 00:00:00], $f3=[666]," + + " $f4=['666'], $f5=[666], AMOUNT=[CAST($1):DECIMAL(10, 4) NOT NULL], store_id=[$2]," + + " account_id=[$3], exp_date=[$4], time_id=[$5], category_id=[$6], currency_id=[$7]," + + " amount=[$8], AMOUNT0=[$1])\n" + + " JdbcJoin(condition=[=($2, $0)], joinType=[left])\n" + + " JdbcValues(tuples=[[{ 666, 42 }]])\n" + + " JdbcTableScan(table=[[foodmart, expense_fact]])\n"; + final String jdbcSql = "MERGE INTO \"foodmart\".\"expense_fact\"\n" + + "USING (VALUES (666, 42)) AS \"t\" (\"STORE_ID\", \"AMOUNT\")\n" + + "ON \"t\".\"STORE_ID\" = \"expense_fact\".\"store_id\"\n" + + "WHEN MATCHED THEN 
UPDATE SET \"amount\" = \"t\".\"AMOUNT\"\n" + + "WHEN NOT MATCHED THEN INSERT (\"store_id\", \"account_id\", \"exp_date\", \"time_id\", " + + "\"category_id\", \"currency_id\", \"amount\") VALUES \"t\".\"STORE_ID\",\n" + + "666,\nTIMESTAMP '1997-01-01 00:00:00',\n666,\n'666',\n666,\n" + + "CAST(\"t\".\"AMOUNT\" AS DECIMAL(10, 4))"; + final AssertThat that = + CalciteAssert.model(FoodmartSchema.FOODMART_MODEL) + .enable(CalciteAssert.DB == DatabaseInstance.HSQLDB); + that.doWithConnection(connection -> { + try (LockWrapper ignore = exclusiveCleanDb(connection)) { + that.query(sql) + .explainContains(explain) + .planUpdateHasSql(jdbcSql, 1); + } catch (SQLException e) { + throw TestUtil.rethrow(e); + } + }); + } + /** Acquires a lock, and releases it when closed. */ static class LockWrapper implements AutoCloseable { private final Lock lock;