korlov42 commented on code in PR #5143: URL: https://github.com/apache/ignite-3/pull/5143#discussion_r1939488412
########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java: ########## @@ -63,35 +65,64 @@ public DdlCommandHandler( this.clockService = clockService; } + /** + * Submits given list of commands at once. + * + * <p>The whole list is submitted atomically. The result of applying of any individual command in case of conditional statements + * may be checked by calling {@link CatalogApplyResult#isApplied(int)} providing 0-based index of command in question. If exception + * is thrown during processing of any command from batch, then none of the commands will be applied. + * + * @param batch A batch of command to execute. + * @return Future containing result of applying a list of commands to catalog. + */ + public CompletableFuture<CatalogApplyResult> handle(List<CatalogCommand> batch) { + CompletableFuture<CatalogApplyResult> fut = catalogManager.execute(batch); + + List<AbstractCreateIndexCommand> createIndexCommands = batch.stream() + .filter(AbstractCreateIndexCommand.class::isInstance) + .map(AbstractCreateIndexCommand.class::cast) + .collect(Collectors.toList()); + + if (createIndexCommands.isEmpty()) { + return fut; + } + + return fut.thenCompose(applyResult -> + inBusyLock(busyLock, () -> { + List<CompletableFuture<CatalogApplyResult>> toWait = createIndexCommands.stream() + .map(cmd -> waitTillIndexBecomesAvailableOrRemoved(cmd, applyResult)) + .collect(Collectors.toList()); + + return CompletableFutures.allOf(toWait).thenApply(none -> applyResult); + }) + ); + } + /** * Handles ddl commands. * * @param cmd Catalog command. - * @return Future representing pending completion of the operation. If the command execution resulted in a modification of the catalog, - * the result will be the activation timestamp of the new catalog version, if the command did not result in a change of the - * catalog, the result will be {@code null}. + * @return Future containing result of applying a commands to catalog. */ - public CompletableFuture<@Nullable Long> handle(CatalogCommand cmd) { + public CompletableFuture<CatalogApplyResult> handle(CatalogCommand cmd) { CompletableFuture<CatalogApplyResult> fut = catalogManager.execute(cmd); if (cmd instanceof AbstractCreateIndexCommand) { fut = fut.thenCompose(applyResult -> inBusyLock(busyLock, () -> waitTillIndexBecomesAvailableOrRemoved((AbstractCreateIndexCommand) cmd, applyResult))); } - return fut.thenApply(applyResult -> applyResult.isApplied(0) ? applyResult.getCatalogTime() : null); + return fut; } - private CompletionStage<CatalogApplyResult> waitTillIndexBecomesAvailableOrRemoved( + private CompletableFuture<CatalogApplyResult> waitTillIndexBecomesAvailableOrRemoved( AbstractCreateIndexCommand cmd, CatalogApplyResult catalogApplyResult ) { if (!catalogApplyResult.isApplied(0)) { Review Comment: good catch! Fixed ########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java: ########## @@ -241,6 +254,229 @@ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> executeChildQuery( }); } + CompletableFuture<AsyncSqlCursor<InternalSqlRow>> executeChildBatch( + Query parent, + QueryTransactionContext scriptTxContext, + int batchOffset, + List<ParsedResultWithNextCursorFuture> batch + ) { + if (IgniteUtils.assertionsEnabled()) { + int offsetInBatch = 0; + for (ParsedResultWithNextCursorFuture item : batch) { + assert item.parsedQuery.queryType() == SqlQueryType.DDL + : item.parsedQuery.queryType() + " at statement #" + (batchOffset + offsetInBatch); + + offsetInBatch++; + } + } + + if (!busyLock.enterBusy()) { + return failedFuture(new NodeStoppingException()); + } + + List<Query> queries = new ArrayList<>(batch.size()); + + try { + int offsetInBatch = 0; + for (ParsedResultWithNextCursorFuture item : batch) { + Query query = new Query( + Instant.ofEpochMilli(clockService.now().getPhysical()), + parent, + item.parsedQuery, + batchOffset + offsetInBatch, + idGenerator.next(), + scriptTxContext, + ArrayUtils.OBJECT_EMPTY_ARRAY, + item.nextCursorFuture + ); + + offsetInBatch++; + + trackQuery(query, null); + + queries.add(query); + + parent.cancel.attach(query.cancel); + } + } catch (QueryCancelledException ex) { + queries.forEach(query -> query.onError(ex)); + + return failedFuture(ex); + } finally { + busyLock.leaveBusy(); + } + + List<CompletableFuture<?>> preparedQueryFutures = new ArrayList<>(batch.size()); + for (Query query : queries) { + preparedQueryFutures.add(Programs.SCRIPT_ITEM_PREPARATION.run(query)); + } + + return CompletableFutures.allOf(preparedQueryFutures) + .handle((none, ignored) -> { + List<DdlPlan> dllPlans = new ArrayList<>(); Review Comment: thanks, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org