xtern commented on code in PR #2789:
URL: https://github.com/apache/ignite-3/pull/2789#discussion_r1390840535


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -535,4 +604,179 @@ private static void validateParsedStatement(
             }
         }
     }
+
+    private class MultiStatementHandler {
+        private final String schemaName;
+        private final IgniteTransactions transactions;
+        private final @Nullable InternalTransaction explicitTransaction;
+        private final List<CompletableFuture<AsyncSqlCursor<List<Object>>>> cursorFutures;
+        private final Queue<ScriptStatementParameters> statements;
+
+        MultiStatementHandler(
+                String schemaName,
+                IgniteTransactions transactions,
+                @Nullable InternalTransaction explicitTransaction,
+                List<ParsedResult> parsedResults,
+                Object[] params
+        ) {
+            this.schemaName = schemaName;
+            this.transactions = transactions;
+            this.explicitTransaction = explicitTransaction;
+
+            List<ScriptStatementParameters> statementsList = splitDynamicParameters(parsedResults, params);
+
+            // All statements that need to be processed.
+            statements = new ArrayBlockingQueue<>(statementsList.size(), false, statementsList);
+
+            // Statement futures that return a cursor.
+            cursorFutures = statementsList.stream()
+                    .filter(data -> data.parsedResult.queryType() != SqlQueryType.TX_CONTROL)
+                    .map(data -> data.cursorFuture)
+                    .collect(toUnmodifiableList());
+        }
+
+        AsyncSqlCursorIterator<List<Object>> asyncCursorIterator() {
+            Iterator<CompletableFuture<AsyncSqlCursor<List<Object>>>> delegate = cursorFutures.iterator();
+
+            return new AsyncSqlCursorIterator<>() {
+                @Override
+                public boolean hasNext() {
+                    return delegate.hasNext();
+                }
+
+                @Override
+                public CompletableFuture<AsyncSqlCursor<List<Object>>> next() {
+                    return delegate.next();
+                }
+            };
+        }
+
+        void processNext() {
+            ScriptStatementParameters parameters = statements.poll();
+
+            if (parameters == null) {
+                return;
+            }
+
+            ParsedResult parsedResult = parameters.parsedResult;
+            Object[] dynamicParams = parameters.dynamicParams;
+            CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture = parameters.cursorFuture;
+
+            try {
+                // TODO https://issues.apache.org/jira/browse/IGNITE-20463
+                if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
+                    taskExecutor.execute(this::processNext);
+                    return;
+                }
+
+                if (cursorFuture.isDone()) {
+                    return;
+                }
+
+                QueryTransactionWrapper txWrapper = wrapTxOrStartImplicit(parsedResult.queryType(), transactions, explicitTransaction);
+
+                executeParsedStatementAndWaitPrefetch(txWrapper, parsedResult, dynamicParams)
+                        .whenComplete((res, ex) -> {
+                            if (ex != null) {
+                                txWrapper.rollback();
+                                cursorFuture.completeExceptionally(ex);
+                                cancelAll(ex);
+
+                                return;
+                            }
+
+                            if (parsedResult.queryType() == SqlQueryType.DML || parsedResult.queryType() == SqlQueryType.DDL) {
+                                txWrapper.commitImplicit();

Review Comment:
   :thinking: Fixed: the implicit transaction is now committed after prefetching for all query types.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to