This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new 049c0937b [improve] update thread pool nums policy (#2606)
049c0937b is described below

commit 049c0937bc89f2ce2ba800c5abd62f8a4748ca9b
Author: tomsun28 <[email protected]>
AuthorDate: Sat Aug 31 16:07:36 2024 +0800

    [improve] update thread pool nums policy (#2606)
    
    Signed-off-by: tomsun28 <[email protected]>
    Co-authored-by: shown <[email protected]>
    Co-authored-by: Calvin <[email protected]>
---
 .../org/apache/hertzbeat/alert/AlerterWorkerPool.java  | 10 +++++-----
 .../hertzbeat/alert/calculate/CalculateAlarm.java      |  8 +++++---
 .../hertzbeat/collector/dispatch/WorkerPool.java       |  6 ++++--
 .../hertzbeat/common/constants/CommonConstants.java    |  2 +-
 .../hertzbeat/common/support/CommonThreadPool.java     |  2 +-
 .../hertzbeat/common/support/CommonThreadPoolTest.java | 18 ++++++++----------
 .../manager/controller/AccountController.java          | 12 ++++++++----
 .../manager/controller/AccountControllerTest.java      |  4 ++--
 .../hertzbeat/remoting/netty/NettyRemotingClient.java  |  6 +++++-
 .../hertzbeat/remoting/netty/NettyRemotingServer.java  |  1 -
 .../hertzbeat/warehouse/WarehouseWorkerPool.java       |  4 ++--
 11 files changed, 41 insertions(+), 32 deletions(-)

diff --git 
a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java 
b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
index 78afb1457..560d02362 100644
--- a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
+++ b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
@@ -18,8 +18,8 @@
 package org.apache.hertzbeat.alert;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -50,11 +50,11 @@ public class AlerterWorkerPool {
                 .setDaemon(true)
                 .setNameFormat("alerter-worker-%d")
                 .build();
-        workerExecutor = new ThreadPoolExecutor(6,
+        workerExecutor = new ThreadPoolExecutor(10,
                 10,
                 10,
                 TimeUnit.SECONDS,
-                new SynchronousQueue<>(),
+                new LinkedBlockingQueue<>(),
                 threadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
     }
@@ -69,10 +69,10 @@ public class AlerterWorkerPool {
                 .setNameFormat("notify-worker-%d")
                 .build();
         notifyExecutor = new ThreadPoolExecutor(6,
-                10,
+                6,
                 10,
                 TimeUnit.SECONDS,
-                new SynchronousQueue<>(),
+                new LinkedBlockingQueue<>(),
                 threadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
     }
diff --git 
a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
 
b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
index ec75b4c51..858c9e20b 100644
--- 
a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
+++ 
b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
@@ -69,6 +69,8 @@ import org.springframework.util.CollectionUtils;
 public class CalculateAlarm {
 
     private static final String SYSTEM_VALUE_ROW_COUNT = 
"system_value_row_count";
+    
+    private static final int CALCULATE_THREADS = 3;
 
     /**
      * The alarm in the process is triggered
@@ -129,9 +131,9 @@ public class CalculateAlarm {
                 }
             }
         };
-        workerPool.executeJob(runnable);
-        workerPool.executeJob(runnable);
-        workerPool.executeJob(runnable);
+        for (int i = 0; i < CALCULATE_THREADS; i++) {
+            workerPool.executeJob(runnable);
+        }
     }
 
     private void calculate(CollectRep.MetricsData metricsData) {
diff --git 
a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
 
b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
index 68c50f087..73fb391ba 100644
--- 
a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
+++ 
b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
@@ -50,8 +50,10 @@ public class WorkerPool implements DisposableBean {
                 .setDaemon(true)
                 .setNameFormat("collect-worker-%d")
                 .build();
-        workerExecutor = new ThreadPoolExecutor(100,
-                1024,
+        int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors());
+        int maxSize = Runtime.getRuntime().availableProcessors() * 16;
+        workerExecutor = new ThreadPoolExecutor(coreSize,
+                maxSize,
                 10,
                 TimeUnit.SECONDS,
                 new SynchronousQueue<>(),
diff --git 
a/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
 
b/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
index e9d3a75ef..166d812d7 100644
--- 
a/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
+++ 
b/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
@@ -55,7 +55,7 @@ public interface CommonConstants {
     /**
      * Response status code: Incorrect login account password
      */
-    byte MONITOR_LOGIN_FAILED_CODE = 0x05;
+    byte LOGIN_FAILED_CODE = 0x05;
     
     /**
      * Monitoring status 0: Paused, 1: Up, 2: Down
diff --git 
a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
 
b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
index 159d048b3..4a231230f 100644
--- 
a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
+++ 
b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
@@ -49,7 +49,7 @@ public class CommonThreadPool implements DisposableBean {
                 .setDaemon(true)
                 .setNameFormat("common-worker-%d")
                 .build();
-        workerExecutor = new ThreadPoolExecutor(2,
+        workerExecutor = new ThreadPoolExecutor(1,
                 Integer.MAX_VALUE,
                 10,
                 TimeUnit.SECONDS,
diff --git 
a/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java
 
b/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java
index 211b3ed28..daa09493a 100644
--- 
a/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java
+++ 
b/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java
@@ -17,15 +17,6 @@
 
 package org.apache.hertzbeat.common.support;
 
-import java.lang.reflect.Field;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -33,6 +24,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import java.lang.reflect.Field;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 /**
  * test for {@link CommonThreadPool}
@@ -101,7 +99,7 @@ class CommonThreadPoolTest {
         ThreadPoolExecutor workerExecutor = (ThreadPoolExecutor) 
workerExecutorField.get(pool);
 
         assertNotNull(workerExecutor);
-        assertEquals(2, workerExecutor.getCorePoolSize());
+        assertEquals(1, workerExecutor.getCorePoolSize());
         assertEquals(Integer.MAX_VALUE, workerExecutor.getMaximumPoolSize());
         assertEquals(10, workerExecutor.getKeepAliveTime(TimeUnit.SECONDS));
         assertTrue(workerExecutor.getQueue() instanceof SynchronousQueue);
diff --git 
a/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java
 
b/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java
index f01731cab..23ae16d77 100644
--- 
a/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java
+++ 
b/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java
@@ -17,8 +17,9 @@
 
 package org.apache.hertzbeat.manager.controller;
 
-import static 
org.apache.hertzbeat.common.constants.CommonConstants.MONITOR_LOGIN_FAILED_CODE;
+import static 
org.apache.hertzbeat.common.constants.CommonConstants.LOGIN_FAILED_CODE;
 import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
+import io.jsonwebtoken.ExpiredJwtException;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
@@ -58,7 +59,7 @@ public class AccountController {
         try {
             return 
ResponseEntity.ok(Message.success(accountService.authGetToken(loginDto)));
         } catch (AuthenticationException e) {
-            return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, 
e.getMessage()));
+            return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, 
e.getMessage()));
         }
     }
 
@@ -70,10 +71,13 @@ public class AccountController {
         try {
             return 
ResponseEntity.ok(Message.success(accountService.refreshToken(refreshToken)));
         } catch (AuthenticationException e) {
-            return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, 
e.getMessage()));
+            return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, 
e.getMessage()));
+        } catch (ExpiredJwtException expiredJwtException) {
+            log.warn("{}", expiredJwtException.getMessage());
+            return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh 
Token Expired"));
         } catch (Exception e) {
             log.error("Exception occurred during token refresh: {}", 
e.getClass().getName(), e);
-            return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, 
"Refresh Token Expired or Error"));
+            return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh 
Token Error"));
         }
     }
 }
diff --git 
a/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java
 
b/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java
index cb50ea0e1..6a3befb0b 100644
--- 
a/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java
+++ 
b/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java
@@ -85,7 +85,7 @@ class AccountControllerTest {
         
this.mockMvc.perform(MockMvcRequestBuilders.post("/api/account/auth/form")
                         .contentType(MediaType.APPLICATION_JSON)
                         .content(JsonUtil.toJson(loginDto)))
-                .andExpect(jsonPath("$.code").value((int) 
CommonConstants.MONITOR_LOGIN_FAILED_CODE))
+                .andExpect(jsonPath("$.code").value((int) 
CommonConstants.LOGIN_FAILED_CODE))
                 .andReturn();
     }
 
@@ -95,7 +95,7 @@ class AccountControllerTest {
         Mockito.when(accountService.refreshToken(refreshToken)).thenThrow(new 
AuthenticationException());
         
this.mockMvc.perform(MockMvcRequestBuilders.get("/api/account/auth/refresh/{refreshToken}",
                         refreshToken))
-                .andExpect(jsonPath("$.code").value((int) 
CommonConstants.MONITOR_LOGIN_FAILED_CODE))
+                .andExpect(jsonPath("$.code").value((int) 
CommonConstants.LOGIN_FAILED_CODE))
                 .andReturn();
     }
 }
diff --git 
a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
index 779be270f..a01c7fab8 100644
--- 
a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
@@ -50,6 +50,8 @@ import org.apache.hertzbeat.remoting.event.NettyEventListener;
 @Slf4j
 public class NettyRemotingClient extends NettyRemotingAbstract implements 
RemotingClient {
 
+    private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4, 
Runtime.getRuntime().availableProcessors());
+    
     private final NettyClientConfig nettyClientConfig;
 
     private final CommonThreadPool threadPool;
@@ -79,7 +81,9 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     .setDaemon(true)
                     .setNameFormat("netty-client-worker-%d")
                     .build();
-            this.workerGroup = new NioEventLoopGroup(threadFactory);
+            String envThreadNum = 
System.getProperty("hertzbeat.client.worker.thread.num"); 
+            int workerThreadNum = envThreadNum != null ? 
Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM;
+            this.workerGroup = new NioEventLoopGroup(workerThreadNum, 
threadFactory);
             this.bootstrap.group(workerGroup)
                     .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
this.nettyClientConfig.getConnectTimeoutMillis())
                     .channel(NioSocketChannel.class)
diff --git 
a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
index a25b9b901..48062fadb 100644
--- 
a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
@@ -77,7 +77,6 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
     @Override
     public void start() {
-
         this.threadPool.execute(() -> {
             int port = this.nettyServerConfig.getPort();
             ThreadFactory bossThreadFactory = new ThreadFactoryBuilder()
diff --git 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
index 1c3024554..14b77f9ba 100644
--- 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
+++ 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
@@ -48,8 +48,8 @@ public class WarehouseWorkerPool {
                 .setDaemon(true)
                 .setNameFormat("warehouse-worker-%d")
                 .build();
-        workerExecutor = new ThreadPoolExecutor(6,
-                10,
+        workerExecutor = new ThreadPoolExecutor(2,
+                Integer.MAX_VALUE,
                 10,
                 TimeUnit.SECONDS,
                 new SynchronousQueue<>(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to