xtern commented on code in PR #2789:
URL: https://github.com/apache/ignite-3/pull/2789#discussion_r1390838756


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -535,4 +604,179 @@ private static void validateParsedStatement(
             }
         }
     }
+
+    private class MultiStatementHandler {
+        private final String schemaName;
+        private final IgniteTransactions transactions;
+        private final @Nullable InternalTransaction explicitTransaction;
+        private final List<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
cursorFutures;
+        private final Queue<ScriptStatementParameters> statements;
+
+        MultiStatementHandler(
+                String schemaName,
+                IgniteTransactions transactions,
+                @Nullable InternalTransaction explicitTransaction,
+                List<ParsedResult> parsedResults,
+                Object[] params
+        ) {
+            this.schemaName = schemaName;
+            this.transactions = transactions;
+            this.explicitTransaction = explicitTransaction;
+
+            List<ScriptStatementParameters> statementsList = 
splitDynamicParameters(parsedResults, params);
+
+            // All statements that need to be processed.
+            statements = new ArrayBlockingQueue<>(statementsList.size(), 
false, statementsList);
+
+            // Statement futures that return a cursor.
+            cursorFutures = statementsList.stream()
+                    .filter(data -> data.parsedResult.queryType() != 
SqlQueryType.TX_CONTROL)
+                    .map(data -> data.cursorFuture)
+                    .collect(toUnmodifiableList());
+        }
+
+        AsyncSqlCursorIterator<List<Object>> asyncCursorIterator() {
+            Iterator<CompletableFuture<AsyncSqlCursor<List<Object>>>> delegate 
= cursorFutures.iterator();
+
+            return new AsyncSqlCursorIterator<>() {
+                @Override
+                public boolean hasNext() {
+                    return delegate.hasNext();
+                }
+
+                @Override
+                public CompletableFuture<AsyncSqlCursor<List<Object>>> next() {
+                    return delegate.next();
+                }
+            };
+        }
+
+        void processNext() {
+            ScriptStatementParameters parameters = statements.poll();
+
+            if (parameters == null) {
+                return;
+            }
+
+            ParsedResult parsedResult = parameters.parsedResult;
+            Object[] dynamicParams = parameters.dynamicParams;
+            CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture = 
parameters.cursorFuture;
+
+            try {
+                // TODO https://issues.apache.org/jira/browse/IGNITE-20463
+                if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
+                    taskExecutor.execute(this::processNext);
+                    return;
+                }
+
+                if (cursorFuture.isDone()) {
+                    return;
+                }
+
+                QueryTransactionWrapper txWrapper = 
wrapTxOrStartImplicit(parsedResult.queryType(), transactions, 
explicitTransaction);
+
+                executeParsedStatementAndWaitPrefetch(txWrapper, parsedResult, 
dynamicParams)
+                        .whenComplete((res, ex) -> {
+                            if (ex != null) {
+                                txWrapper.rollback();
+                                cursorFuture.completeExceptionally(ex);
+                                cancelAll(ex);
+
+                                return;
+                            }
+
+                            if (parsedResult.queryType() == SqlQueryType.DML 
|| parsedResult.queryType() == SqlQueryType.DDL) {
+                                txWrapper.commitImplicit();
+                            }
+
+                            cursorFuture.complete(res);
+                        });
+            } catch (Exception e) {
+                cursorFuture.completeExceptionally(e);
+
+                cancelAll(e);
+            }
+        }
+
+        private CompletableFuture<AsyncSqlCursor<List<Object>>> 
executeParsedStatementAndWaitPrefetch(
+                QueryTransactionWrapper txWrapper,
+                ParsedResult parsedResult,
+                Object[] dynamicParams
+        ) {
+            CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+            QueryCancel cancel = new QueryCancel();
+
+            return executeParsedStatement(schemaName, parsedResult, txWrapper, 
cancel, dynamicParams, new PrefetchCallback(prefetchFut))
+                    // Wait for the prefetch, otherwise the user will be able 
to execute
+                    // a dependent statement before the current one completes.
+                    .thenCompose(cur -> prefetchFut.thenApply(ignored -> cur));
+        }
+
+        private List<ScriptStatementParameters> 
splitDynamicParameters(List<ParsedResult> parsedResults, Object[] params) {

Review Comment:
   Thank you, completely missed this. 
   Fixed. Adjustment is done now in `ScriptParseResult`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to