This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 509be4e  [SCB-1753]accessor problem fix: LinkedBlockingQueueEx queue 
features
509be4e is described below

commit 509be4ee98eedacef43fb8739ea6636e36478873
Author: liubao <bi...@qq.com>
AuthorDate: Mon Aug 3 20:55:10 2020 +0800

    [SCB-1753]accessor problem fix: LinkedBlockingQueueEx queue features
---
 .../servicecomb/core/executor/GroupExecutor.java   |  2 +-
 .../core/executor/LinkedBlockingQueueEx.java       | 63 ++++++----------------
 .../core/executor/ThreadPoolExecutorEx.java        | 13 ++++-
 .../core/executor/TestThreadPoolExecutorEx.java    | 14 +++--
 .../it/edge/handler/ExceptionConvertHandler.java   |  2 -
 5 files changed, 40 insertions(+), 54 deletions(-)

diff --git 
a/core/src/main/java/org/apache/servicecomb/core/executor/GroupExecutor.java 
b/core/src/main/java/org/apache/servicecomb/core/executor/GroupExecutor.java
index 41371b4..535b8a2 100644
--- a/core/src/main/java/org/apache/servicecomb/core/executor/GroupExecutor.java
+++ b/core/src/main/java/org/apache/servicecomb/core/executor/GroupExecutor.java
@@ -87,7 +87,7 @@ public class GroupExecutor implements Executor, Closeable {
           maxThreads,
           maxIdleInSecond,
           TimeUnit.SECONDS,
-          new LinkedBlockingQueueEx<>(maxQueueSize),
+          new LinkedBlockingQueueEx(maxQueueSize),
           factory);
       executorList.add(executor);
     }
diff --git 
a/core/src/main/java/org/apache/servicecomb/core/executor/LinkedBlockingQueueEx.java
 
b/core/src/main/java/org/apache/servicecomb/core/executor/LinkedBlockingQueueEx.java
index 57b8450..aba525f 100644
--- 
a/core/src/main/java/org/apache/servicecomb/core/executor/LinkedBlockingQueueEx.java
+++ 
b/core/src/main/java/org/apache/servicecomb/core/executor/LinkedBlockingQueueEx.java
@@ -16,81 +16,50 @@
  */
 package org.apache.servicecomb.core.executor;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.RejectedExecutionException;
 
-public class LinkedBlockingQueueEx<E> extends LinkedBlockingQueue<E> {
+public class LinkedBlockingQueueEx extends LinkedBlockingQueue<Runnable> {
   private static final long serialVersionUID = -1L;
 
-  private static final int COUNT_BITS = Integer.SIZE - 3;
-
-  private static final int CAPACITY = (1 << COUNT_BITS) - 1;
-
-  private static int workerCountOf(int c) {
-    return c & CAPACITY;
-  }
-
-  private static Method addWrokerMethod;
-
   private transient volatile ThreadPoolExecutorEx owner = null;
 
-  private AtomicInteger ctl;
-
   public LinkedBlockingQueueEx(int capacity) {
     super(capacity);
   }
 
   public void setOwner(ThreadPoolExecutorEx owner) {
     this.owner = owner;
-    try {
-      addWrokerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addWorker", Runnable.class, boolean.class);
-      addWrokerMethod.setAccessible(true);
-
-      Field field = ThreadPoolExecutor.class.getDeclaredField("ctl");
-      field.setAccessible(true);
-      ctl = (AtomicInteger) field.get(owner);
-    } catch (Throwable e) {
-      throw new IllegalStateException("failed to init queue.", e);
-    }
   }
 
   @Override
-  public boolean offer(E runnable) {
+  public boolean offer(Runnable runnable) {
     // task can come before owner available
     if (owner == null) {
       return super.offer(runnable);
     }
-
     // can not create more thread, just queue the task
-    if (workerCountOf(ctl.get()) == owner.getMaximumPoolSize()) {
+    if (owner.getPoolSize() == owner.getMaximumPoolSize()) {
       return super.offer(runnable);
     }
     // no need to create more thread, just queue the task
-    if (owner.getNotFinished() <= workerCountOf(ctl.get())) {
+    if (owner.getNotFinished() <= owner.getPoolSize()) {
       return super.offer(runnable);
     }
     // all threads are busy, and can create new thread, not queue the task
-    if (workerCountOf(ctl.get()) < owner.getMaximumPoolSize()) {
-      try {
-        // low frequency event, reflect is no problem
-        if (!(Boolean) addWrokerMethod.invoke(owner, runnable, false)) {
-          // failed to create new thread, queue the task
-          // if failed to queue the task, owner will try to addWorker again,
-          // if still failed, the will reject the task
-          return super.offer(runnable);
-        }
-
-        // create new thread successfully, treat it as queue success
-        return true;
-      } catch (Throwable e) {
-        // reflection exception, should never happened
-        return super.offer(runnable);
-      }
+    if (owner.getPoolSize() < owner.getMaximumPoolSize()) {
+      return false;
     }
+    return super.offer(runnable);
+  }
 
+  /*
+   * when task is rejected (thread pool is full), force the item onto queue.
+   */
+  public boolean force(Runnable runnable) {
+    if (owner == null || owner.isShutdown()) {
+      throw new RejectedExecutionException("queue is not running.");
+    }
     return super.offer(runnable);
   }
 }
diff --git 
a/core/src/main/java/org/apache/servicecomb/core/executor/ThreadPoolExecutorEx.java
 
b/core/src/main/java/org/apache/servicecomb/core/executor/ThreadPoolExecutorEx.java
index bbd7f23..c8e5b11 100644
--- 
a/core/src/main/java/org/apache/servicecomb/core/executor/ThreadPoolExecutorEx.java
+++ 
b/core/src/main/java/org/apache/servicecomb/core/executor/ThreadPoolExecutorEx.java
@@ -42,7 +42,18 @@ public class ThreadPoolExecutorEx extends ThreadPoolExecutor {
   @Override
   public void execute(Runnable command) {
     submittedCount.incrementAndGet();
-    super.execute(command);
+    try {
+      super.execute(command);
+    } catch (RejectedExecutionException e) {
+      if (getQueue() instanceof LinkedBlockingQueueEx) {
+        final LinkedBlockingQueueEx queue = (LinkedBlockingQueueEx) getQueue();
+        if (!queue.force(command)) {
+          throw new RejectedExecutionException("thread pool queue is full");
+        }
+      } else {
+        throw e;
+      }
+    }
   }
 
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
diff --git 
a/core/src/test/java/org/apache/servicecomb/core/executor/TestThreadPoolExecutorEx.java
 
b/core/src/test/java/org/apache/servicecomb/core/executor/TestThreadPoolExecutorEx.java
index 12f421a..eaf4133 100644
--- 
a/core/src/test/java/org/apache/servicecomb/core/executor/TestThreadPoolExecutorEx.java
+++ 
b/core/src/test/java/org/apache/servicecomb/core/executor/TestThreadPoolExecutorEx.java
@@ -52,8 +52,8 @@ public class TestThreadPoolExecutorEx {
     }
   }
 
-  ThreadPoolExecutorEx executorEx = new ThreadPoolExecutorEx(2, 4, 60, TimeUnit.SECONDS,
-      new LinkedBlockingQueueEx<>(2), Executors.defaultThreadFactory());
+  ThreadPoolExecutorEx executorEx = new ThreadPoolExecutorEx(2, 4, 2, TimeUnit.SECONDS,
+      new LinkedBlockingQueueEx(2), Executors.defaultThreadFactory());
 
   public TestTask submitTask() {
     TestTask task = new TestTask();
@@ -139,11 +139,18 @@ public class TestThreadPoolExecutorEx {
     t4.quit();
     t5.quit();
     t6.quit();
+    waitForResult(2, executorEx::getPoolSize);
     executorEx.shutdown();
   }
 
   private void waitForResult(int expect, IntSupplier supplier) {
+    long max = 30000;
+    long waited = 0;
+
     for (; ; ) {
+      if (waited > max) {
+        throw new IllegalStateException("timed out waiting.");
+      }
       int actual = supplier.getAsInt();
       if (expect == actual) {
         return;
@@ -151,7 +158,8 @@ public class TestThreadPoolExecutorEx {
 
       LOGGER.info("waiting for thread result, expect:{}, actual: {}.", expect, actual);
       try {
-        TimeUnit.MILLISECONDS.sleep(100);
+        TimeUnit.MILLISECONDS.sleep(200);
+        waited += 200;
       } catch (InterruptedException e) {
         throw new IllegalStateException(e);
       }
diff --git 
a/integration-tests/it-edge/src/main/java/org/apache/servicecomb/it/edge/handler/ExceptionConvertHandler.java
 
b/integration-tests/it-edge/src/main/java/org/apache/servicecomb/it/edge/handler/ExceptionConvertHandler.java
index c03fc65..bc9ac39 100644
--- 
a/integration-tests/it-edge/src/main/java/org/apache/servicecomb/it/edge/handler/ExceptionConvertHandler.java
+++ 
b/integration-tests/it-edge/src/main/java/org/apache/servicecomb/it/edge/handler/ExceptionConvertHandler.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.it.edge.handler;
 
-import java.util.concurrent.TimeoutException;
-
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.servicecomb.core.Handler;

Reply via email to