korlov42 commented on code in PR #2789:
URL: https://github.com/apache/ignite-3/pull/2789#discussion_r1389042011
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java:
##########
@@ -53,4 +54,33 @@ CompletableFuture<AsyncSqlCursor<List<Object>>>
querySingleAsync(
String qry,
Object... params
);
+
+ /**
+ * Execute the multi-statement query with given schema name and parameters.
+ *
+ * @param properties User query properties. See {@link QueryProperty} for
available properties.
+ * @param transactions Transactions facade.
+ * @param transaction A transaction to use for query execution. If null,
an implicit transaction
+ * will be started by provided transactions facade.
+ * @param qry Single statement SQL query.
Review Comment:
> @param qry Single statement SQL query.
This should read "Multi-statement SQL query", not "Single statement".
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -535,4 +604,179 @@ private static void validateParsedStatement(
}
}
}
+
+ private class MultiStatementHandler {
+ private final String schemaName;
+ private final IgniteTransactions transactions;
+ private final @Nullable InternalTransaction explicitTransaction;
+ private final List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
cursorFutures;
+ private final Queue<ScriptStatementParameters> statements;
+
+ MultiStatementHandler(
+ String schemaName,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction explicitTransaction,
+ List<ParsedResult> parsedResults,
+ Object[] params
+ ) {
+ this.schemaName = schemaName;
+ this.transactions = transactions;
+ this.explicitTransaction = explicitTransaction;
+
+ List<ScriptStatementParameters> statementsList =
splitDynamicParameters(parsedResults, params);
+
+ // All statements that need to be processed.
+ statements = new ArrayBlockingQueue<>(statementsList.size(),
false, statementsList);
+
+ // Statement futures that return a cursor.
+ cursorFutures = statementsList.stream()
+ .filter(data -> data.parsedResult.queryType() !=
SqlQueryType.TX_CONTROL)
+ .map(data -> data.cursorFuture)
+ .collect(toUnmodifiableList());
+ }
+
+ AsyncSqlCursorIterator<List<Object>> asyncCursorIterator() {
+ Iterator<CompletableFuture<AsyncSqlCursor<List<Object>>>> delegate
= cursorFutures.iterator();
+
+ return new AsyncSqlCursorIterator<>() {
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> next() {
+ return delegate.next();
+ }
+ };
+ }
+
+ void processNext() {
+ ScriptStatementParameters parameters = statements.poll();
+
+ if (parameters == null) {
+ return;
+ }
+
+ ParsedResult parsedResult = parameters.parsedResult;
+ Object[] dynamicParams = parameters.dynamicParams;
+ CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture =
parameters.cursorFuture;
+
+ try {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20463
+ if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
+ taskExecutor.execute(this::processNext);
+ return;
+ }
+
+ if (cursorFuture.isDone()) {
+ return;
+ }
+
+ QueryTransactionWrapper txWrapper =
wrapTxOrStartImplicit(parsedResult.queryType(), transactions,
explicitTransaction);
+
+ executeParsedStatementAndWaitPrefetch(txWrapper, parsedResult,
dynamicParams)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ txWrapper.rollback();
+ cursorFuture.completeExceptionally(ex);
+ cancelAll(ex);
+
+ return;
+ }
+
+ if (parsedResult.queryType() == SqlQueryType.DML
|| parsedResult.queryType() == SqlQueryType.DDL) {
+ txWrapper.commitImplicit();
+ }
+
+ cursorFuture.complete(res);
+ });
+ } catch (Exception e) {
+ cursorFuture.completeExceptionally(e);
+
+ cancelAll(e);
+ }
+ }
+
+ private CompletableFuture<AsyncSqlCursor<List<Object>>>
executeParsedStatementAndWaitPrefetch(
+ QueryTransactionWrapper txWrapper,
+ ParsedResult parsedResult,
+ Object[] dynamicParams
+ ) {
+ CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+ QueryCancel cancel = new QueryCancel();
+
+ return executeParsedStatement(schemaName, parsedResult, txWrapper,
cancel, dynamicParams, new PrefetchCallback(prefetchFut))
+ // Wait for the prefetch, otherwise the user will be able
to execute
+ // a dependent statement before the current one completes.
+ .thenCompose(cur -> prefetchFut.thenApply(ignored -> cur));
+ }
+
+ private List<ScriptStatementParameters>
splitDynamicParameters(List<ParsedResult> parsedResults, Object[] params) {
+ int totalDynamicParams =
parsedResults.stream().mapToInt(ParsedResult::dynamicParamsCount).sum();
+
+ List<ScriptStatementParameters> results = new
ArrayList<>(parsedResults.size());
+
+ validateDynamicParameters(totalDynamicParams, params);
+
+ int pos = 0;
+
+ for (ParsedResult result : parsedResults) {
+ Object[] params0 = Arrays.copyOfRange(params, pos, pos +
result.dynamicParamsCount());
+
+ results.add(new ScriptStatementParameters(result, params0));
+
+ pos += result.dynamicParamsCount();
+ }
+
+ return Collections.unmodifiableList(results);
+ }
+
+ private void cancelAll(Throwable cause) {
+ for (ScriptStatementParameters parameters : statements) {
+ CompletableFuture<AsyncSqlCursor<List<Object>>> fut =
parameters.cursorFuture;
+
+ if (fut == null || fut.isDone()) {
+ continue;
+ }
+
+ fut.completeExceptionally(new SqlException(
+ EXECUTION_CANCELLED_ERR,
+ "The script statement execution was canceled due to an
error in the previous statement.",
+ cause
+ ));
+ }
+ }
+
+ private class PrefetchCallback implements QueryPrefetchCallback {
+ private final CompletableFuture<Void> prefetchFuture;
+
+ PrefetchCallback(CompletableFuture<Void> prefetchFuture) {
+ this.prefetchFuture = prefetchFuture;
+ }
+
+ @Override
+ public void onPrefetchComplete(@Nullable Throwable ex) {
+ if (ex == null) {
+ prefetchFuture.complete(null);
+
+ processNext();
Review Comment:
It's better to keep all decisions in a single place: if we make the decision to
proceed with execution in PrefetchCallback, then we should also interrupt execution
here in case of any error... Or we can move the invocation of `processNext()` to
the MultiStatementHandler.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -535,4 +604,179 @@ private static void validateParsedStatement(
}
}
}
+
+ private class MultiStatementHandler {
+ private final String schemaName;
+ private final IgniteTransactions transactions;
+ private final @Nullable InternalTransaction explicitTransaction;
+ private final List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
cursorFutures;
+ private final Queue<ScriptStatementParameters> statements;
+
+ MultiStatementHandler(
+ String schemaName,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction explicitTransaction,
+ List<ParsedResult> parsedResults,
+ Object[] params
+ ) {
+ this.schemaName = schemaName;
+ this.transactions = transactions;
+ this.explicitTransaction = explicitTransaction;
+
+ List<ScriptStatementParameters> statementsList =
splitDynamicParameters(parsedResults, params);
+
+ // All statements that need to be processed.
+ statements = new ArrayBlockingQueue<>(statementsList.size(),
false, statementsList);
+
+ // Statement futures that return a cursor.
+ cursorFutures = statementsList.stream()
+ .filter(data -> data.parsedResult.queryType() !=
SqlQueryType.TX_CONTROL)
+ .map(data -> data.cursorFuture)
+ .collect(toUnmodifiableList());
+ }
+
+ AsyncSqlCursorIterator<List<Object>> asyncCursorIterator() {
+ Iterator<CompletableFuture<AsyncSqlCursor<List<Object>>>> delegate
= cursorFutures.iterator();
+
+ return new AsyncSqlCursorIterator<>() {
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> next() {
+ return delegate.next();
+ }
+ };
+ }
+
+ void processNext() {
+ ScriptStatementParameters parameters = statements.poll();
+
+ if (parameters == null) {
+ return;
+ }
+
+ ParsedResult parsedResult = parameters.parsedResult;
+ Object[] dynamicParams = parameters.dynamicParams;
+ CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture =
parameters.cursorFuture;
+
+ try {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20463
+ if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
+ taskExecutor.execute(this::processNext);
+ return;
+ }
+
+ if (cursorFuture.isDone()) {
+ return;
+ }
+
+ QueryTransactionWrapper txWrapper =
wrapTxOrStartImplicit(parsedResult.queryType(), transactions,
explicitTransaction);
+
+ executeParsedStatementAndWaitPrefetch(txWrapper, parsedResult,
dynamicParams)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ txWrapper.rollback();
+ cursorFuture.completeExceptionally(ex);
+ cancelAll(ex);
+
+ return;
+ }
+
+ if (parsedResult.queryType() == SqlQueryType.DML
|| parsedResult.queryType() == SqlQueryType.DDL) {
+ txWrapper.commitImplicit();
+ }
+
+ cursorFuture.complete(res);
+ });
+ } catch (Exception e) {
+ cursorFuture.completeExceptionally(e);
+
+ cancelAll(e);
+ }
+ }
+
+ private CompletableFuture<AsyncSqlCursor<List<Object>>>
executeParsedStatementAndWaitPrefetch(
+ QueryTransactionWrapper txWrapper,
+ ParsedResult parsedResult,
+ Object[] dynamicParams
+ ) {
+ CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+ QueryCancel cancel = new QueryCancel();
+
+ return executeParsedStatement(schemaName, parsedResult, txWrapper,
cancel, dynamicParams, new PrefetchCallback(prefetchFut))
+ // Wait for the prefetch, otherwise the user will be able
to execute
+ // a dependent statement before the current one completes.
+ .thenCompose(cur -> prefetchFut.thenApply(ignored -> cur));
+ }
+
+ private List<ScriptStatementParameters>
splitDynamicParameters(List<ParsedResult> parsedResults, Object[] params) {
+ int totalDynamicParams =
parsedResults.stream().mapToInt(ParsedResult::dynamicParamsCount).sum();
+
+ List<ScriptStatementParameters> results = new
ArrayList<>(parsedResults.size());
+
+ validateDynamicParameters(totalDynamicParams, params);
+
+ int pos = 0;
+
+ for (ParsedResult result : parsedResults) {
+ Object[] params0 = Arrays.copyOfRange(params, pos, pos +
result.dynamicParamsCount());
+
+ results.add(new ScriptStatementParameters(result, params0));
+
+ pos += result.dynamicParamsCount();
+ }
+
+ return Collections.unmodifiableList(results);
+ }
+
+ private void cancelAll(Throwable cause) {
+ for (ScriptStatementParameters parameters : statements) {
+ CompletableFuture<AsyncSqlCursor<List<Object>>> fut =
parameters.cursorFuture;
+
+ if (fut == null || fut.isDone()) {
+ continue;
+ }
+
+ fut.completeExceptionally(new SqlException(
+ EXECUTION_CANCELLED_ERR,
+ "The script statement execution was canceled due to an
error in the previous statement.",
Review Comment:
nitpicking: I would omit the first `statement` in the message
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -535,4 +604,179 @@ private static void validateParsedStatement(
}
}
}
+
+ private class MultiStatementHandler {
+ private final String schemaName;
+ private final IgniteTransactions transactions;
+ private final @Nullable InternalTransaction explicitTransaction;
+ private final List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
cursorFutures;
+ private final Queue<ScriptStatementParameters> statements;
+
+ MultiStatementHandler(
+ String schemaName,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction explicitTransaction,
+ List<ParsedResult> parsedResults,
+ Object[] params
+ ) {
+ this.schemaName = schemaName;
+ this.transactions = transactions;
+ this.explicitTransaction = explicitTransaction;
+
+ List<ScriptStatementParameters> statementsList =
splitDynamicParameters(parsedResults, params);
+
+ // All statements that need to be processed.
+ statements = new ArrayBlockingQueue<>(statementsList.size(),
false, statementsList);
+
+ // Statement futures that return a cursor.
+ cursorFutures = statementsList.stream()
+ .filter(data -> data.parsedResult.queryType() !=
SqlQueryType.TX_CONTROL)
+ .map(data -> data.cursorFuture)
+ .collect(toUnmodifiableList());
+ }
+
+ AsyncSqlCursorIterator<List<Object>> asyncCursorIterator() {
+ Iterator<CompletableFuture<AsyncSqlCursor<List<Object>>>> delegate
= cursorFutures.iterator();
+
+ return new AsyncSqlCursorIterator<>() {
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> next() {
+ return delegate.next();
+ }
+ };
+ }
+
+ void processNext() {
+ ScriptStatementParameters parameters = statements.poll();
+
+ if (parameters == null) {
+ return;
+ }
+
+ ParsedResult parsedResult = parameters.parsedResult;
+ Object[] dynamicParams = parameters.dynamicParams;
+ CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture =
parameters.cursorFuture;
+
+ try {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20463
+ if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
+ taskExecutor.execute(this::processNext);
+ return;
+ }
+
+ if (cursorFuture.isDone()) {
+ return;
+ }
+
+ QueryTransactionWrapper txWrapper =
wrapTxOrStartImplicit(parsedResult.queryType(), transactions,
explicitTransaction);
+
+ executeParsedStatementAndWaitPrefetch(txWrapper, parsedResult,
dynamicParams)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ txWrapper.rollback();
+ cursorFuture.completeExceptionally(ex);
+ cancelAll(ex);
+
+ return;
+ }
+
+ if (parsedResult.queryType() == SqlQueryType.DML
|| parsedResult.queryType() == SqlQueryType.DDL) {
+ txWrapper.commitImplicit();
Review Comment:
Why do we commit the transaction only in the case of DML | DDL statements?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -535,4 +604,179 @@ private static void validateParsedStatement(
}
}
}
+
+ private class MultiStatementHandler {
+ private final String schemaName;
+ private final IgniteTransactions transactions;
+ private final @Nullable InternalTransaction explicitTransaction;
+ private final List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
cursorFutures;
+ private final Queue<ScriptStatementParameters> statements;
+
+ MultiStatementHandler(
+ String schemaName,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction explicitTransaction,
+ List<ParsedResult> parsedResults,
+ Object[] params
+ ) {
+ this.schemaName = schemaName;
+ this.transactions = transactions;
+ this.explicitTransaction = explicitTransaction;
+
+ List<ScriptStatementParameters> statementsList =
splitDynamicParameters(parsedResults, params);
+
+ // All statements that need to be processed.
+ statements = new ArrayBlockingQueue<>(statementsList.size(),
false, statementsList);
+
+ // Statement futures that return a cursor.
+ cursorFutures = statementsList.stream()
+ .filter(data -> data.parsedResult.queryType() !=
SqlQueryType.TX_CONTROL)
+ .map(data -> data.cursorFuture)
+ .collect(toUnmodifiableList());
+ }
+
+ AsyncSqlCursorIterator<List<Object>> asyncCursorIterator() {
+ Iterator<CompletableFuture<AsyncSqlCursor<List<Object>>>> delegate
= cursorFutures.iterator();
+
+ return new AsyncSqlCursorIterator<>() {
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> next() {
+ return delegate.next();
+ }
+ };
+ }
+
+ void processNext() {
+ ScriptStatementParameters parameters = statements.poll();
+
+ if (parameters == null) {
+ return;
+ }
+
+ ParsedResult parsedResult = parameters.parsedResult;
+ Object[] dynamicParams = parameters.dynamicParams;
+ CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture =
parameters.cursorFuture;
+
+ try {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20463
+ if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
+ taskExecutor.execute(this::processNext);
+ return;
+ }
+
+ if (cursorFuture.isDone()) {
+ return;
+ }
+
+ QueryTransactionWrapper txWrapper =
wrapTxOrStartImplicit(parsedResult.queryType(), transactions,
explicitTransaction);
+
+ executeParsedStatementAndWaitPrefetch(txWrapper, parsedResult,
dynamicParams)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ txWrapper.rollback();
+ cursorFuture.completeExceptionally(ex);
+ cancelAll(ex);
+
+ return;
+ }
+
+ if (parsedResult.queryType() == SqlQueryType.DML
|| parsedResult.queryType() == SqlQueryType.DDL) {
+ txWrapper.commitImplicit();
+ }
+
+ cursorFuture.complete(res);
+ });
+ } catch (Exception e) {
+ cursorFuture.completeExceptionally(e);
+
+ cancelAll(e);
+ }
+ }
+
+ private CompletableFuture<AsyncSqlCursor<List<Object>>>
executeParsedStatementAndWaitPrefetch(
+ QueryTransactionWrapper txWrapper,
+ ParsedResult parsedResult,
+ Object[] dynamicParams
+ ) {
+ CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+ QueryCancel cancel = new QueryCancel();
+
+ return executeParsedStatement(schemaName, parsedResult, txWrapper,
cancel, dynamicParams, new PrefetchCallback(prefetchFut))
+ // Wait for the prefetch, otherwise the user will be able
to execute
+ // a dependent statement before the current one completes.
+ .thenCompose(cur -> prefetchFut.thenApply(ignored -> cur));
+ }
+
+ private List<ScriptStatementParameters>
splitDynamicParameters(List<ParsedResult> parsedResults, Object[] params) {
Review Comment:
I'm wondering whether you decided to split dynamic params before or
after the test `queryWithDynamicParameters` failed...
The problem here is that splitting changes the indices of particular
parameters. See the usage of `IgniteMethod#CONTEXT_GET_PARAMETER_VALUE`: we use
`dynamicParam.getIndex()` to look up the value of a param. But we can't omit
splitting either, because for plans from the plan cache the indices will be
broken as well...
I think the proper way to address this problem is to change
ParserServiceImpl: let's adjust the indices of SqlDynamicParam after parsing the
script.
To see the problem I'm talking about, just change the script in
`queryWithDynamicParameters` like this:
```
CREATE TABLE test (id INT PRIMARY KEY, val INT DEFAULT 3);
INSERT INTO test VALUES(?, ?);
INSERT INTO test VALUES(?);
```
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java:
##########
@@ -53,4 +54,33 @@ CompletableFuture<AsyncSqlCursor<List<Object>>>
querySingleAsync(
String qry,
Object... params
);
+
+ /**
+ * Execute the multi-statement query with given schema name and parameters.
+ *
+ * @param properties User query properties. See {@link QueryProperty} for
available properties.
+ * @param transactions Transactions facade.
+ * @param transaction A transaction to use for query execution. If null,
an implicit transaction
+ * will be started by provided transactions facade.
+ * @param qry Single statement SQL query.
+ * @param params Query parameters.
+ * @return Sql cursor.
+ *
+ * @throws IgniteException in case of an error.
+ */
+ CompletableFuture<AsyncSqlCursorIterator<List<Object>>> queryScriptAsync(
Review Comment:
looks creepy :)
Perhaps it would be better to introduce an AsyncIterator with semantics similar
to AsyncResultSet (which is, in fact, an async iterator itself):
```
interface AsyncIterator<T> {
boolean hasNext();
CompletableFuture<AsyncIterator<T>> fetchNext();
T current();
}
AsyncIterator<AsyncSqlCursor<List<Object>>> queryScriptAsync(...);
```
WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]