This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9ada38327b8ae204987c5e309c2c3528da5776ff
Author: meiyi <myime...@gmail.com>
AuthorDate: Sun Mar 10 17:39:05 2024 +0800

    [feature](txn insert) txn insert support insert into select (#31666)
---
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   6 +-
 .../commands/insert/AbstractInsertExecutor.java    |   2 +-
 .../commands/insert/InsertIntoTableCommand.java    |   4 +-
 .../trees/plans/commands/insert/InsertUtils.java   |   5 +
 .../plans/commands/insert/OlapInsertExecutor.java  |  30 ++-
 .../java/org/apache/doris/qe/ConnectContext.java   |  13 +-
 .../apache/doris/qe/InsertStreamTxnExecutor.java   |   3 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  90 +++----
 .../apache/doris/transaction/TransactionEntry.java | 163 ++++++++++++-
 regression-test/data/insert_p0/txn_insert.out      | 259 ++++++++++++++++++++
 regression-test/suites/insert_p0/txn_insert.groovy | 263 +++++++++++++++------
 11 files changed, 683 insertions(+), 155 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 66d4eebecff..874b6a0d432 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -546,7 +546,11 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         if (isOverwrite) {
             command = new InsertOverwriteTableCommand(sink, labelName);
         } else {
-            if (ConnectContext.get() != null && 
ConnectContext.get().isTxnModel()) {
+            if (ConnectContext.get() != null && 
ConnectContext.get().isTxnModel()
+                    && sink.child() instanceof LogicalInlineTable) {
+                // FIXME: In legacy, the `insert into select 1` is handled as 
`insert into values`.
+                //  In nereids, the original way is throw an AnalysisException 
and fallback to legacy.
+                //  Now handle it as `insert into select`(a separate load 
job), should fix it as the legacy.
                 command = new BatchInsertIntoTableCommand(sink);
             } else {
                 command = new InsertIntoTableCommand(sink, labelName, 
Optional.empty());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 2ee55caf18d..f0df240762b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -49,7 +49,7 @@ public abstract class AbstractInsertExecutor {
     protected long jobId;
     protected final ConnectContext ctx;
     protected final Coordinator coordinator;
-    protected final String labelName;
+    protected String labelName;
     protected final DatabaseIf database;
     protected final TableIf table;
     protected final long createTime = System.currentTimeMillis();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 56b99d8288d..8bf454f0980 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -152,7 +152,9 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
             Preconditions.checkArgument(plan.isPresent(), "insert into command 
must contain target table");
             PhysicalSink physicalSink = plan.get();
             DataSink sink = planner.getFragments().get(0).getSink();
-            String label = this.labelName.orElse(String.format("label_%x_%x", 
ctx.queryId().hi, ctx.queryId().lo));
+            // Transaction insert should reuse the label in the transaction.
+            String label = this.labelName.orElse(
+                    ctx.isTxnModel() ? null : String.format("label_%x_%x", 
ctx.queryId().hi, ctx.queryId().lo));
 
             if (physicalSink instanceof PhysicalOlapTableSink) {
                 if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index c1894c50eb3..f0e7fab736c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -165,6 +165,11 @@ public class InsertUtils {
     private static void beginBatchInsertTransaction(ConnectContext ctx,
             String dbName, String tblName, List<Column> columns) {
         TransactionEntry txnEntry = ctx.getTxnEntry();
+        if (txnEntry.isTransactionBegan()) {
+            // FIXME: support mix usage of `insert into values` and `insert 
into select`
+            throw new AnalysisException(
+                    "Transaction insert can not insert into values and insert 
into select at the same time");
+        }
         TTxnParams txnConf = txnEntry.getTxnConf();
         SessionVariable sessionVariable = ctx.getSessionVariable();
         long timeoutSecond = ctx.getExecTimeout();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index bb61c71b282..289f9a974d6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -47,6 +47,7 @@ import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TOlapTableLocationParam;
 import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionEntry;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -86,10 +87,22 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
     @Override
     public void beginTransaction() {
         try {
-            this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                    database.getId(), ImmutableList.of(table.getId()), 
labelName,
-                    new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
-                    LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
+            if (ctx.isTxnModel()) {
+                TransactionEntry txnEntry = ctx.getTxnEntry();
+                // check the same label with begin
+                if (this.labelName != null && 
!this.labelName.equals(txnEntry.getLabel())) {
+                    throw new AnalysisException("Transaction insert expect 
label " + txnEntry.getLabel()
+                            + ", but got " + this.labelName);
+                }
+                txnEntry.beginTransaction(database, table);
+                this.txnId = txnEntry.getTransactionId();
+                this.labelName = txnEntry.getLabel();
+            } else {
+                this.txnId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(
+                        database.getId(), ImmutableList.of(table.getId()), 
labelName,
+                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        LoadJobSourceType.INSERT_STREAMING, 
ctx.getExecTimeout());
+            }
         } catch (Exception e) {
             throw new AnalysisException("begin transaction failed. " + 
e.getMessage(), e);
         }
@@ -160,6 +173,12 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
 
     @Override
     protected void onComplete() throws UserException {
+        if (ctx.isTxnModel()) {
+            TransactionEntry txnEntry = ctx.getTxnEntry();
+            txnEntry.addCommitInfos((Table) table, 
coordinator.getCommitInfos());
+            return;
+        }
+
         if (ctx.getState().getStateType() == MysqlStateType.ERR) {
             try {
                 String errMsg = 
Strings.emptyToNull(ctx.getState().getErrorMessage());
@@ -241,7 +260,8 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
         // {'label':'my_label1', 'status':'visible', 'txnId':'123'}
         // {'label':'my_label1', 'status':'visible', 'txnId':'123' 
'err':'error messages'}
         StringBuilder sb = new StringBuilder();
-        sb.append("{'label':'").append(labelName).append("', 
'status':'").append(txnStatus.name());
+        sb.append("{'label':'").append(labelName).append("', 'status':'")
+                .append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : 
txnStatus.name());
         sb.append("', 'txnId':'").append(txnId).append("'");
         if (table.getType() == TableType.MATERIALIZED_VIEW) {
             sb.append("', 'rows':'").append(loadedRows).append("'");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index a36d4aeacc9..d561494fe52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -402,14 +402,11 @@ public class ConnectContext {
 
     public void closeTxn() {
         if (isTxnModel()) {
-            if (isTxnBegin()) {
-                try {
-                    InsertStreamTxnExecutor executor = new 
InsertStreamTxnExecutor(getTxnEntry());
-                    executor.abortTransaction();
-                } catch (Exception e) {
-                    LOG.error("db: {}, txnId: {}, rollback error.", currentDb,
-                            txnEntry.getTxnConf().getTxnId(), e);
-                }
+            try {
+                txnEntry.abortTransaction();
+            } catch (Exception e) {
+                LOG.error("db: {}, txnId: {}, rollback error.", currentDb,
+                        txnEntry.getTransactionId(), e);
             }
             txnEntry = null;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 77d4c52ac9d..f37457cf58d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.Config;
@@ -67,7 +68,7 @@ public class InsertStreamTxnExecutor {
         // StreamLoadTask's id == request's load_id
         StreamLoadTask streamLoadTask = 
StreamLoadTask.fromTStreamLoadPutRequest(request);
         StreamLoadPlanner planner = new StreamLoadPlanner(
-                txnEntry.getDb(), (OlapTable) txnEntry.getTable(), 
streamLoadTask);
+                (Database) txnEntry.getDb(), (OlapTable) txnEntry.getTable(), 
streamLoadTask);
         // Will using load id as query id in fragment
         if (Config.enable_pipeline_load) {
             TPipelineFragmentParams tRequest = 
planner.planForPipeline(streamLoadTask.getId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 8e33c9cde85..a52ed1c260e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -175,8 +175,6 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
 import org.apache.doris.thrift.TUniqueId;
-import org.apache.doris.thrift.TWaitingTxnStatusRequest;
-import org.apache.doris.thrift.TWaitingTxnStatusResult;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionEntry;
 import org.apache.doris.transaction.TransactionState;
@@ -512,10 +510,19 @@ public class StmtExecutor {
                         LOG.warn("Analyze failed. {}", 
context.getQueryIdentifier(), e);
                         throw ((NereidsException) e).getException();
                     }
-                    boolean isInsertIntoCommand = parsedStmt != null && 
parsedStmt instanceof LogicalPlanAdapter
+                    // FIXME: Force fallback for:
+                    //  1. group commit because nereids does not support it 
(see the following `isGroupCommit` variable)
+                    //  2. insert into command because some nereids cases fail 
(including case1)
+                    //  Skip force fallback for:
+                    //  1. Transaction insert because nereids support `insert 
into select` while legacy does not
+                    boolean isInsertCommand = parsedStmt != null
+                            && parsedStmt instanceof LogicalPlanAdapter
                             && ((LogicalPlanAdapter) 
parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
-                    if (e instanceof NereidsException
-                            && 
!context.getSessionVariable().enableFallbackToOriginalPlanner && 
!isInsertIntoCommand) {
+                    /*boolean isGroupCommit = 
(Config.wait_internal_group_commit_finish
+                            || 
context.sessionVariable.isEnableInsertGroupCommit()) && isInsertCommand;*/
+                    boolean forceFallback = isInsertCommand && 
!context.isTxnModel();
+                    if (e instanceof NereidsException && 
!context.getSessionVariable().enableFallbackToOriginalPlanner
+                            && !forceFallback) {
                         LOG.warn("Analyze failed. {}", 
context.getQueryIdentifier(), e);
                         throw ((NereidsException) e).getException();
                     }
@@ -595,7 +602,8 @@ public class StmtExecutor {
         LogicalPlan logicalPlan = ((LogicalPlanAdapter) 
parsedStmt).getLogicalPlan();
         // when we in transaction mode, we only support insert into command 
and transaction command
         if (context.isTxnModel()) {
-            if (!(logicalPlan instanceof BatchInsertIntoTableCommand)) {
+            if (!(logicalPlan instanceof BatchInsertIntoTableCommand
+                    || logicalPlan instanceof InsertIntoTableCommand)) {
                 String errMsg = "This is in a transaction, only insert, 
commit, rollback is acceptable.";
                 throw new NereidsException(errMsg, new 
AnalysisException(errMsg));
             }
@@ -1701,18 +1709,6 @@ public class StmtExecutor {
         }
     }
 
-    private TWaitingTxnStatusResult 
getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
-        TWaitingTxnStatusResult statusResult = null;
-        if (Env.getCurrentEnv().isMaster()) {
-            statusResult = Env.getCurrentGlobalTransactionMgr()
-                    .getWaitingTxnStatus(request);
-        } else {
-            MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(context);
-            statusResult = masterTxnExecutor.getWaitingTxnStatus(request);
-        }
-        return statusResult;
-    }
-
     private void handleTransactionStmt() throws Exception {
         if (context.getConnectType() == ConnectType.MYSQL) {
             // Every time set no send flag and clean all data in buffer
@@ -1721,6 +1717,7 @@ public class StmtExecutor {
         context.getState().setOk(0, 0, "");
         // create plan
         if (context.getTxnEntry() != null && 
context.getTxnEntry().getRowsInTransaction() == 0
+                && !context.getTxnEntry().isTransactionBegan()
                 && (parsedStmt instanceof TransactionCommitStmt || parsedStmt 
instanceof TransactionRollbackStmt)) {
             context.setTxnEntry(null);
         } else if (parsedStmt instanceof TransactionBeginStmt) {
@@ -1728,18 +1725,13 @@ public class StmtExecutor {
                 LOG.info("A transaction has already begin");
                 return;
             }
-            TTxnParams txnParams = new TTxnParams();
-            
txnParams.setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load)
-                    
.setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("");
-            if (context.getSessionVariable().getEnableInsertStrict()) {
-                txnParams.setMaxFilterRatio(0);
-            } else {
-                txnParams.setMaxFilterRatio(1.0);
-            }
             if (context.getTxnEntry() == null) {
                 context.setTxnEntry(new TransactionEntry());
             }
-            context.getTxnEntry().setTxnConf(txnParams);
+            context.getTxnEntry()
+                    .setTxnConf(new 
TTxnParams().setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load)
+                            
.setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("")
+                            
.setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0 : 
1.0));
             StringBuilder sb = new StringBuilder();
             
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 
'status':'")
                     .append(TransactionStatus.PREPARE.name());
@@ -1750,39 +1742,13 @@ public class StmtExecutor {
                 LOG.info("No transaction to commit");
                 return;
             }
-
-            TTxnParams txnConf = context.getTxnEntry().getTxnConf();
             try {
-                InsertStreamTxnExecutor executor = new 
InsertStreamTxnExecutor(context.getTxnEntry());
-                if (context.getTxnEntry().getDataToSend().size() > 0) {
-                    // send rest data
-                    executor.sendData();
-                }
-                // commit txn
-                executor.commitTransaction();
-
-                // wait txn visible
-                TWaitingTxnStatusRequest request = new 
TWaitingTxnStatusRequest();
-                
request.setDbId(txnConf.getDbId()).setTxnId(txnConf.getTxnId());
-                request.setLabelIsSet(false);
-                request.setTxnIdIsSet(true);
-
-                TWaitingTxnStatusResult statusResult = 
getWaitingTxnStatus(request);
-                TransactionStatus txnStatus = 
TransactionStatus.valueOf(statusResult.getTxnStatusId());
-                if (txnStatus == TransactionStatus.COMMITTED) {
-                    throw new AnalysisException("transaction commit 
successfully, BUT data will be visible later.");
-                } else if (txnStatus != TransactionStatus.VISIBLE) {
-                    String errMsg = "commit failed, rollback.";
-                    if (statusResult.getStatus().isSetErrorMsgs()
-                            && statusResult.getStatus().getErrorMsgs().size() 
> 0) {
-                        errMsg = String.join(". ", 
statusResult.getStatus().getErrorMsgs());
-                    }
-                    throw new AnalysisException(errMsg);
-                }
+                TransactionEntry txnEntry = context.getTxnEntry();
+                TransactionStatus txnStatus = txnEntry.commitTransaction();
                 StringBuilder sb = new StringBuilder();
-                
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 
'status':'")
+                sb.append("{'label':'").append(txnEntry.getLabel()).append("', 
'status':'")
                         .append(txnStatus.name()).append("', 'txnId':'")
-                        
.append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
+                        
.append(txnEntry.getTransactionId()).append("'").append("}");
                 context.getState().setOk(0, 0, sb.toString());
             } catch (Exception e) {
                 LOG.warn("Txn commit failed", e);
@@ -1796,14 +1762,12 @@ public class StmtExecutor {
                 return;
             }
             try {
-                // abort txn
-                InsertStreamTxnExecutor executor = new 
InsertStreamTxnExecutor(context.getTxnEntry());
-                executor.abortTransaction();
-
+                TransactionEntry txnEntry = context.getTxnEntry();
+                long txnId = txnEntry.abortTransaction();
                 StringBuilder sb = new StringBuilder();
-                
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 
'status':'")
+                sb.append("{'label':'").append(txnEntry.getLabel()).append("', 
'status':'")
                         .append(TransactionStatus.ABORTED.name()).append("', 
'txnId':'")
-                        
.append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
+                        .append(txnId).append("'").append("}");
                 context.getState().setOk(0, 0, sb.toString());
             } catch (Exception e) {
                 throw new AnalysisException(e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 71368715794..43e6585e584 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -18,19 +18,48 @@
 package org.apache.doris.transaction;
 
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.InsertStreamTxnExecutor;
+import org.apache.doris.qe.MasterTxnExecutor;
+import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TTabletCommitInfo;
 import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
+import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
+import org.apache.doris.transaction.TransactionState.TxnCoordinator;
+import org.apache.doris.transaction.TransactionState.TxnSourceType;
+
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 public class TransactionEntry {
 
+    private static final Logger LOG = 
LogManager.getLogger(TransactionEntry.class);
+
     private String label = "";
-    private Database db;
+    private DatabaseIf database;
+
+    // for insert into values for one table
     private Table table;
     private Backend backend;
     private TTxnParams txnConf;
@@ -38,12 +67,19 @@ public class TransactionEntry {
     private long rowsInTransaction = 0;
     private Types.PUniqueId pLoadId;
 
+    // for insert into select for multi tables
+    private boolean isTransactionBegan = false;
+    private long transactionId = -1;
+    private TransactionState transactionState;
+    private List<Table> tableList = new ArrayList<>();
+    private List<TTabletCommitInfo> tabletCommitInfos = new ArrayList<>();
+
     public TransactionEntry() {
     }
 
     public TransactionEntry(TTxnParams txnConf, Database db, Table table) {
         this.txnConf = txnConf;
-        this.db = db;
+        this.database = db;
         this.table = table;
     }
 
@@ -55,12 +91,12 @@ public class TransactionEntry {
         this.label = label;
     }
 
-    public Database getDb() {
-        return db;
+    public DatabaseIf getDb() {
+        return database;
     }
 
     public void setDb(Database db) {
-        this.db = db;
+        this.database = db;
     }
 
     public Table getTable() {
@@ -126,4 +162,121 @@ public class TransactionEntry {
     public void setpLoadId(Types.PUniqueId pLoadId) {
         this.pLoadId = pLoadId;
     }
+
+    // Used for insert into select
+    public void beginTransaction(DatabaseIf database, TableIf table)
+            throws DdlException, BeginTransactionException, 
MetaNotFoundException, AnalysisException,
+            QuotaExceedException {
+        if (isTxnBegin()) {
+            // FIXME: support mix usage of `insert into values` and `insert 
into select`
+            throw new AnalysisException(
+                    "Transaction insert can not insert into values and insert 
into select at the same time");
+        }
+        if (!isTransactionBegan) {
+            this.transactionId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(
+                    database.getId(), Lists.newArrayList(table.getId()), label,
+                    new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                    LoadJobSourceType.INSERT_STREAMING, 
ConnectContext.get().getExecTimeout());
+            this.isTransactionBegan = true;
+            this.database = database;
+            this.transactionState = Env.getCurrentGlobalTransactionMgr()
+                    .getTransactionState(database.getId(), transactionId);
+        } else {
+            if (this.database.getId() != database.getId()) {
+                throw new AnalysisException(
+                        "Transaction insert must be in the same database, 
expect db_id=" + this.database.getId());
+            }
+            this.transactionState.getTableIdList().add(table.getId());
+        }
+    }
+
+    public TransactionStatus commitTransaction()
+            throws Exception {
+        if (isTransactionBegan) {
+            if (Env.getCurrentGlobalTransactionMgr()
+                    .commitAndPublishTransaction(database, tableList, 
transactionId,
+                            TabletCommitInfo.fromThrift(tabletCommitInfos), 
ConnectContext.get().getExecTimeout())) {
+                return TransactionStatus.VISIBLE;
+            } else {
+                return TransactionStatus.COMMITTED;
+            }
+        } else if (isTxnBegin()) {
+            InsertStreamTxnExecutor executor = new 
InsertStreamTxnExecutor(this);
+            if (dataToSend.size() > 0) {
+                // send rest data
+                executor.sendData();
+            }
+            // commit txn
+            executor.commitTransaction();
+
+            // wait txn visible
+            TWaitingTxnStatusRequest request = new TWaitingTxnStatusRequest();
+            request.setDbId(txnConf.getDbId()).setTxnId(txnConf.getTxnId());
+            request.setLabelIsSet(false);
+            request.setTxnIdIsSet(true);
+
+            TWaitingTxnStatusResult statusResult = 
getWaitingTxnStatus(request);
+            TransactionStatus txnStatus = 
TransactionStatus.valueOf(statusResult.getTxnStatusId());
+            if (txnStatus == TransactionStatus.COMMITTED) {
+                throw new AnalysisException("transaction commit successfully, 
BUT data will be visible later.");
+            } else if (txnStatus != TransactionStatus.VISIBLE) {
+                String errMsg = "commit failed, rollback.";
+                if (statusResult.getStatus().isSetErrorMsgs()
+                        && statusResult.getStatus().getErrorMsgs().size() > 0) 
{
+                    errMsg = String.join(". ", 
statusResult.getStatus().getErrorMsgs());
+                }
+                throw new AnalysisException(errMsg);
+            }
+            return txnStatus;
+        } else {
+            LOG.info("No transaction to commit");
+            return null;
+        }
+    }
+
+    private TWaitingTxnStatusResult 
getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
+        TWaitingTxnStatusResult statusResult = null;
+        if (Env.getCurrentEnv().isMaster()) {
+            statusResult = Env.getCurrentGlobalTransactionMgr()
+                    .getWaitingTxnStatus(request);
+        } else {
+            MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(ConnectContext.get());
+            statusResult = masterTxnExecutor.getWaitingTxnStatus(request);
+        }
+        return statusResult;
+    }
+
+    public long abortTransaction()
+            throws UserException, TException, ExecutionException, 
InterruptedException, TimeoutException {
+        if (isTransactionBegan) {
+            
Env.getCurrentGlobalTransactionMgr().abortTransaction(database.getId(), 
transactionId, "user rollback");
+            return transactionId;
+        } else if (isTxnBegin()) {
+            InsertStreamTxnExecutor executor = new 
InsertStreamTxnExecutor(this);
+            executor.abortTransaction();
+            return txnConf.getTxnId();
+        } else {
+            LOG.info("No transaction to abort");
+            return -1;
+        }
+    }
+
+    public long getTransactionId() {
+        if (isTransactionBegan) {
+            return transactionId;
+        } else if (isTxnBegin()) {
+            return txnConf.getTxnId();
+        } else {
+            return -1;
+        }
+    }
+
+    public void addCommitInfos(Table table, List<TTabletCommitInfo> 
commitInfos) {
+        this.tableList.add(table);
+        this.tabletCommitInfos.addAll(commitInfos);
+    }
+
+    public boolean isTransactionBegan() {
+        return this.isTransactionBegan;
+    }
 }
diff --git a/regression-test/data/insert_p0/txn_insert.out 
b/regression-test/data/insert_p0/txn_insert.out
index f4947a2ac65..f6f9a5a648e 100644
--- a/regression-test/data/insert_p0/txn_insert.out
+++ b/regression-test/data/insert_p0/txn_insert.out
@@ -39,3 +39,262 @@
 6
 8
 
+-- !select7 --
+
+-- !select8 --
+
+-- !select9 --
+
+-- !select1 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select2 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select3 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select4 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select5 --
+1      2
+3      4
+5      6
+7      8
+
+-- !select6 --
+2
+4
+6
+8
+
+-- !select7 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select8 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select9 --
+
+-- !select10 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select11 --
+\N     \N      \N      [null]  [null, 0]
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select12 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select13 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select14 --
+\N     \N      \N      [null]  [null, 0]
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select15 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select16 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select17 --
+\N     \N      \N      [null]  [null, 0]
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+100    2.2     abc     []      []
+101    2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select18 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select19 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select20 --
+\N     \N      \N      [null]  [null, 0]
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+100    2.2     abc     []      []
+101    2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select21 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select22 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select23 --
+\N     \N      \N      [null]  [null, 0]
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+100    2.2     abc     []      []
+101    2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select24 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy 
b/regression-test/suites/insert_p0/txn_insert.groovy
index 6e745073b1b..ef3d46141a7 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -21,74 +21,197 @@
 
 suite("txn_insert") {
     def table = "txn_insert_tbl"
-    sql """ DROP TABLE IF EXISTS $table """
-    sql """
-        create table $table (
-            k1 int, 
-            k2 double,
-            k3 varchar(100),
-            k4 array<int>,
-            k5 array<boolean>
-        ) distributed by hash(k1) buckets 1
-        properties("replication_num" = "1"); 
-    """
-
-    // begin and commit
-    sql """begin"""
-    sql """insert into $table values(1, 2.2, "abc", [], [])"""
-    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
-    sql """insert into $table values(null, null, null, [null], [null, 0])"""
-    sql "commit"
-    sql "sync"
-    order_qt_select1 """select * from $table"""
-
-    // begin and rollback
-    sql "begin"
-    sql """insert into $table values(1, 2.2, "abc", [], [])"""
-    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
-    sql "rollback"
-    sql "sync"
-    order_qt_select2 """select * from $table"""
-
-    // begin 2 times and commit
-    sql "begin"
-    sql """insert into $table values(1, 2.2, "abc", [], [])"""
-    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
-    sql "begin"
-    sql """insert into $table values(1, 2.2, "abc", [], [])"""
-    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
-    sql "commit"
-    sql "sync"
-    order_qt_select3 """select * from $table"""
-
-    // begin 2 times and rollback
-    sql "begin"
-    sql """insert into $table values(1, 2.2, "abc", [], [])"""
-    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
-    sql "begin"
-    sql """insert into $table values(1, 2.2, "abc", [], [])"""
-    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
-    sql "rollback"
-    sql "sync"
-    order_qt_select4 """select * from $table"""
-
-    // write to table with mv
-    table = table + "_mv"
-    sql """ DROP TABLE IF EXISTS $table """
-    sql """
-        create table $table (
-            id int default '10', 
-            c1 int default '10'
-        ) distributed by hash(id, c1) 
-        properties('replication_num'="1");
-    """
-    createMV """ create materialized view mv_${table} as select c1 from 
$table; """
-    sql "begin"
-    sql """insert into $table values(1, 2), (3, 4)"""
-    sql """insert into $table values(5, 6)"""
-    sql """insert into $table values(7, 8)"""
-    sql "commit"
-    sql "sync"
-    order_qt_select5 """select * from $table"""
-    order_qt_select6 """select c1 from $table"""
+    for (def use_nereids_planner : [false, true]) {
+        sql " SET enable_nereids_planner = $use_nereids_planner; "
+        sql " SET enable_fallback_to_original_planner = false; "
+
+        sql """ DROP TABLE IF EXISTS $table """
+        sql """
+            create table $table (
+                k1 int, 
+                k2 double,
+                k3 varchar(100),
+                k4 array<int>,
+                k5 array<boolean>
+            ) distributed by hash(k1) buckets 1
+            properties("replication_num" = "1"); 
+        """
+
+        // begin and commit
+        sql """begin"""
+        sql """insert into $table values(1, 2.2, "abc", [], [])"""
+        sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+        sql """insert into $table values(null, null, null, [null], [null, 
0])"""
+        sql "commit"
+        sql "sync"
+        order_qt_select1 """select * from $table"""
+
+        // begin and rollback
+        sql "begin"
+        sql """insert into $table values(1, 2.2, "abc", [], [])"""
+        sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+        sql "rollback"
+        sql "sync"
+        order_qt_select2 """select * from $table"""
+
+        // begin 2 times and commit
+        sql "begin"
+        sql """insert into $table values(1, 2.2, "abc", [], [])"""
+        sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+        sql "begin"
+        sql """insert into $table values(1, 2.2, "abc", [], [])"""
+        sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+        sql "commit"
+        sql "sync"
+        order_qt_select3 """select * from $table"""
+
+        // begin 2 times and rollback
+        sql "begin"
+        sql """insert into $table values(1, 2.2, "abc", [], [])"""
+        sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+        sql "begin"
+        sql """insert into $table values(1, 2.2, "abc", [], [])"""
+        sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+        sql "rollback"
+        sql "sync"
+        order_qt_select4 """select * from $table"""
+
+        // write to table with mv
+        def tableMV = table + "_mv"
+        do {
+            sql """ DROP TABLE IF EXISTS $tableMV """
+            sql """
+                create table $tableMV (
+                    id int default '10', 
+                    c1 int default '10'
+                ) distributed by hash(id, c1) 
+                properties('replication_num'="1");
+            """
+            createMV """ create materialized view mv_${tableMV} as select c1 
from $tableMV; """
+            sql "begin"
+            sql """insert into $tableMV values(1, 2), (3, 4)"""
+            sql """insert into $tableMV values(5, 6)"""
+            sql """insert into $tableMV values(7, 8)"""
+            sql "commit"
+            sql "sync"
+            order_qt_select5 """select * from $tableMV"""
+            order_qt_select6 """select c1 from $tableMV"""
+        } while (0);
+
+        // ------------------- insert into select -------------------
+        for (int j = 0; j < 3; j++) {
+            def tableName = table + "_" + j
+            sql """ DROP TABLE IF EXISTS $tableName """
+            sql """
+                create table $tableName (
+                    k1 int, 
+                    k2 double,
+                    k3 varchar(100),
+                    k4 array<int>,
+                    k5 array<boolean>
+                ) distributed by hash(k1) buckets 1
+                properties("replication_num" = "1"); 
+            """
+        }
+
+        def result = sql """ show variables like 
'enable_fallback_to_original_planner'; """
+        logger.info("enable_fallback_to_original_planner: $result")
+
+        // 1. insert into select to 3 tables: batch insert into select only 
supports nereids planner, and can't fallback
+        sql """ begin; """
+        if (use_nereids_planner) {
+            sql """ insert into ${table}_0 select * from $table; """
+            sql """ insert into ${table}_1 select * from $table; """
+            sql """ insert into ${table}_2 select * from ${table}_0; """
+        } else {
+            test {
+                sql """ insert into ${table}_0 select * from $table; """
+                exception "Insert into ** select is not supported in a 
transaction"
+            }
+        }
+        sql """ commit; """
+        sql "sync"
+        order_qt_select7 """select * from ${table}_0"""
+        order_qt_select8 """select * from ${table}_1"""
+        order_qt_select9 """select * from ${table}_2"""
+
+        // 2. with different label
+        if (use_nereids_planner) {
+            def label = UUID.randomUUID().toString().replaceAll("-", "")
+            def label2 = UUID.randomUUID().toString().replaceAll("-", "")
+            sql """ begin with label $label; """
+            test {
+                sql """ insert into ${table}_0 with label $label2 select * 
from $table; """
+                exception "Transaction insert expect label"
+            }
+            sql """ insert into ${table}_1 select * from $table; """
+            sql """ insert into ${table}_2 select * from ${table}_0; """
+            sql """ commit; """
+            sql "sync"
+            order_qt_select10 """select * from ${table}_0"""
+            order_qt_select11 """select * from ${table}_1"""
+            order_qt_select12 """select * from ${table}_2"""
+        }
+
+        // 3. insert into select and values
+        if (use_nereids_planner) {
+            sql """ begin; """
+            sql """ insert into ${table}_0 select * from $table where k1 = 1; 
"""
+            test {
+                sql """insert into ${table}_1 values(1, 2.2, "abc", [], [])"""
+                exception "Transaction insert can not insert into values and 
insert into select at the same time"
+            }
+            sql """ insert into ${table}_1 select * from $table where k2 = 2.2 
limit 1; """
+            sql """ commit; """
+            sql "sync"
+            order_qt_select13 """select * from ${table}_0"""
+            order_qt_select14 """select * from ${table}_1"""
+            order_qt_select15 """select * from ${table}_2"""
+        }
+
+        // 4. insert into values and select
+        if (use_nereids_planner) {
+            sql """ begin; """
+            sql """insert into ${table}_1 values(100, 2.2, "abc", [], [])"""
+            test {
+                sql """ insert into ${table}_0 select * from $table; """
+                exception "Transaction insert can not insert into values and 
insert into select at the same time"
+            }
+            sql """insert into ${table}_1 values(101, 2.2, "abc", [], [])"""
+            sql """ commit; """
+            sql "sync"
+            order_qt_select16 """select * from ${table}_0"""
+            order_qt_select17 """select * from ${table}_1"""
+            order_qt_select18 """select * from ${table}_2"""
+        }
+
+        // 5. rollback
+        if (use_nereids_planner) {
+            def label = UUID.randomUUID().toString().replaceAll("-", "")
+            sql """ begin with label $label; """
+            sql """ insert into ${table}_0 select * from $table where k1 = 1; 
"""
+            sql """ insert into ${table}_1 select * from $table where k2 = 2.2 
limit 1; """
+            sql """ rollback; """
+            logger.info("rollback $label")
+            sql "sync"
+            order_qt_select19 """select * from ${table}_0"""
+            order_qt_select20 """select * from ${table}_1"""
+            order_qt_select21 """select * from ${table}_2"""
+        }
+
+        // 6. insert select with error
+        if (use_nereids_planner) {
+            sql """ begin; """
+            test {
+                sql """ insert into ${table}_0 select * from $tableMV; """
+                exception "insert into cols should be corresponding to the 
query output"
+            }
+            sql """ insert into ${table}_1 select * from $table where k2 = 2.2 
limit 1; """
+            sql """ commit; """
+            sql "sync"
+            order_qt_select22 """select * from ${table}_0"""
+            order_qt_select23 """select * from ${table}_1"""
+            order_qt_select24 """select * from ${table}_2"""
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to