AMashenkov commented on a change in pull request #9544:
URL: https://github.com/apache/ignite/pull/9544#discussion_r740124115



##########
File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
##########
@@ -35,52 +39,65 @@
     private final String name;
 
     /** Active flag (used to skip commands in inactive cluster.) */
-    private volatile boolean active;
+    private final AtomicBoolean active = new AtomicBoolean(false);

Review comment:
       ```suggestion
       private volatile boolean active;
   ```

##########
File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
##########
@@ -90,25 +107,39 @@ public void deactivate(Runnable r) {
      * Run task on busy lock.
      *
      * @param r Task to run.
-     * @return {@code true} if task was succesfully scheduled, {@code false} - 
otherwise (due to inactive state)
+     * @return {@code true} if task was succesfully scheduled, {@code false} - 
otherwise (due to inactive state).
      */
     public boolean busyRun(Runnable r) {
-        if (!busyLock.enterBusy())
+        return busyRun(r, busyLock);
+    }
+
+    /**
+     * Run task under specified busyLock.
+     *
+     * @param r Task to run.
+     * @param taskLock BusyLock to use.
+     * @return {@code true} if task was succesfully scheduled, {@code false} - 
otherwise (due to inactive state).
+     */
+    private boolean busyRun(Runnable r, GridBusyLock taskLock) {
+        if (!taskLock.enterBusy())
             return false;
 
         try {
-            if (!active)
+            if (!active.get())

Review comment:
       ```suggestion
               if (!active)
   ```

##########
File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
##########
@@ -135,6 +194,25 @@ public boolean busyRun(Runnable r) {
      * @param r Task to execute.
      */
     public void execute(Runnable r) {
-        pool.execute(() -> busyRun(r));
+        GridBusyLock lock = busyLock;
+
+        pool.execute(() -> busyRun(r, lock));
+    }
+
+    /**
+     * Execute cancellable task in thread pool under busy lock. Track task to 
cancel on executor stop.
+     *
+     * @param ct Cancellable task to execute.
+     */
+    public void execute(CancellableTask ct) {
+        GridBusyLock lock = busyLock;
+
+        if (busyRun(() -> cancellableTasks.put(ct, ct), lock)) {
+            pool.execute(() -> {
+                busyRun(ct, lock);
+
+                cancellableTasks.remove(ct);
+            });
+        }

Review comment:
       ```suggestion
       public void execute(CancellableTask ct) {
           submit(ct)
   ```

##########
File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
##########
@@ -35,52 +39,65 @@
     private final String name;
 
     /** Active flag (used to skip commands in inactive cluster.) */
-    private volatile boolean active;
+    private final AtomicBoolean active = new AtomicBoolean(false);
 
     /** Lock protection of started gathering during deactivation. */
-    private final GridBusyLock busyLock = new GridBusyLock();
+    private volatile GridBusyLock busyLock = new GridBusyLock();
 
     /** Executor pool. */
     private final IgniteThreadPoolExecutor pool;
 
+    /** Cancellable tasks. */
+    private final ConcurrentMap<CancellableTask, Object> cancellableTasks = 
new ConcurrentHashMap<>();
+
+    /** External stopping supplier. */
+    Supplier<Boolean> stopping;
+
     /**
      * Constructor.
      *
      * @param name Executor name.
      * @param pool Underlying thread pool executor.
+     * @param stopping External stopping state supplier.
      * @param logSupplier Log supplier.
      */
-    public BusyExecutor(String name, IgniteThreadPoolExecutor pool, 
Function<Class<?>, IgniteLogger> logSupplier) {
+    public BusyExecutor(
+        String name,
+        IgniteThreadPoolExecutor pool,
+        Supplier<Boolean> stopping,
+        Function<Class<?>, IgniteLogger> logSupplier) {
         this.name = name;
         this.pool = pool;
+        this.stopping = stopping;
         this.log = logSupplier.apply(StatisticsProcessor.class);
+        busyLock.block();
     }
 
     /**
      * Allow operations.
      */
-    public void activate() {
-        active = true;
+    public synchronized void activate() {
+
+        busyLock = new GridBusyLock();
+        active.set(true);

Review comment:
       ```suggestion
           busyLock = new GridBusyLock();
           
           active = true;
   ```

##########
File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
##########
@@ -35,52 +39,65 @@
     private final String name;
 
     /** Active flag (used to skip commands in inactive cluster.) */
-    private volatile boolean active;
+    private final AtomicBoolean active = new AtomicBoolean(false);

Review comment:
       Write access to active is already synchronized.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to