This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch netty-worker
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git

commit 773a295f82d2b7facc9e7b24d5b2ddc1825e2d11
Author: tomsun28 <[email protected]>
AuthorDate: Sun Aug 25 21:37:56 2024 +0800

    [improve] update thread pool nums policy
    
    Signed-off-by: tomsun28 <[email protected]>
---
 .../java/org/apache/hertzbeat/alert/AlerterWorkerPool.java     | 10 +++++-----
 .../org/apache/hertzbeat/alert/calculate/CalculateAlarm.java   |  8 +++++---
 .../org/apache/hertzbeat/collector/dispatch/WorkerPool.java    |  6 ++++--
 .../org/apache/hertzbeat/common/support/CommonThreadPool.java  |  2 +-
 .../apache/hertzbeat/remoting/netty/NettyRemotingClient.java   |  6 +++++-
 .../apache/hertzbeat/remoting/netty/NettyRemotingServer.java   |  1 -
 .../org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java    |  4 ++--
 7 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
index 78afb1457..560d02362 100644
--- a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
+++ b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
@@ -18,8 +18,8 @@
 package org.apache.hertzbeat.alert;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -50,11 +50,11 @@ public class AlerterWorkerPool {
                 .setDaemon(true)
                 .setNameFormat("alerter-worker-%d")
                 .build();
-        workerExecutor = new ThreadPoolExecutor(6,
+        workerExecutor = new ThreadPoolExecutor(10,
                 10,
                 10,
                 TimeUnit.SECONDS,
-                new SynchronousQueue<>(),
+                new LinkedBlockingQueue<>(),
                 threadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
     }
@@ -69,10 +69,10 @@ public class AlerterWorkerPool {
                 .setNameFormat("notify-worker-%d")
                 .build();
         notifyExecutor = new ThreadPoolExecutor(6,
-                10,
+                6,
                 10,
                 TimeUnit.SECONDS,
-                new SynchronousQueue<>(),
+                new LinkedBlockingQueue<>(),
                 threadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
     }
diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
index ec75b4c51..858c9e20b 100644
--- a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
+++ b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
@@ -69,6 +69,8 @@ import org.springframework.util.CollectionUtils;
 public class CalculateAlarm {
 
     private static final String SYSTEM_VALUE_ROW_COUNT = "system_value_row_count";
+    
+    private static final int CALCULATE_THREADS = 3;
 
     /**
      * The alarm in the process is triggered
@@ -129,9 +131,9 @@ public class CalculateAlarm {
                 }
             }
         };
-        workerPool.executeJob(runnable);
-        workerPool.executeJob(runnable);
-        workerPool.executeJob(runnable);
+        for (int i = 0; i < CALCULATE_THREADS; i++) {
+            workerPool.executeJob(runnable);
+        }
     }
 
     private void calculate(CollectRep.MetricsData metricsData) {
diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
index 68c50f087..73fb391ba 100644
--- a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
+++ b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
@@ -50,8 +50,10 @@ public class WorkerPool implements DisposableBean {
                 .setDaemon(true)
                 .setNameFormat("collect-worker-%d")
                 .build();
-        workerExecutor = new ThreadPoolExecutor(100,
-                1024,
+        int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors());
+        int maxSize = Runtime.getRuntime().availableProcessors() * 16;
+        workerExecutor = new ThreadPoolExecutor(coreSize,
+                maxSize,
                 10,
                 TimeUnit.SECONDS,
                 new SynchronousQueue<>(),
diff --git a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
index 159d048b3..4a231230f 100644
--- a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
+++ b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
@@ -49,7 +49,7 @@ public class CommonThreadPool implements DisposableBean {
                 .setDaemon(true)
                 .setNameFormat("common-worker-%d")
                 .build();
-        workerExecutor = new ThreadPoolExecutor(2,
+        workerExecutor = new ThreadPoolExecutor(1,
                 Integer.MAX_VALUE,
                 10,
                 TimeUnit.SECONDS,
diff --git a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
index 779be270f..a01c7fab8 100644
--- a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
@@ -50,6 +50,8 @@ import org.apache.hertzbeat.remoting.event.NettyEventListener;
 @Slf4j
 public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
 
+    private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4, Runtime.getRuntime().availableProcessors());
+    
     private final NettyClientConfig nettyClientConfig;
 
     private final CommonThreadPool threadPool;
@@ -79,7 +81,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                     .setDaemon(true)
                     .setNameFormat("netty-client-worker-%d")
                     .build();
-            this.workerGroup = new NioEventLoopGroup(threadFactory);
+            String envThreadNum = System.getProperty("hertzbeat.client.worker.thread.num"); 
+            int workerThreadNum = envThreadNum != null ? Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM;
+            this.workerGroup = new NioEventLoopGroup(workerThreadNum, threadFactory);
             this.bootstrap.group(workerGroup)
                     .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.nettyClientConfig.getConnectTimeoutMillis())
                     .channel(NioSocketChannel.class)
diff --git a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
index a25b9b901..48062fadb 100644
--- a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
@@ -77,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
     @Override
     public void start() {
-
         this.threadPool.execute(() -> {
             int port = this.nettyServerConfig.getPort();
             ThreadFactory bossThreadFactory = new ThreadFactoryBuilder()
diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
index 1c3024554..14b77f9ba 100644
--- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
+++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
@@ -48,8 +48,8 @@ public class WarehouseWorkerPool {
                 .setDaemon(true)
                 .setNameFormat("warehouse-worker-%d")
                 .build();
-        workerExecutor = new ThreadPoolExecutor(6,
-                10,
+        workerExecutor = new ThreadPoolExecutor(2,
+                Integer.MAX_VALUE,
                 10,
                 TimeUnit.SECONDS,
                 new SynchronousQueue<>(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to