This is an automated email from the ASF dual-hosted git repository. gongchao pushed a commit to branch netty-worker in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
commit 773a295f82d2b7facc9e7b24d5b2ddc1825e2d11 Author: tomsun28 <[email protected]> AuthorDate: Sun Aug 25 21:37:56 2024 +0800 [improve] update thread pool nums policy Signed-off-by: tomsun28 <[email protected]> --- .../java/org/apache/hertzbeat/alert/AlerterWorkerPool.java | 10 +++++----- .../org/apache/hertzbeat/alert/calculate/CalculateAlarm.java | 8 +++++--- .../org/apache/hertzbeat/collector/dispatch/WorkerPool.java | 6 ++++-- .../org/apache/hertzbeat/common/support/CommonThreadPool.java | 2 +- .../apache/hertzbeat/remoting/netty/NettyRemotingClient.java | 6 +++++- .../apache/hertzbeat/remoting/netty/NettyRemotingServer.java | 1 - .../org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java | 4 ++-- 7 files changed, 22 insertions(+), 15 deletions(-) diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java index 78afb1457..560d02362 100644 --- a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java +++ b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java @@ -18,8 +18,8 @@ package org.apache.hertzbeat.alert; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -50,11 +50,11 @@ public class AlerterWorkerPool { .setDaemon(true) .setNameFormat("alerter-worker-%d") .build(); - workerExecutor = new ThreadPoolExecutor(6, + workerExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, - new SynchronousQueue<>(), + new LinkedBlockingQueue<>(), threadFactory, new ThreadPoolExecutor.AbortPolicy()); } @@ -69,10 +69,10 @@ public class AlerterWorkerPool { .setNameFormat("notify-worker-%d") .build(); notifyExecutor = new ThreadPoolExecutor(6, - 10, + 6, 10, TimeUnit.SECONDS, - new SynchronousQueue<>(), + new LinkedBlockingQueue<>(), threadFactory, new ThreadPoolExecutor.AbortPolicy()); } diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java index ec75b4c51..858c9e20b 100644 --- a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java +++ b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java @@ -69,6 +69,8 @@ import org.springframework.util.CollectionUtils; public class CalculateAlarm { private static final String SYSTEM_VALUE_ROW_COUNT = "system_value_row_count"; + + private static final int CALCULATE_THREADS = 3; /** * The alarm in the process is triggered @@ -129,9 +131,9 @@ public class CalculateAlarm { } } }; - workerPool.executeJob(runnable); - workerPool.executeJob(runnable); - workerPool.executeJob(runnable); + for (int i = 0; i < CALCULATE_THREADS; i++) { + workerPool.executeJob(runnable); + } } private void calculate(CollectRep.MetricsData metricsData) { diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java index 68c50f087..73fb391ba 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java @@ -50,8 +50,10 @@ public class WorkerPool implements DisposableBean { .setDaemon(true) .setNameFormat("collect-worker-%d") .build(); - workerExecutor = new ThreadPoolExecutor(100, - 1024, + int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors()); + int maxSize = Runtime.getRuntime().availableProcessors() * 16; + workerExecutor = new ThreadPoolExecutor(coreSize, + maxSize, 10, TimeUnit.SECONDS, new SynchronousQueue<>(), diff --git a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java index 159d048b3..4a231230f 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java +++ b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java @@ -49,7 +49,7 @@ public class CommonThreadPool implements DisposableBean { .setDaemon(true) .setNameFormat("common-worker-%d") .build(); - workerExecutor = new ThreadPoolExecutor(2, + workerExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, diff --git a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java index 779be270f..a01c7fab8 100644 --- a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java @@ -50,6 +50,8 @@ import org.apache.hertzbeat.remoting.event.NettyEventListener; @Slf4j public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { + private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4, Runtime.getRuntime().availableProcessors()); + private final NettyClientConfig nettyClientConfig; private final CommonThreadPool threadPool; @@ -79,7 +81,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti .setDaemon(true) .setNameFormat("netty-client-worker-%d") .build(); - this.workerGroup = new NioEventLoopGroup(threadFactory); + String envThreadNum = System.getProperty("hertzbeat.client.worker.thread.num"); + int workerThreadNum = envThreadNum != null ? Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM; + this.workerGroup = new NioEventLoopGroup(workerThreadNum, threadFactory); this.bootstrap.group(workerGroup) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.nettyClientConfig.getConnectTimeoutMillis()) .channel(NioSocketChannel.class) diff --git a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java index a25b9b901..48062fadb 100644 --- a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java @@ -77,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void start() { - this.threadPool.execute(() -> { int port = this.nettyServerConfig.getPort(); ThreadFactory bossThreadFactory = new ThreadFactoryBuilder() diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java index 1c3024554..14b77f9ba 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java @@ -48,8 +48,8 @@ public class WarehouseWorkerPool { .setDaemon(true) .setNameFormat("warehouse-worker-%d") .build(); - workerExecutor = new ThreadPoolExecutor(6, - 10, + workerExecutor = new ThreadPoolExecutor(2, + Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<>(), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
