xtern commented on code in PR #2846:
URL: https://github.com/apache/ignite-3/pull/2846#discussion_r1422280021
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -682,66 +661,98 @@ private static void validateDynamicParameters(int
expectedParamsCount, Object[]
}
}
+ /** Returns count of opened cursors. */
+ @TestOnly
+ public int openedCursors() {
+ return openedCursors.size();
+ }
+
private class MultiStatementHandler {
private final String schemaName;
- private final IgniteTransactions transactions;
- private final @Nullable InternalTransaction explicitTransaction;
- private final Queue<ScriptStatementParameters> statements;
+ private final Queue<ScriptStatement> statements;
+ private final ScriptTransactionContext scriptTxCtx;
MultiStatementHandler(
String schemaName,
- IgniteTransactions transactions,
- @Nullable InternalTransaction explicitTransaction,
+ QueryTransactionContext txCtx,
List<ParsedResult> parsedResults,
Object[] params
) {
this.schemaName = schemaName;
- this.transactions = transactions;
- this.explicitTransaction = explicitTransaction;
this.statements = prepareStatementsQueue(parsedResults, params);
+ this.scriptTxCtx = new ScriptTransactionContext(txCtx);
}
- CompletableFuture<AsyncSqlCursor<InternalSqlRow>> processNext() {
- if (statements == null) {
- // TODO https://issues.apache.org/jira/browse/IGNITE-20463
Each tx control statement must return an empty cursor.
- return nullCompletedFuture();
+ /**
+ * Returns a queue. each element of which represents parameters
required to execute a single statement of the script.
+ */
+ private Queue<ScriptStatement>
prepareStatementsQueue(List<ParsedResult> parsedResults, Object[] params) {
+ assert !parsedResults.isEmpty();
+
+ int paramsCount =
parsedResults.stream().mapToInt(ParsedResult::dynamicParamsCount).sum();
+
+ validateDynamicParameters(paramsCount, params);
+
+ ScriptStatement[] results = new
ScriptStatement[parsedResults.size()];
+
+ // We fill parameters in reverse order, because each script
statement
+ // requires a reference to the future of the next statement.
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> prevCursorFuture
= null;
+ for (int i = parsedResults.size() - 1; i >= 0; i--) {
+ ParsedResult result = parsedResults.get(i);
+
+ Object[] params0 = Arrays.copyOfRange(params, paramsCount -
result.dynamicParamsCount(), paramsCount);
+ paramsCount -= result.dynamicParamsCount();
+
+ results[i] = new ScriptStatement(result, params0,
prevCursorFuture);
+ prevCursorFuture = results[i].cursorFuture;
}
- ScriptStatementParameters parameters = statements.poll();
+ return new ArrayBlockingQueue<>(results.length, false,
List.of(results));
+ }
+
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> processNext() {
+ ScriptStatement scriptStatement = statements.poll();
- assert parameters != null;
+ assert scriptStatement != null;
- ParsedResult parsedResult = parameters.parsedResult;
- Object[] dynamicParams = parameters.dynamicParams;
- CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture =
parameters.cursorFuture;
- CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextCursorFuture
= parameters.nextStatementFuture;
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture =
scriptStatement.cursorFuture;
try {
if (cursorFuture.isDone()) {
return cursorFuture;
}
- QueryTransactionWrapper txWrapper =
wrapTxOrStartImplicit(parsedResult.queryType(), transactions,
explicitTransaction);
+ ParsedResult parsedResult = scriptStatement.parsedResult;
- QueryCancel cancel = new QueryCancel();
+ QueryTransactionWrapper txWrapper =
scriptTxCtx.getOrStartImplicit(parsedResult.queryType());
- executeParsedStatement(schemaName, parsedResult, txWrapper,
cancel, dynamicParams, true, nextCursorFuture)
- .whenComplete((res, ex) -> {
+ scriptTxCtx.registerCursor(parsedResult.queryType(),
cursorFuture);
+
+ executeStatement(parsedResult, txWrapper,
scriptStatement.dynamicParams, scriptStatement.nextStatementFuture)
Review Comment:
I don't quite understand the case you describe.
The implicit transaction at line 728 can only be started if this is NOT a
control statement.
For TX control statement we return NOOP wrapper and transaction will be
started/committed below (line 771)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]