This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 049c0937b [improve] update thread pool nums policy (#2606)
049c0937b is described below
commit 049c0937bc89f2ce2ba800c5abd62f8a4748ca9b
Author: tomsun28 <[email protected]>
AuthorDate: Sat Aug 31 16:07:36 2024 +0800
[improve] update thread pool nums policy (#2606)
Signed-off-by: tomsun28 <[email protected]>
Co-authored-by: shown <[email protected]>
Co-authored-by: Calvin <[email protected]>
---
.../org/apache/hertzbeat/alert/AlerterWorkerPool.java | 10 +++++-----
.../hertzbeat/alert/calculate/CalculateAlarm.java | 8 +++++---
.../hertzbeat/collector/dispatch/WorkerPool.java | 6 ++++--
.../hertzbeat/common/constants/CommonConstants.java | 2 +-
.../hertzbeat/common/support/CommonThreadPool.java | 2 +-
.../hertzbeat/common/support/CommonThreadPoolTest.java | 18 ++++++++----------
.../manager/controller/AccountController.java | 12 ++++++++----
.../manager/controller/AccountControllerTest.java | 4 ++--
.../hertzbeat/remoting/netty/NettyRemotingClient.java | 6 +++++-
.../hertzbeat/remoting/netty/NettyRemotingServer.java | 1 -
.../hertzbeat/warehouse/WarehouseWorkerPool.java | 4 ++--
11 files changed, 41 insertions(+), 32 deletions(-)
diff --git
a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
index 78afb1457..560d02362 100644
--- a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
+++ b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java
@@ -18,8 +18,8 @@
package org.apache.hertzbeat.alert;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -50,11 +50,11 @@ public class AlerterWorkerPool {
.setDaemon(true)
.setNameFormat("alerter-worker-%d")
.build();
- workerExecutor = new ThreadPoolExecutor(6,
+ workerExecutor = new ThreadPoolExecutor(10,
10,
10,
TimeUnit.SECONDS,
- new SynchronousQueue<>(),
+ new LinkedBlockingQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
@@ -69,10 +69,10 @@ public class AlerterWorkerPool {
.setNameFormat("notify-worker-%d")
.build();
notifyExecutor = new ThreadPoolExecutor(6,
- 10,
+ 6,
10,
TimeUnit.SECONDS,
- new SynchronousQueue<>(),
+ new LinkedBlockingQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
diff --git
a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
index ec75b4c51..858c9e20b 100644
---
a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
+++
b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java
@@ -69,6 +69,8 @@ import org.springframework.util.CollectionUtils;
public class CalculateAlarm {
private static final String SYSTEM_VALUE_ROW_COUNT =
"system_value_row_count";
+
+ private static final int CALCULATE_THREADS = 3;
/**
* The alarm in the process is triggered
@@ -129,9 +131,9 @@ public class CalculateAlarm {
}
}
};
- workerPool.executeJob(runnable);
- workerPool.executeJob(runnable);
- workerPool.executeJob(runnable);
+ for (int i = 0; i < CALCULATE_THREADS; i++) {
+ workerPool.executeJob(runnable);
+ }
}
private void calculate(CollectRep.MetricsData metricsData) {
diff --git
a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
index 68c50f087..73fb391ba 100644
---
a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
+++
b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java
@@ -50,8 +50,10 @@ public class WorkerPool implements DisposableBean {
.setDaemon(true)
.setNameFormat("collect-worker-%d")
.build();
- workerExecutor = new ThreadPoolExecutor(100,
- 1024,
+ int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors());
+ int maxSize = Runtime.getRuntime().availableProcessors() * 16;
+ workerExecutor = new ThreadPoolExecutor(coreSize,
+ maxSize,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
diff --git
a/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
b/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
index e9d3a75ef..166d812d7 100644
---
a/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
+++
b/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
@@ -55,7 +55,7 @@ public interface CommonConstants {
/**
* Response status code: Incorrect login account password
*/
- byte MONITOR_LOGIN_FAILED_CODE = 0x05;
+ byte LOGIN_FAILED_CODE = 0x05;
/**
* Monitoring status 0: Paused, 1: Up, 2: Down
diff --git
a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
index 159d048b3..4a231230f 100644
---
a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
+++
b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java
@@ -49,7 +49,7 @@ public class CommonThreadPool implements DisposableBean {
.setDaemon(true)
.setNameFormat("common-worker-%d")
.build();
- workerExecutor = new ThreadPoolExecutor(2,
+ workerExecutor = new ThreadPoolExecutor(1,
Integer.MAX_VALUE,
10,
TimeUnit.SECONDS,
diff --git
a/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java
b/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java
index 211b3ed28..daa09493a 100644
---
a/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java
+++
b/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java
@@ -17,15 +17,6 @@
package org.apache.hertzbeat.common.support;
-import java.lang.reflect.Field;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -33,6 +24,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import java.lang.reflect.Field;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
/**
* test for {@link CommonThreadPool}
@@ -101,7 +99,7 @@ class CommonThreadPoolTest {
ThreadPoolExecutor workerExecutor = (ThreadPoolExecutor)
workerExecutorField.get(pool);
assertNotNull(workerExecutor);
- assertEquals(2, workerExecutor.getCorePoolSize());
+ assertEquals(1, workerExecutor.getCorePoolSize());
assertEquals(Integer.MAX_VALUE, workerExecutor.getMaximumPoolSize());
assertEquals(10, workerExecutor.getKeepAliveTime(TimeUnit.SECONDS));
assertTrue(workerExecutor.getQueue() instanceof SynchronousQueue);
diff --git
a/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java
b/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java
index f01731cab..23ae16d77 100644
---
a/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java
+++
b/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java
@@ -17,8 +17,9 @@
package org.apache.hertzbeat.manager.controller;
-import static
org.apache.hertzbeat.common.constants.CommonConstants.MONITOR_LOGIN_FAILED_CODE;
+import static
org.apache.hertzbeat.common.constants.CommonConstants.LOGIN_FAILED_CODE;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
+import io.jsonwebtoken.ExpiredJwtException;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -58,7 +59,7 @@ public class AccountController {
try {
return
ResponseEntity.ok(Message.success(accountService.authGetToken(loginDto)));
} catch (AuthenticationException e) {
- return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE,
e.getMessage()));
+ return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE,
e.getMessage()));
}
}
@@ -70,10 +71,13 @@ public class AccountController {
try {
return
ResponseEntity.ok(Message.success(accountService.refreshToken(refreshToken)));
} catch (AuthenticationException e) {
- return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE,
e.getMessage()));
+ return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE,
e.getMessage()));
+ } catch (ExpiredJwtException expiredJwtException) {
+ log.warn("{}", expiredJwtException.getMessage());
+ return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh
Token Expired"));
} catch (Exception e) {
log.error("Exception occurred during token refresh: {}",
e.getClass().getName(), e);
- return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE,
"Refresh Token Expired or Error"));
+ return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh
Token Error"));
}
}
}
diff --git
a/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java
b/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java
index cb50ea0e1..6a3befb0b 100644
---
a/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java
+++
b/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java
@@ -85,7 +85,7 @@ class AccountControllerTest {
this.mockMvc.perform(MockMvcRequestBuilders.post("/api/account/auth/form")
.contentType(MediaType.APPLICATION_JSON)
.content(JsonUtil.toJson(loginDto)))
- .andExpect(jsonPath("$.code").value((int)
CommonConstants.MONITOR_LOGIN_FAILED_CODE))
+ .andExpect(jsonPath("$.code").value((int)
CommonConstants.LOGIN_FAILED_CODE))
.andReturn();
}
@@ -95,7 +95,7 @@ class AccountControllerTest {
Mockito.when(accountService.refreshToken(refreshToken)).thenThrow(new
AuthenticationException());
this.mockMvc.perform(MockMvcRequestBuilders.get("/api/account/auth/refresh/{refreshToken}",
refreshToken))
- .andExpect(jsonPath("$.code").value((int)
CommonConstants.MONITOR_LOGIN_FAILED_CODE))
+ .andExpect(jsonPath("$.code").value((int)
CommonConstants.LOGIN_FAILED_CODE))
.andReturn();
}
}
diff --git
a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
index 779be270f..a01c7fab8 100644
---
a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
@@ -50,6 +50,8 @@ import org.apache.hertzbeat.remoting.event.NettyEventListener;
@Slf4j
public class NettyRemotingClient extends NettyRemotingAbstract implements
RemotingClient {
+ private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4,
Runtime.getRuntime().availableProcessors());
+
private final NettyClientConfig nettyClientConfig;
private final CommonThreadPool threadPool;
@@ -79,7 +81,9 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
.setDaemon(true)
.setNameFormat("netty-client-worker-%d")
.build();
- this.workerGroup = new NioEventLoopGroup(threadFactory);
+ String envThreadNum =
System.getProperty("hertzbeat.client.worker.thread.num");
+ int workerThreadNum = envThreadNum != null ?
Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM;
+ this.workerGroup = new NioEventLoopGroup(workerThreadNum,
threadFactory);
this.bootstrap.group(workerGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
this.nettyClientConfig.getConnectTimeoutMillis())
.channel(NioSocketChannel.class)
diff --git
a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
index a25b9b901..48062fadb 100644
---
a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
@@ -77,7 +77,6 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public void start() {
-
this.threadPool.execute(() -> {
int port = this.nettyServerConfig.getPort();
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder()
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
index 1c3024554..14b77f9ba 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java
@@ -48,8 +48,8 @@ public class WarehouseWorkerPool {
.setDaemon(true)
.setNameFormat("warehouse-worker-%d")
.build();
- workerExecutor = new ThreadPoolExecutor(6,
- 10,
+ workerExecutor = new ThreadPoolExecutor(2,
+ Integer.MAX_VALUE,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]