xtern commented on code in PR #2789:
URL: https://github.com/apache/ignite-3/pull/2789#discussion_r1390841849
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -535,4 +604,179 @@ private static void validateParsedStatement(
}
}
}
+
+ private class MultiStatementHandler {
+ private final String schemaName;
+ private final IgniteTransactions transactions;
+ private final @Nullable InternalTransaction explicitTransaction;
+ private final List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
cursorFutures;
+ private final Queue<ScriptStatementParameters> statements;
+
+ MultiStatementHandler(
+ String schemaName,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction explicitTransaction,
+ List<ParsedResult> parsedResults,
+ Object[] params
+ ) {
+ this.schemaName = schemaName;
+ this.transactions = transactions;
+ this.explicitTransaction = explicitTransaction;
+
+ List<ScriptStatementParameters> statementsList =
splitDynamicParameters(parsedResults, params);
+
+ // All statements that need to be processed.
+ statements = new ArrayBlockingQueue<>(statementsList.size(),
false, statementsList);
+
+ // Statement futures that return a cursor.
+ cursorFutures = statementsList.stream()
+ .filter(data -> data.parsedResult.queryType() !=
SqlQueryType.TX_CONTROL)
+ .map(data -> data.cursorFuture)
+ .collect(toUnmodifiableList());
+ }
+
+ AsyncSqlCursorIterator<List<Object>> asyncCursorIterator() {
+ Iterator<CompletableFuture<AsyncSqlCursor<List<Object>>>> delegate
= cursorFutures.iterator();
+
+ return new AsyncSqlCursorIterator<>() {
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> next() {
+ return delegate.next();
+ }
+ };
+ }
+
+ void processNext() {
+ ScriptStatementParameters parameters = statements.poll();
+
+ if (parameters == null) {
+ return;
+ }
+
+ ParsedResult parsedResult = parameters.parsedResult;
+ Object[] dynamicParams = parameters.dynamicParams;
+ CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture =
parameters.cursorFuture;
+
+ try {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20463
+ if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
+ taskExecutor.execute(this::processNext);
+ return;
+ }
+
+ if (cursorFuture.isDone()) {
+ return;
+ }
+
+ QueryTransactionWrapper txWrapper =
wrapTxOrStartImplicit(parsedResult.queryType(), transactions,
explicitTransaction);
+
+ executeParsedStatementAndWaitPrefetch(txWrapper, parsedResult,
dynamicParams)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ txWrapper.rollback();
+ cursorFuture.completeExceptionally(ex);
+ cancelAll(ex);
+
+ return;
+ }
+
+ if (parsedResult.queryType() == SqlQueryType.DML
|| parsedResult.queryType() == SqlQueryType.DDL) {
+ txWrapper.commitImplicit();
+ }
+
+ cursorFuture.complete(res);
+ });
+ } catch (Exception e) {
+ cursorFuture.completeExceptionally(e);
+
+ cancelAll(e);
+ }
+ }
+
+ private CompletableFuture<AsyncSqlCursor<List<Object>>>
executeParsedStatementAndWaitPrefetch(
+ QueryTransactionWrapper txWrapper,
+ ParsedResult parsedResult,
+ Object[] dynamicParams
+ ) {
+ CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+ QueryCancel cancel = new QueryCancel();
+
+ return executeParsedStatement(schemaName, parsedResult, txWrapper,
cancel, dynamicParams, new PrefetchCallback(prefetchFut))
+ // Wait for the prefetch, otherwise the user will be able
to execute
+ // a dependent statement before the current one completes.
+ .thenCompose(cur -> prefetchFut.thenApply(ignored -> cur));
+ }
+
+ private List<ScriptStatementParameters>
splitDynamicParameters(List<ParsedResult> parsedResults, Object[] params) {
+ int totalDynamicParams =
parsedResults.stream().mapToInt(ParsedResult::dynamicParamsCount).sum();
+
+ List<ScriptStatementParameters> results = new
ArrayList<>(parsedResults.size());
+
+ validateDynamicParameters(totalDynamicParams, params);
+
+ int pos = 0;
+
+ for (ParsedResult result : parsedResults) {
+ Object[] params0 = Arrays.copyOfRange(params, pos, pos +
result.dynamicParamsCount());
+
+ results.add(new ScriptStatementParameters(result, params0));
+
+ pos += result.dynamicParamsCount();
+ }
+
+ return Collections.unmodifiableList(results);
+ }
+
+ private void cancelAll(Throwable cause) {
+ for (ScriptStatementParameters parameters : statements) {
+ CompletableFuture<AsyncSqlCursor<List<Object>>> fut =
parameters.cursorFuture;
+
+ if (fut == null || fut.isDone()) {
+ continue;
+ }
+
+ fut.completeExceptionally(new SqlException(
+ EXECUTION_CANCELLED_ERR,
+ "The script statement execution was canceled due to an
error in the previous statement.",
+ cause
+ ));
+ }
+ }
+
+ private class PrefetchCallback implements QueryPrefetchCallback {
+ private final CompletableFuture<Void> prefetchFuture;
+
+ PrefetchCallback(CompletableFuture<Void> prefetchFuture) {
+ this.prefetchFuture = prefetchFuture;
+ }
+
+ @Override
+ public void onPrefetchComplete(@Nullable Throwable ex) {
+ if (ex == null) {
+ prefetchFuture.complete(null);
+
+ processNext();
Review Comment:
Thanks, `processNext()` has been moved to a `CompletableFuture` listener.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]