xtern commented on code in PR #3953:
URL: https://github.com/apache/ignite-3/pull/3953#discussion_r1654764752


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java:
##########
@@ -17,45 +17,136 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.util.Cancellable;
+import org.apache.ignite.internal.util.FastTimestamps;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Holds query cancel state.
  */
 public class QueryCancel {
     private final List<Cancellable> cancelActions = new ArrayList<>(3);
 
-    private boolean canceled;
+    private Reason reason;
+
+    private volatile Instant deadline;
+
+    private volatile CompletableFuture<Void> timeoutFut;
 
     /**
-     * Adds a cancel action.
+     * Adds a cancel action. If operation has already been canceled, throws a 
{@link QueryCancelledException}.
+     *
+     * <p>NOTE: If the operation is cancelled, this method will immediately 
invoke the given action
+     * and then throw a {@link QueryCancelledException}.
      *
-     * @param clo Add cancel action.
+     * @param clo Cancel action.
      */
     public synchronized void add(Cancellable clo) throws 
QueryCancelledException {
         assert clo != null;
 
-        if (canceled) {
-            throw new QueryCancelledException();
+        if (reason != null) {
+            boolean timeout = reason == Reason.TIMEOUT;
+
+            // Immediately invoke a cancel action, if already cancelled.
+            // Otherwise the caller is required to catch 
QueryCancelledException and call an action manually.
+            try {
+                clo.cancel(timeout);
+            } catch (Exception ignore) {
+                // Do nothing
+            }
+
+            String message = timeout ? QueryCancelledException.TIMEOUT_MSG : 
QueryCancelledException.CANCEL_MSG;
+            throw new QueryCancelledException(message);
         }
 
         cancelActions.add(clo);
     }
 
+    /**
+     * Removes the given callback.
+     *
+     * @param clo Callback.
+     */
+    public synchronized void remove(Cancellable clo) {
+        assert clo != null;
+
+        cancelActions.remove(clo);
+    }
+
+    /**
+     * Schedules a timeout after {@code timeoutMillis} milliseconds.
+     * Call be called only once.
+     *
+     * @param scheduler Scheduler to trigger an action.
+     * @param timeoutMillis Timeout in milliseconds.
+     * @return Future that will be completed when the timeout is reached.
+     */
+    public synchronized CompletableFuture<Void> 
setTimeout(ScheduledExecutorService scheduler, long timeoutMillis) {
+        assert reason == null : "Cannot set a timeout when cancelled";
+        assert timeoutFut == null : "Timeout has already been set";
+
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+        fut.thenAccept((r) -> doCancel(Reason.TIMEOUT));
+
+        ScheduledFuture<?> f = scheduler.schedule(() -> {
+            fut.complete(null);
+        }, timeoutMillis, MILLISECONDS);
+
+        add((timeout) -> {
+            // Cancel the future if we didn't timeout,
+            // since in the case of a timeout it is already completed.
+            if (!timeout) {
+                f.cancel(false);
+            }
+        });
+
+        this.timeoutFut = fut;
+        this.deadline = 
Instant.ofEpochMilli(FastTimestamps.coarseCurrentTimeMillis() + timeoutMillis);
+        return fut;
+    }
+
+    /**
+     * Returns the deadline of the operation.
+     *
+     * <p>Can be null if a query has no timeout.
+     */
+    public @Nullable CompletableFuture<Void> timeoutFuture() {
+        return timeoutFut;
+    }
+
+    /**
+     * Returns the deadline of the operation.
+     *
+     * <p>Can be null if a query has no timeout.
+     */
+    public @Nullable Instant deadline() {

Review Comment:
   This method can be removed.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java:
##########
@@ -801,11 +846,14 @@ private void acknowledgeFragment(String nodeName, long 
fragmentId, @Nullable Thr
 
                 if (rootFragmentId0 != null && fragmentId == rootFragmentId0) {
                     root.completeExceptionally(ex);
+                } else if (root == null) {

Review Comment:
   Let's add a TODO with a link to this issue.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -596,14 +624,22 @@ private CompletableFuture<AsyncSqlCursor<InternalSqlRow>> 
queryScript(
     ) {
         String schemaName = properties.get(QueryProperty.DEFAULT_SCHEMA);
         ZoneId timeZoneId = properties.get(QueryProperty.TIME_ZONE_ID);
+        Long queryTimeout = 
properties.getOrDefault(QueryProperty.QUERY_TIMEOUT, 0L);
+
+        Instant deadline;
+        if (queryTimeout != 0) {
+            deadline = Instant.now().plusMillis(queryTimeout);

Review Comment:
   It looks like all similar places in SqlQueryProcessor that use `Instant.now().plus` should be replaced with
   ```
   Instant.ofEpochMilli(FastTimestamps.coarseCurrentTimeMillis() + timeoutMillis);
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java:
##########
@@ -151,7 +151,7 @@ public List<MetadataHandler<?>> handlers(Class<? extends 
MetadataHandler<?>> hnd
     private final long plannerTimeout;
 
     /** Flag indicated if planning has been canceled due to timeout. */
-    private boolean timeouted = false;
+    private final AtomicBoolean timeouted = new AtomicBoolean();

Review Comment:
   Do we still need this change?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java:
##########
@@ -801,11 +846,14 @@ private void acknowledgeFragment(String nodeName, long 
fragmentId, @Nullable Thr
 
                 if (rootFragmentId0 != null && fragmentId == rootFragmentId0) {
                     root.completeExceptionally(ex);
+                } else if (root == null) {

Review Comment:
   Let's add a TODO with a link to the issue https://issues.apache.org/jira/browse/IGNITE-22585



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java:
##########
@@ -151,7 +151,7 @@ public List<MetadataHandler<?>> handlers(Class<? extends 
MetadataHandler<?>> hnd
     private final long plannerTimeout;
 
     /** Flag indicated if planning has been canceled due to timeout. */
-    private boolean timeouted = false;
+    private final AtomicBoolean timeouted = new AtomicBoolean();

Review Comment:
   Do we still need this change? :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to