This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9ada38327b8ae204987c5e309c2c3528da5776ff Author: meiyi <myime...@gmail.com> AuthorDate: Sun Mar 10 17:39:05 2024 +0800 [feature](txn insert) txn insert support insert into select (#31666) --- .../doris/nereids/parser/LogicalPlanBuilder.java | 6 +- .../commands/insert/AbstractInsertExecutor.java | 2 +- .../commands/insert/InsertIntoTableCommand.java | 4 +- .../trees/plans/commands/insert/InsertUtils.java | 5 + .../plans/commands/insert/OlapInsertExecutor.java | 30 ++- .../java/org/apache/doris/qe/ConnectContext.java | 13 +- .../apache/doris/qe/InsertStreamTxnExecutor.java | 3 +- .../java/org/apache/doris/qe/StmtExecutor.java | 90 +++---- .../apache/doris/transaction/TransactionEntry.java | 163 ++++++++++++- regression-test/data/insert_p0/txn_insert.out | 259 ++++++++++++++++++++ regression-test/suites/insert_p0/txn_insert.groovy | 263 +++++++++++++++------ 11 files changed, 683 insertions(+), 155 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 66d4eebecff..874b6a0d432 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -546,7 +546,11 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { if (isOverwrite) { command = new InsertOverwriteTableCommand(sink, labelName); } else { - if (ConnectContext.get() != null && ConnectContext.get().isTxnModel()) { + if (ConnectContext.get() != null && ConnectContext.get().isTxnModel() + && sink.child() instanceof LogicalInlineTable) { + // FIXME: In legacy, the `insert into select 1` is handled as `insert into values`. + // In nereids, the original way is throw an AnalysisException and fallback to legacy. + // Now handle it as `insert into select`(a separate load job), should fix it as the legacy. command = new BatchInsertIntoTableCommand(sink); } else { command = new InsertIntoTableCommand(sink, labelName, Optional.empty()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 2ee55caf18d..f0df240762b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -49,7 +49,7 @@ public abstract class AbstractInsertExecutor { protected long jobId; protected final ConnectContext ctx; protected final Coordinator coordinator; - protected final String labelName; + protected String labelName; protected final DatabaseIf database; protected final TableIf table; protected final long createTime = System.currentTimeMillis(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 56b99d8288d..8bf454f0980 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -152,7 +152,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, Preconditions.checkArgument(plan.isPresent(), "insert into command must contain target table"); PhysicalSink physicalSink = plan.get(); DataSink sink = planner.getFragments().get(0).getSink(); - String label = this.labelName.orElse(String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo)); + // Transaction insert should reuse the label in the transaction. + String label = this.labelName.orElse( + ctx.isTxnModel() ? null : String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo)); if (physicalSink instanceof PhysicalOlapTableSink) { if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index c1894c50eb3..f0e7fab736c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -165,6 +165,11 @@ public class InsertUtils { private static void beginBatchInsertTransaction(ConnectContext ctx, String dbName, String tblName, List<Column> columns) { TransactionEntry txnEntry = ctx.getTxnEntry(); + if (txnEntry.isTransactionBegan()) { + // FIXME: support mix usage of `insert into values` and `insert into select` + throw new AnalysisException( + "Transaction insert can not insert into values and insert into select at the same time"); + } TTxnParams txnConf = txnEntry.getTxnConf(); SessionVariable sessionVariable = ctx.getSessionVariable(); long timeoutSecond = ctx.getExecTimeout(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index bb61c71b282..289f9a974d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -47,6 +47,7 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TOlapTableLocationParam; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; @@ -86,10 +87,22 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { @Override public void beginTransaction() { try { - this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - database.getId(), ImmutableList.of(table.getId()), labelName, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), - LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout()); + if (ctx.isTxnModel()) { + TransactionEntry txnEntry = ctx.getTxnEntry(); + // check the same label with begin + if (this.labelName != null && !this.labelName.equals(txnEntry.getLabel())) { + throw new AnalysisException("Transaction insert expect label " + txnEntry.getLabel() + + ", but got " + this.labelName); + } + txnEntry.beginTransaction(database, table); + this.txnId = txnEntry.getTransactionId(); + this.labelName = txnEntry.getLabel(); + } else { + this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( + database.getId(), ImmutableList.of(table.getId()), labelName, + new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout()); + } } catch (Exception e) { throw new AnalysisException("begin transaction failed. " + e.getMessage(), e); } @@ -160,6 +173,12 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { @Override protected void onComplete() throws UserException { + if (ctx.isTxnModel()) { + TransactionEntry txnEntry = ctx.getTxnEntry(); + txnEntry.addCommitInfos((Table) table, coordinator.getCommitInfos()); + return; + } + if (ctx.getState().getStateType() == MysqlStateType.ERR) { try { String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); @@ -241,7 +260,8 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { // {'label':'my_label1', 'status':'visible', 'txnId':'123'} // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'} StringBuilder sb = new StringBuilder(); - sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name()); + sb.append("{'label':'").append(labelName).append("', 'status':'") + .append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : txnStatus.name()); sb.append("', 'txnId':'").append(txnId).append("'"); if (table.getType() == TableType.MATERIALIZED_VIEW) { sb.append("', 'rows':'").append(loadedRows).append("'"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index a36d4aeacc9..d561494fe52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -402,14 +402,11 @@ public class ConnectContext { public void closeTxn() { if (isTxnModel()) { - if (isTxnBegin()) { - try { - InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(getTxnEntry()); - executor.abortTransaction(); - } catch (Exception e) { - LOG.error("db: {}, txnId: {}, rollback error.", currentDb, - txnEntry.getTxnConf().getTxnId(), e); - } + try { + txnEntry.abortTransaction(); + } catch (Exception e) { + LOG.error("db: {}, txnId: {}, rollback error.", currentDb, + txnEntry.getTransactionId(), e); } txnEntry = null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 77d4c52ac9d..f37457cf58d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.Config; @@ -67,7 +68,7 @@ public class InsertStreamTxnExecutor { // StreamLoadTask's id == request's load_id StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); StreamLoadPlanner planner = new StreamLoadPlanner( - txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask); + (Database) txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask); // Will using load id as query id in fragment if (Config.enable_pipeline_load) { TPipelineFragmentParams tRequest = planner.planForPipeline(streamLoadTask.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8e33c9cde85..a52ed1c260e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -175,8 +175,6 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TTxnParams; import org.apache.doris.thrift.TUniqueId; -import org.apache.doris.thrift.TWaitingTxnStatusRequest; -import org.apache.doris.thrift.TWaitingTxnStatusResult; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionState; @@ -512,10 +510,19 @@ public class StmtExecutor { LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); throw ((NereidsException) e).getException(); } - boolean isInsertIntoCommand = parsedStmt != null && parsedStmt instanceof LogicalPlanAdapter + // FIXME: Force fallback for: + // 1. group commit because nereids does not support it (see the following `isGroupCommit` variable) + // 2. insert into command because some nereids cases fail (including case1) + // Skip force fallback for: + // 1. Transaction insert because nereids support `insert into select` while legacy does not + boolean isInsertCommand = parsedStmt != null + && parsedStmt instanceof LogicalPlanAdapter && ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand; - if (e instanceof NereidsException - && !context.getSessionVariable().enableFallbackToOriginalPlanner && !isInsertIntoCommand) { + /*boolean isGroupCommit = (Config.wait_internal_group_commit_finish + || context.sessionVariable.isEnableInsertGroupCommit()) && isInsertCommand;*/ + boolean forceFallback = isInsertCommand && !context.isTxnModel(); + if (e instanceof NereidsException && !context.getSessionVariable().enableFallbackToOriginalPlanner + && !forceFallback) { LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); throw ((NereidsException) e).getException(); } @@ -595,7 +602,8 @@ public class StmtExecutor { LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); // when we in transaction mode, we only support insert into command and transaction command if (context.isTxnModel()) { - if (!(logicalPlan instanceof BatchInsertIntoTableCommand)) { + if (!(logicalPlan instanceof BatchInsertIntoTableCommand + || logicalPlan instanceof InsertIntoTableCommand)) { String errMsg = "This is in a transaction, only insert, commit, rollback is acceptable."; throw new NereidsException(errMsg, new AnalysisException(errMsg)); } @@ -1701,18 +1709,6 @@ public class StmtExecutor { } } - private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception { - TWaitingTxnStatusResult statusResult = null; - if (Env.getCurrentEnv().isMaster()) { - statusResult = Env.getCurrentGlobalTransactionMgr() - .getWaitingTxnStatus(request); - } else { - MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context); - statusResult = masterTxnExecutor.getWaitingTxnStatus(request); - } - return statusResult; - } - private void handleTransactionStmt() throws Exception { if (context.getConnectType() == ConnectType.MYSQL) { // Every time set no send flag and clean all data in buffer @@ -1721,6 +1717,7 @@ public class StmtExecutor { context.getState().setOk(0, 0, ""); // create plan if (context.getTxnEntry() != null && context.getTxnEntry().getRowsInTransaction() == 0 + && !context.getTxnEntry().isTransactionBegan() && (parsedStmt instanceof TransactionCommitStmt || parsedStmt instanceof TransactionRollbackStmt)) { context.setTxnEntry(null); } else if (parsedStmt instanceof TransactionBeginStmt) { @@ -1728,18 +1725,13 @@ public class StmtExecutor { LOG.info("A transaction has already begin"); return; } - TTxnParams txnParams = new TTxnParams(); - txnParams.setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load) - .setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl(""); - if (context.getSessionVariable().getEnableInsertStrict()) { - txnParams.setMaxFilterRatio(0); - } else { - txnParams.setMaxFilterRatio(1.0); - } if (context.getTxnEntry() == null) { context.setTxnEntry(new TransactionEntry()); } - context.getTxnEntry().setTxnConf(txnParams); + context.getTxnEntry() + .setTxnConf(new TTxnParams().setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load) + .setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("") + .setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0 : 1.0)); StringBuilder sb = new StringBuilder(); sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") .append(TransactionStatus.PREPARE.name()); @@ -1750,39 +1742,13 @@ public class StmtExecutor { LOG.info("No transaction to commit"); return; } - - TTxnParams txnConf = context.getTxnEntry().getTxnConf(); try { - InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(context.getTxnEntry()); - if (context.getTxnEntry().getDataToSend().size() > 0) { - // send rest data - executor.sendData(); - } - // commit txn - executor.commitTransaction(); - - // wait txn visible - TWaitingTxnStatusRequest request = new TWaitingTxnStatusRequest(); - request.setDbId(txnConf.getDbId()).setTxnId(txnConf.getTxnId()); - request.setLabelIsSet(false); - request.setTxnIdIsSet(true); - - TWaitingTxnStatusResult statusResult = getWaitingTxnStatus(request); - TransactionStatus txnStatus = TransactionStatus.valueOf(statusResult.getTxnStatusId()); - if (txnStatus == TransactionStatus.COMMITTED) { - throw new AnalysisException("transaction commit successfully, BUT data will be visible later."); - } else if (txnStatus != TransactionStatus.VISIBLE) { - String errMsg = "commit failed, rollback."; - if (statusResult.getStatus().isSetErrorMsgs() - && statusResult.getStatus().getErrorMsgs().size() > 0) { - errMsg = String.join(". ", statusResult.getStatus().getErrorMsgs()); - } - throw new AnalysisException(errMsg); - } + TransactionEntry txnEntry = context.getTxnEntry(); + TransactionStatus txnStatus = txnEntry.commitTransaction(); StringBuilder sb = new StringBuilder(); - sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") + sb.append("{'label':'").append(txnEntry.getLabel()).append("', 'status':'") .append(txnStatus.name()).append("', 'txnId':'") - .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}"); + .append(txnEntry.getTransactionId()).append("'").append("}"); context.getState().setOk(0, 0, sb.toString()); } catch (Exception e) { LOG.warn("Txn commit failed", e); @@ -1796,14 +1762,12 @@ public class StmtExecutor { return; } try { - // abort txn - InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(context.getTxnEntry()); - executor.abortTransaction(); - + TransactionEntry txnEntry = context.getTxnEntry(); + long txnId = txnEntry.abortTransaction(); StringBuilder sb = new StringBuilder(); - sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") + sb.append("{'label':'").append(txnEntry.getLabel()).append("', 'status':'") .append(TransactionStatus.ABORTED.name()).append("', 'txnId':'") - .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}"); + .append(txnId).append("'").append("}"); context.getState().setOk(0, 0, sb.toString()); } catch (Exception e) { throw new AnalysisException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index 71368715794..43e6585e584 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -18,19 +18,48 @@ package org.apache.doris.transaction; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.QuotaExceedException; +import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.InsertStreamTxnExecutor; +import org.apache.doris.qe.MasterTxnExecutor; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TTabletCommitInfo; import org.apache.doris.thrift.TTxnParams; +import org.apache.doris.thrift.TWaitingTxnStatusRequest; +import org.apache.doris.thrift.TWaitingTxnStatusResult; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; + +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; public class TransactionEntry { + private static final Logger LOG = LogManager.getLogger(TransactionEntry.class); + private String label = ""; - private Database db; + private DatabaseIf database; + + // for insert into values for one table private Table table; private Backend backend; private TTxnParams txnConf; @@ -38,12 +67,19 @@ public class TransactionEntry { private long rowsInTransaction = 0; private Types.PUniqueId pLoadId; + // for insert into select for multi tables + private boolean isTransactionBegan = false; + private long transactionId = -1; + private TransactionState transactionState; + private List<Table> tableList = new ArrayList<>(); + private List<TTabletCommitInfo> tabletCommitInfos = new ArrayList<>(); + public TransactionEntry() { } public TransactionEntry(TTxnParams txnConf, Database db, Table table) { this.txnConf = txnConf; - this.db = db; + this.database = db; this.table = table; } @@ -55,12 +91,12 @@ public class TransactionEntry { this.label = label; } - public Database getDb() { - return db; + public DatabaseIf getDb() { + return database; } public void setDb(Database db) { - this.db = db; + this.database = db; } public Table getTable() { @@ -126,4 +162,121 @@ public class TransactionEntry { public void setpLoadId(Types.PUniqueId pLoadId) { this.pLoadId = pLoadId; } + + // Used for insert into select + public void beginTransaction(DatabaseIf database, TableIf table) + throws DdlException, BeginTransactionException, MetaNotFoundException, AnalysisException, + QuotaExceedException { + if (isTxnBegin()) { + // FIXME: support mix usage of `insert into values` and `insert into select` + throw new AnalysisException( + "Transaction insert can not insert into values and insert into select at the same time"); + } + if (!isTransactionBegan) { + this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction( + database.getId(), Lists.newArrayList(table.getId()), label, + new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + LoadJobSourceType.INSERT_STREAMING, ConnectContext.get().getExecTimeout()); + this.isTransactionBegan = true; + this.database = database; + this.transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(database.getId(), transactionId); + } else { + if (this.database.getId() != database.getId()) { + throw new AnalysisException( + "Transaction insert must be in the same database, expect db_id=" + this.database.getId()); + } + this.transactionState.getTableIdList().add(table.getId()); + } + } + + public TransactionStatus commitTransaction() + throws Exception { + if (isTransactionBegan) { + if (Env.getCurrentGlobalTransactionMgr() + .commitAndPublishTransaction(database, tableList, transactionId, + TabletCommitInfo.fromThrift(tabletCommitInfos), ConnectContext.get().getExecTimeout())) { + return TransactionStatus.VISIBLE; + } else { + return TransactionStatus.COMMITTED; + } + } else if (isTxnBegin()) { + InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(this); + if (dataToSend.size() > 0) { + // send rest data + executor.sendData(); + } + // commit txn + executor.commitTransaction(); + + // wait txn visible + TWaitingTxnStatusRequest request = new TWaitingTxnStatusRequest(); + request.setDbId(txnConf.getDbId()).setTxnId(txnConf.getTxnId()); + request.setLabelIsSet(false); + request.setTxnIdIsSet(true); + + TWaitingTxnStatusResult statusResult = getWaitingTxnStatus(request); + TransactionStatus txnStatus = TransactionStatus.valueOf(statusResult.getTxnStatusId()); + if (txnStatus == TransactionStatus.COMMITTED) { + throw new AnalysisException("transaction commit successfully, BUT data will be visible later."); + } else if (txnStatus != TransactionStatus.VISIBLE) { + String errMsg = "commit failed, rollback."; + if (statusResult.getStatus().isSetErrorMsgs() + && statusResult.getStatus().getErrorMsgs().size() > 0) { + errMsg = String.join(". ", statusResult.getStatus().getErrorMsgs()); + } + throw new AnalysisException(errMsg); + } + return txnStatus; + } else { + LOG.info("No transaction to commit"); + return null; + } + } + + private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception { + TWaitingTxnStatusResult statusResult = null; + if (Env.getCurrentEnv().isMaster()) { + statusResult = Env.getCurrentGlobalTransactionMgr() + .getWaitingTxnStatus(request); + } else { + MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ConnectContext.get()); + statusResult = masterTxnExecutor.getWaitingTxnStatus(request); + } + return statusResult; + } + + public long abortTransaction() + throws UserException, TException, ExecutionException, InterruptedException, TimeoutException { + if (isTransactionBegan) { + Env.getCurrentGlobalTransactionMgr().abortTransaction(database.getId(), transactionId, "user rollback"); + return transactionId; + } else if (isTxnBegin()) { + InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(this); + executor.abortTransaction(); + return txnConf.getTxnId(); + } else { + LOG.info("No transaction to abort"); + return -1; + } + } + + public long getTransactionId() { + if (isTransactionBegan) { + return transactionId; + } else if (isTxnBegin()) { + return txnConf.getTxnId(); + } else { + return -1; + } + } + + public void addCommitInfos(Table table, List<TTabletCommitInfo> commitInfos) { + this.tableList.add(table); + this.tabletCommitInfos.addAll(commitInfos); + } + + public boolean isTransactionBegan() { + return this.isTransactionBegan; + } } diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/txn_insert.out index f4947a2ac65..f6f9a5a648e 100644 --- a/regression-test/data/insert_p0/txn_insert.out +++ b/regression-test/data/insert_p0/txn_insert.out @@ -39,3 +39,262 @@ 6 8 +-- !select7 -- + +-- !select8 -- + +-- !select9 -- + +-- !select1 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] + +-- !select2 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] + +-- !select3 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select4 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select5 -- +1 2 +3 4 +5 6 +7 8 + +-- !select6 -- +2 +4 +6 +8 + +-- !select7 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select8 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select9 -- + +-- !select10 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select11 -- +\N \N \N [null] [null, 0] +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select12 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select13 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select14 -- +\N \N \N [null] [null, 0] +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select15 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select16 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select17 -- +\N \N \N [null] [null, 0] +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +100 2.2 abc [] [] +101 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select18 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select19 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select20 -- +\N \N \N [null] [null, 0] +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +100 2.2 abc [] [] +101 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select21 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select22 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select23 -- +\N \N \N [null] [null, 0] +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +100 2.2 abc [] [] +101 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select24 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy index 6e745073b1b..ef3d46141a7 100644 --- a/regression-test/suites/insert_p0/txn_insert.groovy +++ b/regression-test/suites/insert_p0/txn_insert.groovy @@ -21,74 +21,197 @@ suite("txn_insert") { def table = "txn_insert_tbl" - sql """ DROP TABLE IF EXISTS $table """ - sql """ - create table $table ( - k1 int, - k2 double, - k3 varchar(100), - k4 array<int>, - k5 array<boolean> - ) distributed by hash(k1) buckets 1 - properties("replication_num" = "1"); - """ - - // begin and commit - sql """begin""" - sql """insert into $table values(1, 2.2, "abc", [], [])""" - sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" - sql """insert into $table values(null, null, null, [null], [null, 0])""" - sql "commit" - sql "sync" - order_qt_select1 """select * from $table""" - - // begin and rollback - sql "begin" - sql """insert into $table values(1, 2.2, "abc", [], [])""" - sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" - sql "rollback" - sql "sync" - order_qt_select2 """select * from $table""" - - // begin 2 times and commit - sql "begin" - sql """insert into $table values(1, 2.2, "abc", [], [])""" - sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" - sql "begin" - sql """insert into $table values(1, 2.2, "abc", [], [])""" - sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" - sql "commit" - sql "sync" - order_qt_select3 """select * from $table""" - - // begin 2 times and rollback - sql "begin" - sql """insert into $table values(1, 2.2, "abc", [], [])""" - sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" - sql "begin" - sql """insert into $table values(1, 2.2, "abc", [], [])""" - sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" - sql "rollback" - sql "sync" - order_qt_select4 """select * from $table""" - - // write to table with mv - table = table + "_mv" - sql """ DROP TABLE IF EXISTS $table """ - sql """ - create table $table ( - id int default '10', - c1 int default '10' - ) distributed by hash(id, c1) - properties('replication_num'="1"); - """ - createMV """ create materialized view mv_${table} as select c1 from $table; """ - sql "begin" - sql """insert into $table values(1, 2), (3, 4)""" - sql """insert into $table values(5, 6)""" - sql """insert into $table values(7, 8)""" - sql "commit" - sql "sync" - order_qt_select5 """select * from $table""" - order_qt_select6 """select c1 from $table""" + for (def use_nereids_planner : [false, true]) { + sql " SET enable_nereids_planner = $use_nereids_planner; " + sql " SET enable_fallback_to_original_planner = false; " + + sql """ DROP TABLE IF EXISTS $table """ + sql """ + create table $table ( + k1 int, + k2 double, + k3 varchar(100), + k4 array<int>, + k5 array<boolean> + ) distributed by hash(k1) buckets 1 + properties("replication_num" = "1"); + """ + + // begin and commit + sql """begin""" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql """insert into $table values(null, null, null, [null], [null, 0])""" + sql "commit" + sql "sync" + order_qt_select1 """select * from $table""" + + // begin and rollback + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "rollback" + sql "sync" + order_qt_select2 """select * from $table""" + + // begin 2 times and commit + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "commit" + sql "sync" + order_qt_select3 """select * from $table""" + + // begin 2 times and rollback + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "rollback" + sql "sync" + order_qt_select4 """select * from $table""" + + // write to table with mv + def tableMV = table + "_mv" + do { + sql """ DROP TABLE IF EXISTS $tableMV """ + sql """ + create table $tableMV ( + id int default '10', + c1 int default '10' + ) distributed by hash(id, c1) + properties('replication_num'="1"); + """ + createMV """ create materialized view mv_${tableMV} as select c1 from $tableMV; """ + sql "begin" + sql """insert into $tableMV values(1, 2), (3, 4)""" + sql """insert into $tableMV values(5, 6)""" + sql """insert into $tableMV values(7, 8)""" + sql "commit" + sql "sync" + order_qt_select5 """select * from $tableMV""" + order_qt_select6 """select c1 from $tableMV""" + } while (0); + + // ------------------- insert into select ------------------- + for (int j = 0; j < 3; j++) { + def tableName = table + "_" + j + sql """ DROP TABLE IF EXISTS $tableName """ + sql """ + create table $tableName ( + k1 int, + k2 double, + k3 varchar(100), + k4 array<int>, + k5 array<boolean> + ) distributed by hash(k1) buckets 1 + properties("replication_num" = "1"); + """ + } + + def result = sql """ show variables like 'enable_fallback_to_original_planner'; """ + logger.info("enable_fallback_to_original_planner: $result") + + // 1. insert into select to 3 tables: batch insert into select only supports nereids planner, and can't fallback + sql """ begin; """ + if (use_nereids_planner) { + sql """ insert into ${table}_0 select * from $table; """ + sql """ insert into ${table}_1 select * from $table; """ + sql """ insert into ${table}_2 select * from ${table}_0; """ + } else { + test { + sql """ insert into ${table}_0 select * from $table; """ + exception "Insert into ** select is not supported in a transaction" + } + } + sql """ commit; """ + sql "sync" + order_qt_select7 """select * from ${table}_0""" + order_qt_select8 """select * from ${table}_1""" + order_qt_select9 """select * from ${table}_2""" + + // 2. with different label + if (use_nereids_planner) { + def label = UUID.randomUUID().toString().replaceAll("-", "") + def label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql """ begin with label $label; """ + test { + sql """ insert into ${table}_0 with label $label2 select * from $table; """ + exception "Transaction insert expect label" + } + sql """ insert into ${table}_1 select * from $table; """ + sql """ insert into ${table}_2 select * from ${table}_0; """ + sql """ commit; """ + sql "sync" + order_qt_select10 """select * from ${table}_0""" + order_qt_select11 """select * from ${table}_1""" + order_qt_select12 """select * from ${table}_2""" + } + + // 3. insert into select and values + if (use_nereids_planner) { + sql """ begin; """ + sql """ insert into ${table}_0 select * from $table where k1 = 1; """ + test { + sql """insert into ${table}_1 values(1, 2.2, "abc", [], [])""" + exception "Transaction insert can not insert into values and insert into select at the same time" + } + sql """ insert into ${table}_1 select * from $table where k2 = 2.2 limit 1; """ + sql """ commit; """ + sql "sync" + order_qt_select13 """select * from ${table}_0""" + order_qt_select14 """select * from ${table}_1""" + order_qt_select15 """select * from ${table}_2""" + } + + // 4. insert into values and select + if (use_nereids_planner) { + sql """ begin; """ + sql """insert into ${table}_1 values(100, 2.2, "abc", [], [])""" + test { + sql """ insert into ${table}_0 select * from $table; """ + exception "Transaction insert can not insert into values and insert into select at the same time" + } + sql """insert into ${table}_1 values(101, 2.2, "abc", [], [])""" + sql """ commit; """ + sql "sync" + order_qt_select16 """select * from ${table}_0""" + order_qt_select17 """select * from ${table}_1""" + order_qt_select18 """select * from ${table}_2""" + } + + // 5. rollback + if (use_nereids_planner) { + def label = UUID.randomUUID().toString().replaceAll("-", "") + sql """ begin with label $label; """ + sql """ insert into ${table}_0 select * from $table where k1 = 1; """ + sql """ insert into ${table}_1 select * from $table where k2 = 2.2 limit 1; """ + sql """ rollback; """ + logger.info("rollback $label") + sql "sync" + order_qt_select19 """select * from ${table}_0""" + order_qt_select20 """select * from ${table}_1""" + order_qt_select21 """select * from ${table}_2""" + } + + // 6. insert select with error + if (use_nereids_planner) { + sql """ begin; """ + test { + sql """ insert into ${table}_0 select * from $tableMV; """ + exception "insert into cols should be corresponding to the query output" + } + sql """ insert into ${table}_1 select * from $table where k2 = 2.2 limit 1; """ + sql """ commit; """ + sql "sync" + order_qt_select22 """select * from ${table}_0""" + order_qt_select23 """select * from ${table}_1""" + order_qt_select24 """select * from ${table}_2""" + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org