xtern commented on code in PR #3953:
URL: https://github.com/apache/ignite-3/pull/3953#discussion_r1649069645
##########
modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcStatementSelfTest.java:
##########
@@ -837,4 +839,49 @@ public void testOpenCursorsWithDdl() throws Exception {
assertTrue(waitForCondition(() -> openCursors() == 0, 5_000));
}
+
+ @Test
+ public void testTimeout() throws Exception {
+ JdbcStatement igniteStmt = stmt.unwrap(JdbcStatement.class);
+
+ // No timeout
+
+ {
+ igniteStmt.timeout(0);
+
+ try (ResultSet rs = igniteStmt.executeQuery("SELECT * FROM TABLE(SYSTEM_RANGE(1, 100))")) {
+ while (rs.next()) {
+ rs.getLong(1);
+ }
+ }
+ }
+
+ // Rise timeout
+
+ {
+ int timeoutMillis = ThreadLocalRandom.current().nextInt(10, 200);
+ igniteStmt.timeout(timeoutMillis);
+
+ assertThrowsSqlException(SQLException.class,
+ "Query timeout", () -> {
Review Comment:
This test may be flaky. For example, when I ran it with timeoutMillis=10 it
failed because the planning timeout fired first:
```
Expected :a string containing "Query timeout"
Actual :"Planning of a query aborted due to planner timeout threshold is reached"
```
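One possible way to make the assertion tolerant of this, sketched here only as an illustration: treat either message as a timeout outcome. The class and method names below are hypothetical and not part of the PR; the two message strings are copied from the output above, and accepting the planner message is an assumption about what the test is meant to verify.

```java
import java.sql.SQLException;
import java.util.List;

/** Hypothetical helper, not part of the PR: classifies an exception as a timeout outcome. */
final class TimeoutMessageCheck {
    // Both strings are taken from the failure output quoted above.
    // Accepting the planner message as well is an assumption about the test's intent.
    static final List<String> ACCEPTED_MESSAGES = List.of(
            "Query timeout",
            "Planning of a query aborted due to planner timeout threshold is reached");

    /** Returns true if the exception message contains any of the accepted timeout messages. */
    static boolean isTimeoutFailure(SQLException e) {
        String msg = e.getMessage();
        return msg != null && ACCEPTED_MESSAGES.stream().anyMatch(msg::contains);
    }

    public static void main(String[] args) {
        System.out.println(isTimeoutFailure(new SQLException("Query timeout")));
        System.out.println(isTimeoutFailure(new SQLException("Table not found")));
    }
}
```

Whether the planner timeout should count as a pass here is a design question for the PR author; raising the lower bound of the random timeout would be an alternative.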
##########
modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java:
##########
@@ -131,4 +139,99 @@ public void process(ResultSet<SqlRow> resultSet) {
resultSet.forEachRemaining(res::add);
}
}
+
+ @Test
+ public void testEarlyQueryTimeout() {
+ Statement stmt = igniteSql().statementBuilder()
+ .query("SELECT * FROM TABLE(SYSTEM_RANGE(1, 1000000000000000))")
+ .queryTimeout(1, TimeUnit.MILLISECONDS)
+ .build();
+
+ // Do not have enough time to do anything.
+ assertThrowsSqlException(Sql.EXECUTION_CANCELLED_ERR, "Query timeout", () -> {
+ igniteSql().execute(null, stmt);
+ });
+ }
+
+ @Test
+ public void testQueryTimeout() {
+ Statement stmt = igniteSql().statementBuilder()
+ .query("SELECT * FROM TABLE(SYSTEM_RANGE(1, 1000000000000000))")
+ .queryTimeout(100, TimeUnit.MILLISECONDS)
+ .build();
+
+ ResultSet<SqlRow> rs;
+ while (true) {
+ try {
+ rs = igniteSql().execute(null, stmt);
+ break;
+ } catch (SqlException e) {
+ if (e.code() == Sql.PLANNING_TIMEOUT_ERR || e.code() == Sql.EXECUTION_CANCELLED_ERR) {
+ continue;
+ }
+ fail(e.getMessage());
+ }
+ }
+ ResultSet<SqlRow> resultSet = rs;
+ assertNotNull(resultSet);
+
+ // Read data until exception occurs.
+ assertThrowsSqlException(Sql.EXECUTION_CANCELLED_ERR, "Query timeout", () -> {
+ while (resultSet.hasNext()) {
+ resultSet.next();
+ }
+ });
+ }
+
+ @Test
+ public void testQueryTimeoutIsPropagatedFromTheServer() throws Exception {
Review Comment:
This test sometimes produces an NPE during cancellation:
```
[2024-06-24T10:38:03,199][WARN ][%issat_n_0%sql-planning-pool-1][task] Volcano planning times out, cancels the subsequent optimization.
[2024-06-24T10:38:03,446][ERROR][%issat_n_0%sql-execution-pool-1][FailureProcessor] Critical system error detected. Will be handled accordingly to configured handler [hnd=NoOpFailureHandler [], failureCtx=FailureContext [type=CRITICAL_ERROR, err=org.apache.ignite.lang.IgniteException: IGN-CMN-65535 TraceId:05602438-1bba-4f45-94e9-e63ab8c6d21f Unexpected error during execute fragment 145 of query 6fe450b1-b2fb-45a8-9774-c1af41e7e82c]]
org.apache.ignite.lang.IgniteException: Unexpected error during execute fragment 145 of query 6fe450b1-b2fb-45a8-9774-c1af41e7e82c
    at org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl.lambda$execute$0(QueryTaskExecutorImpl.java:96) ~[main/:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.lang.NullPointerException
    at org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl$DistributedQueryManager.acknowledgeFragment(ExecutionServiceImpl.java:839) ~[main/:?]
    at org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl.onMessage(ExecutionServiceImpl.java:523) ~[main/:?]
    at org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl.lambda$start$2(ExecutionServiceImpl.java:277) ~[main/:?]
    at org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.onMessageInternal(MessageServiceImpl.java:151) ~[main/:?]
    at org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.lambda$onMessage$1(MessageServiceImpl.java:117) ~[main/:?]
    at org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl.lambda$execute$0(QueryTaskExecutorImpl.java:83) ~[main/:?]
    ... 3 more
```
The test doesn't fail, but as I understand it there is an NPE detector on
TeamCity that may flag this.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java:
##########
@@ -32,21 +36,66 @@ public class QueryCancel {
private boolean canceled;
+ private boolean timeout;
+
/**
- * Adds a cancel action.
+ * Adds a cancel action. If operation has already been canceled, throws a {@link QueryCancelledException}.
+ *
+ * <p>NOTE: If the operation is cancelled, this method will immediately invoke the given action
+ * and then throw a {@link QueryCancelledException}.
*
- * @param clo Add cancel action.
+ * @param clo Cancel action.
*/
public synchronized void add(Cancellable clo) throws QueryCancelledException {
assert clo != null;
if (canceled) {
- throw new QueryCancelledException();
+ // Immediately invoke a cancel action, if already cancelled.
+ // Otherwise the caller is required to catch QueryCancelledException and call an action manually.
+ try {
+ clo.cancel(timeout);
+ } catch (Exception ignore) {
+ // Do nothing
+ }
+
+ String message = timeout ? QueryCancelledException.TIMEOUT_MSG : QueryCancelledException.CANCEL_MSG;
+ throw new QueryCancelledException(message);
}
cancelActions.add(clo);
}
+ /**
+ * Removes the given callback.
+ *
+ * @param clo Callback.
+ */
+ public synchronized void remove(Cancellable clo) {
+ assert clo != null;
+
+ cancelActions.remove(clo);
+ }
+
+ /**
+ * Schedules a timeout action (a call to {@link #timeout()}) after {@code timeoutMillis} milliseconds.
+ *
+ * @param scheduler Scheduler to trigger an action.
+ * @param timeoutMillis Timeout in milliseconds.
+ * @return Future that will be completed when the timeout is reached.
+ */
+ public CompletableFuture<Void> addTimeout(ScheduledExecutorService scheduler, long timeoutMillis) {
+ CompletableFuture<Void> fut = new CompletableFuture<>();
+ fut.thenAccept((r) -> timeout());
+
+ ScheduledFuture<?> f = scheduler.schedule(() -> {
+ fut.complete(null);
+ }, timeoutMillis, MILLISECONDS);
+
+ add((timeout) -> f.cancel(false));
Review Comment:
Do I understand correctly that this code is needed to cancel the scheduled
future when the request has completed without a timeout?
If so, it might be worth adding
```
if (!timeout) f.cancel(false)
```
because the logic for cancelling a future may not be trivial :thinking:
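To illustrate the guard in isolation, here is a minimal standalone sketch. It assumes a callback shaped like the `Cancellable` in the diff (a single `cancel(boolean timeout)` method); the nested interface and the `TimeoutCancelSketch`, `timerCanceller` and `demo` names are hypothetical stand-ins, not Ignite code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: cancel the timer only when the query ended for a reason other than the timeout. */
final class TimeoutCancelSketch {
    /** Stand-in for the Cancellable callback from the diff (assumed shape). */
    interface Cancellable {
        void cancel(boolean timeout);
    }

    /** Builds a callback that leaves the scheduled task alone when the timeout itself fired. */
    static Cancellable timerCanceller(ScheduledFuture<?> f) {
        return timeout -> {
            if (!timeout) {
                f.cancel(false);
            }
        };
    }

    /** Returns {cancelled after cancel(true), cancelled after cancel(false)}. */
    static boolean[] demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // A long delay so the task never runs during the demo.
            ScheduledFuture<?> f = scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
            Cancellable canceller = timerCanceller(f);

            canceller.cancel(true);   // timeout path: the timer is not touched
            boolean afterTimeout = f.isCancelled();

            canceller.cancel(false);  // explicit cancel or normal completion: the timer is cancelled
            boolean afterCancel = f.isCancelled();

            return new boolean[] {afterTimeout, afterCancel};
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("afterTimeout=" + r[0] + ", afterCancel=" + r[1]);
    }
}
```

With the guard, the timeout path never calls `cancel` on the task that triggered it, so the behaviour does not depend on how `ScheduledFuture.cancel` treats a task that is already running.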