This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 0c3a0cfce4a HBASE-28244 ProcedureTestingUtility.restart is broken sometimes after HBASE-28199 (#5563)
0c3a0cfce4a is described below

commit 0c3a0cfce4a95a4597d7e79a8fc35bbe784f2fa9
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Wed Dec 13 14:52:12 2023 +0800

    HBASE-28244 ProcedureTestingUtility.restart is broken sometimes after HBASE-28199 (#5563)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    (cherry picked from commit 29bfc610d0433f720a34bc47aadca1433bbb1882)
---
 .../hadoop/hbase/procedure2/ProcedureExecutor.java | 46 ++++++++++++++++------
 .../hbase/procedure2/ProcedureFutureUtil.java      | 13 +++++-
 2 files changed, 46 insertions(+), 13 deletions(-)

diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 5aa11811122..e01a27d7467 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -605,15 +606,23 @@ public class ProcedureExecutor<TEnvironment> {
     this.threadGroup = new ThreadGroup("PEWorkerGroup");
     this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, 
"ProcExecTimeout");
     this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, 
threadGroup, "WorkerMonitor");
+    ThreadFactory backingThreadFactory = new ThreadFactory() {
 
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(threadGroup, r);
+      }
+    };
     int size = Math.max(2, Runtime.getRuntime().availableProcessors());
-    ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, 
TimeUnit.MINUTES,
-      new LinkedBlockingQueue<Runnable>(), new 
ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat(getClass().getSimpleName() + 
"-Async-Task-Executor-%d").build());
+    ThreadPoolExecutor executor =
+      new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, new 
LinkedBlockingQueue<Runnable>(),
+        new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat(getClass().getSimpleName() + 
"-Async-Task-Executor-%d")
+          .setThreadFactory(backingThreadFactory).build());
     executor.allowCoreThreadTimeOut(true);
     this.asyncTaskExecutor = executor;
-    forceUpdateExecutor = Executors.newSingleThreadExecutor(
-      new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
+    forceUpdateExecutor = Executors.newFixedThreadPool(1, new 
ThreadFactoryBuilder().setDaemon(true)
+      
.setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build());
     store.registerListener(new ProcedureStoreListener() {
 
       @Override
@@ -684,10 +693,10 @@ public class ProcedureExecutor<TEnvironment> {
   }
 
   public void stop() {
-    if (!running.getAndSet(false)) {
-      return;
-    }
-
+    // it is possible that we fail in init, while loading procedures, so we 
will not set running to
+    // true but we should have already started the ProcedureScheduler, and 
also the two
+    // ExecutorServices, so here we do not check running state, just stop them
+    running.set(false);
     LOG.info("Stopping");
     scheduler.stop();
     timeoutExecutor.sendStopSignal();
@@ -708,14 +717,29 @@ public class ProcedureExecutor<TEnvironment> {
     for (WorkerThread worker : workerThreads) {
       worker.awaitTermination();
     }
+    try {
+      if (!forceUpdateExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn("There are still pending tasks in forceUpdateExecutor");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("interrupted while waiting for forceUpdateExecutor 
termination", e);
+      Thread.currentThread().interrupt();
+    }
+    try {
+      if (!asyncTaskExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn("There are still pending tasks in asyncTaskExecutor");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("interrupted while waiting for asyncTaskExecutor termination", 
e);
+      Thread.currentThread().interrupt();
+    }
 
     // Destroy the Thread Group for the executors
     // TODO: Fix. #join is not place to destroy resources.
     try {
       threadGroup.destroy();
     } catch (IllegalThreadStateException e) {
-      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", 
this.threadGroup,
-        e.getMessage());
+      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", 
this.threadGroup, e);
       // This dumps list of threads on STDOUT.
       this.threadGroup.list();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java
index 8ca4cba245d..997063c3097 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.mutable.MutableBoolean;
@@ -61,6 +62,14 @@ public final class ProcedureFutureUtil {
     throws IOException, ProcedureSuspendedException {
     MutableBoolean completed = new MutableBoolean(false);
     Thread currentThread = Thread.currentThread();
+    // This is for testing. In ProcedureTestingUtility, we will restart a 
ProcedureExecutor and
+    // reuse it, for performance, so we need to make sure that all the 
procedure have been stopped.
+    // But here, the callback of this future is not executed in a PEWorker, so 
in ProcedureExecutor
+    // we have no way to stop it. So here, we will get the asyncTaskExecutor 
first, in the PEWorker
+    // thread, where the ProcedureExecutor should have not been stopped yet, 
then when calling the
+    // callback, if the ProcedureExecutor have already been stopped and 
restarted, the
+    // asyncTaskExecutor will also be shutdown so we can not add anything back 
to the scheduler.
+    ExecutorService asyncTaskExecutor = env.getAsyncTaskExecutor();
     FutureUtils.addListener(future, (r, e) -> {
       if (Thread.currentThread() == currentThread) {
         LOG.debug("The future has completed while adding callback, give up 
suspending procedure {}",
@@ -77,7 +86,7 @@ public final class ProcedureFutureUtil {
       // And what makes things worse is that, we persist procedure state to 
master local region,
       // where the AsyncFSWAL implementation will use the same netty's event 
loop for dealing with
       // I/O, which could even cause dead lock.
-      env.getAsyncTaskExecutor().execute(() -> {
+      asyncTaskExecutor.execute(() -> {
         // should acquire procedure execution lock to make sure that the 
procedure executor has
         // finished putting this procedure to the WAITING_TIMEOUT state, 
otherwise there could be
         // race and cause unexpected result
@@ -89,7 +98,7 @@ public final class ProcedureFutureUtil {
         } catch (IOException ioe) {
           LOG.error("Error while acquiring execution lock for procedure {}"
             + " when trying to wake it up, aborting...", proc, ioe);
-          env.getMasterServices().abort("Can not acquire procedure execution 
lock", e);
+          env.getMasterServices().abort("Can not acquire procedure execution 
lock", ioe);
           return;
         }
         try {

Reply via email to