This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 655cf0fbb5 IGNITE-21869 Prevent thread hijacking via IgniteSql (#3512)
655cf0fbb5 is described below

commit 655cf0fbb53a8f160bcd2891ae24b68335d57532
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Mon Apr 1 17:03:10 2024 +0400

    IGNITE-21869 Prevent thread hijacking via IgniteSql (#3512)
---
 .../org/apache/ignite/internal/app/IgniteImpl.java |  11 +-
 .../sql/threading/ItSqlApiThreadingTest.java       | 237 +++++++++++++++++++++
 .../internal/sql/api/AntiHijackAsyncResultSet.java |  74 +++++++
 .../internal/sql/api/AntiHijackIgniteSql.java      | 190 +++++++++++++++++
 .../ignite/internal/sql/api/IgniteSqlImpl.java     |   1 +
 5 files changed, 510 insertions(+), 3 deletions(-)

diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 91adcae0cb..b4ca5bf0b2 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -173,6 +173,7 @@ import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguratio
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManager;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
 import org.apache.ignite.internal.security.configuration.SecurityConfiguration;
+import org.apache.ignite.internal.sql.api.AntiHijackIgniteSql;
 import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
 import 
org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedConfiguration;
 import 
org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration;
@@ -758,7 +759,7 @@ public class IgniteImpl implements Ignite {
                 catalogManager,
                 observableTimestampTracker,
                 placementDriverMgr.placementDriver(),
-                this::sql,
+                this::bareSql,
                 resourcesRegistry,
                 rebalanceScheduler,
                 lowWatermark,
@@ -1213,10 +1214,14 @@ public class IgniteImpl implements Ignite {
         return new AntiHijackIgniteTransactions(transactions, 
asyncContinuationExecutor);
     }
 
+    private IgniteSql bareSql() {
+        return sql;
+    }
+
     /** {@inheritDoc} */
     @Override
     public IgniteSql sql() {
-        return sql;
+        return new AntiHijackIgniteSql(sql, asyncContinuationExecutor);
     }
 
     /** {@inheritDoc} */
@@ -1251,7 +1256,7 @@ public class IgniteImpl implements Ignite {
 
     @Override
     public IgniteCatalog catalog(Options options) {
-        return new IgniteCatalogSqlImpl(sql(), options);
+        return new IgniteCatalogSqlImpl(sql, options);
     }
 
     /**
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
new file mode 100644
index 0000000000..ff6f5b841b
--- /dev/null
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.threading;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static 
org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread;
+import static 
org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.PublicApiThreadingTests;
+import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+@SuppressWarnings("resource")
+class ItSqlApiThreadingTest extends ClusterPerClassIntegrationTest {
+    private static final String TABLE_NAME = "test";
+
+    private static final String SELECT_QUERY = "SELECT * FROM " + TABLE_NAME;
+    private static final String UPDATE_QUERY = "UPDATE " + TABLE_NAME + " SET 
val = val WHERE id = ?";
+
+    private static final int MORE_THAN_DEFAULT_STATEMENT_PAGE_SIZE = 2048;
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeAll
+    void createTable() {
+        sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val 
VARCHAR)");
+
+        // Putting more rows than twice the default query page size to make 
sure that IgniteSql#executeAsync() returns a non-closed
+        // cursor even after we fetch its second page.
+        // TODO: Instead, configure pageSize=1 on each #query() call when 
https://issues.apache.org/jira/browse/IGNITE-18647 is fixed.
+        Map<Integer, String> valuesForQuerying = IntStream.range(1, 1 + 2 * 
MORE_THAN_DEFAULT_STATEMENT_PAGE_SIZE)
+                .boxed()
+                .collect(toMap(identity(), Object::toString));
+        plainKeyValueView().putAll(null, valuesForQuerying);
+    }
+
+    private static KeyValueView<Integer, String> plainKeyValueView() {
+        return testTable().keyValueView(Integer.class, String.class);
+    }
+
+    private static Table testTable() {
+        return CLUSTER.aliveNode().tables().table(TABLE_NAME);
+    }
+
+    private static IgniteSql igniteSqlForInternalUse() {
+        return Wrappers.unwrap(igniteSqlForPublicUse(), IgniteSqlImpl.class);
+    }
+
+    @ParameterizedTest
+    @EnumSource(SqlAsyncOperation.class)
+    void sqlFuturesCompleteInContinuationsPool(SqlAsyncOperation operation) {
+        CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
+                () -> operation.executeOn(igniteSqlForPublicUse())
+                        .thenApply(unused -> currentThread())
+        );
+
+        assertThat(completerFuture, willBe(asyncContinuationPool()));
+    }
+
+    private static IgniteSql igniteSqlForPublicUse() {
+        return CLUSTER.aliveNode().sql();
+    }
+
+    private static <T> T forcingSwitchFromUserThread(Supplier<? extends T> 
action) {
+        return 
PublicApiThreadingTests.forcingSwitchFromUserThread(CLUSTER.aliveNode(), 
action);
+    }
+
+    @ParameterizedTest
+    @EnumSource(SqlAsyncOperation.class)
+    void 
sqlFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(SqlAsyncOperation
 operation) {
+        CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
+                () -> operation.executeOn(igniteSqlForInternalUse())
+                        .thenApply(unused -> currentThread())
+        );
+
+        assertThat(completerFuture, willBe(anIgniteThread()));
+    }
+
+    @ParameterizedTest
+    @EnumSource(AsyncResultSetAsyncOperation.class)
+    void 
asyncResultSetFuturesCompleteInContinuationsPool(AsyncResultSetAsyncOperation 
operation) throws Exception {
+        AsyncResultSet<?> firstPage = fetchFirstPage(igniteSqlForPublicUse());
+
+        CompletableFuture<Thread> completerFuture = 
operation.executeOn(firstPage)
+                        .thenApply(unused -> currentThread());
+
+        // The future might get completed in the calling thread as we don't 
force a wait inside Ignite
+        // (because we cannot do this with fetching next page or closing).
+        assertThat(completerFuture, willBe(
+                either(is(currentThread())).or(asyncContinuationPool())
+        ));
+    }
+
+    private static AsyncResultSet<SqlRow> fetchFirstPage(IgniteSql igniteSql)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return igniteSql.executeAsync(null, SELECT_QUERY).get(10, SECONDS);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AsyncResultSetAsyncOperation.class)
+    void 
asyncResultSetFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(AsyncResultSetAsyncOperation
 operation)
+            throws Exception {
+        AsyncResultSet<?> firstPage = 
fetchFirstPage(igniteSqlForInternalUse());
+
+        CompletableFuture<Thread> completerFuture = 
operation.executeOn(firstPage)
+                .thenApply(unused -> currentThread());
+
+        // The future might get completed in the calling thread as we don't 
force a wait inside Ignite
+        // (because we cannot do this with fetching next page or closing).
+        assertThat(completerFuture, willBe(
+                either(is(currentThread())).or(anIgniteThread())
+        ));
+    }
+
+    /**
+     * This test differs from {@link 
#asyncResultSetFuturesCompleteInContinuationsPool(AsyncResultSetAsyncOperation)}
 in that it obtains
+     * the future to test from a call on a ResultSet obtained from a 
ResultSet, not directly from IgniteSql.
+     */
+    @ParameterizedTest
+    @EnumSource(AsyncResultSetAsyncOperation.class)
+    void 
asyncResultSetFuturesAfterFetchCompleteInContinuationsPool(AsyncResultSetAsyncOperation
 operation) throws Exception {
+        AsyncResultSet<?> firstPage = fetchFirstPage(igniteSqlForPublicUse());
+        AsyncResultSet<?> secondPage = firstPage.fetchNextPage().get(10, 
SECONDS);
+
+        CompletableFuture<Thread> completerFuture = 
operation.executeOn(secondPage)
+                .thenApply(unused -> currentThread());
+
+        // The future might get completed in the calling thread as we don't 
force a wait inside Ignite
+        // (because we cannot do this with fetching next page or closing).
+        assertThat(completerFuture, willBe(
+                either(is(currentThread())).or(asyncContinuationPool())
+        ));
+    }
+
+    /**
+     * This test differs from
+     * {@link 
#asyncResultSetFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(AsyncResultSetAsyncOperation)}
 in that
+     * it obtains the future to test from a call on a ResultSet obtained from 
a ResultSet, not directly from IgniteSql.
+     */
+    @ParameterizedTest
+    @EnumSource(AsyncResultSetAsyncOperation.class)
+    void 
asyncResultSetFuturesAfterFetchFromInternalCallsAreNotResubmittedToContinuationsPool(AsyncResultSetAsyncOperation
 operation)
+            throws Exception {
+        AsyncResultSet<?> firstPage = 
fetchFirstPage(igniteSqlForInternalUse());
+        AsyncResultSet<?> secondPage = firstPage.fetchNextPage().get(10, 
SECONDS);
+
+        CompletableFuture<Thread> completerFuture = 
operation.executeOn(secondPage)
+                .thenApply(unused -> currentThread());
+
+        // The future might get completed in the calling thread as we don't 
force a wait inside Ignite
+        // (because we cannot do this with fetching next page or closing).
+        assertThat(completerFuture, willBe(
+                either(is(currentThread())).or(anIgniteThread())
+        ));
+    }
+
+    private enum SqlAsyncOperation {
+        EXECUTE_QUERY_ASYNC(sql -> sql.executeAsync(null, SELECT_QUERY)),
+        EXECUTE_STATEMENT_ASYNC(sql -> sql.executeAsync(null, 
sql.createStatement(SELECT_QUERY))),
+        // TODO: IGNITE-18695 - uncomment 2 following lines.
+        // EXECUTE_QUERY_WITH_MAPPER_ASYNC(sql -> sql.executeAsync(null, 
(Mapper<?>) null, SELECT_QUERY)),
+        // EXECUTE_STATEMENT_WITH_MAPPER_ASYNC(sql -> sql.executeAsync(null, 
(Mapper<?>) null, sql.createStatement(SELECT_QUERY))),
+        EXECUTE_BATCH_QUERY_ASYNC(sql -> sql.executeBatchAsync(null, 
UPDATE_QUERY, BatchedArguments.of(10_000))),
+        // TODO: IGNITE-21872 - uncomment the following lines.
+        // EXECUTE_BATCH_STATEMENT_ASYNC(
+        //         sql -> sql.executeBatchAsync(null, 
sql.createStatement(UPDATE_QUERY), BatchedArguments.of(10_000))
+        // ),
+        EXECUTE_SCRIPT_ASYNC(sql -> sql.executeScriptAsync(SELECT_QUERY));
+
+        private final Function<IgniteSql, CompletableFuture<?>> action;
+
+        SqlAsyncOperation(Function<IgniteSql, CompletableFuture<?>> action) {
+            this.action = action;
+        }
+
+        CompletableFuture<?> executeOn(IgniteSql sql) {
+            return action.apply(sql);
+        }
+    }
+
+    private enum AsyncResultSetAsyncOperation {
+        FETCH_NEXT_PAGE(resultSet -> resultSet.fetchNextPage()),
+        CLOSE(resultSet -> resultSet.closeAsync());
+
+        private final Function<AsyncResultSet<Object>, CompletableFuture<?>> 
action;
+
+        AsyncResultSetAsyncOperation(Function<AsyncResultSet<Object>, 
CompletableFuture<?>> action) {
+            this.action = action;
+        }
+
+        CompletableFuture<?> executeOn(AsyncResultSet<?> resultSet) {
+            return action.apply((AsyncResultSet<Object>) resultSet);
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackAsyncResultSet.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackAsyncResultSet.java
new file mode 100644
index 0000000000..4c65d2a8ef
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackAsyncResultSet.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static 
org.apache.ignite.internal.thread.PublicApiThreading.preventThreadHijack;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.table.AntiHijackAsyncCursor;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link AsyncResultSet} which prevents Ignite internal 
threads from being hijacked by the user via asynchronous methods.
+ *
+ * @see PublicApiThreading
+ */
+public class AntiHijackAsyncResultSet<T> extends AntiHijackAsyncCursor<T> 
implements AsyncResultSet<T> {
+    private final AsyncResultSet<T> resultSet;
+    private final Executor asyncContinuationExecutor;
+
+    /**
+     * Constructor.
+     */
+    public AntiHijackAsyncResultSet(AsyncResultSet<T> resultSet, Executor 
asyncContinuationExecutor) {
+        super(resultSet, asyncContinuationExecutor);
+
+        this.resultSet = resultSet;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
+    }
+
+    @Override
+    public @Nullable ResultSetMetadata metadata() {
+        return resultSet.metadata();
+    }
+
+    @Override
+    public boolean hasRowSet() {
+        return resultSet.hasRowSet();
+    }
+
+    @Override
+    public long affectedRows() {
+        return resultSet.affectedRows();
+    }
+
+    @Override
+    public boolean wasApplied() {
+        return resultSet.wasApplied();
+    }
+
+    @Override
+    public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
+        return preventThreadHijack(resultSet.fetchNextPage(), 
asyncContinuationExecutor)
+                .thenApply(resultSet -> new 
AntiHijackAsyncResultSet<>(resultSet, asyncContinuationExecutor));
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackIgniteSql.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackIgniteSql.java
new file mode 100644
index 0000000000..24e29c55d4
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackIgniteSql.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link IgniteSql} that adds protection against thread 
hijacking by users.
+ */
+public class AntiHijackIgniteSql implements IgniteSql, Wrapper {
+    private final IgniteSql sql;
+    private final Executor asyncContinuationExecutor;
+
+    public AntiHijackIgniteSql(IgniteSql sql, Executor 
asyncContinuationExecutor) {
+        this.sql = sql;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
+    }
+
+    @Override
+    public Statement createStatement(String query) {
+        return sql.createStatement(query);
+    }
+
+    @Override
+    public StatementBuilder statementBuilder() {
+        return sql.statementBuilder();
+    }
+
+    @Override
+    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String 
query, @Nullable Object... arguments) {
+        return sql.execute(transaction, query, arguments);
+    }
+
+    @Override
+    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
Statement statement, @Nullable Object... arguments) {
+        return sql.execute(transaction, statement, arguments);
+    }
+
+    @Override
+    public <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return sql.execute(transaction, mapper, query, arguments);
+    }
+
+    @Override
+    public <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return sql.execute(transaction, mapper, statement, arguments);
+    }
+
+    @Override
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return preventThreadHijackForResultSet(sql.executeAsync(transaction, 
query, arguments));
+    }
+
+    @Override
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return preventThreadHijackForResultSet(sql.executeAsync(transaction, 
statement, arguments));
+    }
+
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return preventThreadHijackForResultSet(sql.executeAsync(transaction, 
mapper, query, arguments));
+    }
+
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return preventThreadHijackForResultSet(sql.executeAsync(transaction, 
mapper, statement, arguments));
+    }
+
+    @Override
+    public ReactiveResultSet executeReactive(@Nullable Transaction 
transaction, String query, @Nullable Object... arguments) {
+        return sql.executeReactive(transaction, query, arguments);
+    }
+
+    @Override
+    public ReactiveResultSet executeReactive(@Nullable Transaction 
transaction, Statement statement, @Nullable Object... arguments) {
+        return sql.executeReactive(transaction, statement, arguments);
+    }
+
+    @Override
+    public long[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
+        return sql.executeBatch(transaction, dmlQuery, batch);
+    }
+
+    @Override
+    public long[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
+        return sql.executeBatch(transaction, dmlStatement, batch);
+    }
+
+    @Override
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
+        return preventThreadHijack(sql.executeBatchAsync(transaction, query, 
batch));
+    }
+
+    @Override
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        return preventThreadHijack(sql.executeBatchAsync(transaction, 
statement, batch));
+    }
+
+    @Override
+    public Publisher<Long> executeBatchReactive(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
+        return sql.executeBatchReactive(transaction, query, batch);
+    }
+
+    @Override
+    public Publisher<Long> executeBatchReactive(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        return sql.executeBatchReactive(transaction, statement, batch);
+    }
+
+    @Override
+    public void executeScript(String query, @Nullable Object... arguments) {
+        sql.executeScript(query, arguments);
+    }
+
+    @Override
+    public CompletableFuture<Void> executeScriptAsync(String query, @Nullable 
Object... arguments) {
+        return preventThreadHijack(sql.executeScriptAsync(query, arguments));
+    }
+
+    private <T> CompletableFuture<AsyncResultSet<T>> 
preventThreadHijackForResultSet(CompletableFuture<AsyncResultSet<T>> 
originalFuture) {
+        return preventThreadHijack(originalFuture)
+                .thenApply(resultSet -> new 
AntiHijackAsyncResultSet<>(resultSet, asyncContinuationExecutor));
+    }
+
+    private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> 
originalFuture) {
+        return PublicApiThreading.preventThreadHijack(originalFuture, 
asyncContinuationExecutor);
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> classToUnwrap) {
+        return classToUnwrap.cast(sql);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index b88c948ccf..69bd26fc9b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -417,6 +417,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        // TODO: IGNITE-21872 - implement.
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 

Reply via email to