This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch library-batch-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 7eb9ed42de1695ba1e083c4d495d6cd72eed6206
Author: Wu Sheng <[email protected]>
AuthorDate: Sun Feb 15 10:23:37 2026 +0800

    Add named ThreadFactory to all anonymous Executors pool threads
    
    Replace default pool-N-thread-M naming with descriptive thread names
    across all Executors.newXxx() calls for easier thread dump analysis.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 docs/en/changes/changes.md                         | 31 +++++++++++++++++++++-
 .../oap/server/core/alarm/provider/AlarmCore.java  |  2 +-
 .../oap/server/core/cache/CacheUpdateTimer.java    |  2 +-
 .../core/config/group/EndpointNameGrouping.java    |  2 +-
 .../server/core/hierarchy/HierarchyService.java    |  2 +-
 .../ebpf/analyze/EBPFProfilingAnalyzer.java        |  6 ++++-
 .../core/remote/client/RemoteClientManager.java    |  2 +-
 .../oap/server/core/storage/PersistenceTimer.java  |  8 ++++--
 .../core/storage/ttl/DataTTLKeeperTimer.java       |  2 +-
 .../server/core/watermark/WatermarkWatcher.java    |  2 +-
 .../checker/provider/HealthCheckerProvider.java    |  2 +-
 .../v1/client/grpc/channel/ChannelManager.java     |  3 ++-
 .../oap/server/library/client/grpc/GRPCClient.java |  3 ++-
 .../library/util/MultipleFilesChangeMonitor.java   |  3 ++-
 14 files changed, 55 insertions(+), 15 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 1b067e364d..2eddb03c25 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -13,7 +13,8 @@
 * Upgrade Groovy to 5.0.3 in OAP backend.
 * Bump up nodejs to v24.13.0 for the latest UI(booster-ui) compiling.
 * Add `library-batch-queue` module — a partitioned, self-draining queue with type-based dispatch,
-  adaptive partitioning, and idle backoff. Designed to replace DataCarrier in high-fan-out scenarios.
+  adaptive partitioning, idle backoff, and throughput-weighted drain rebalancing (`DrainBalancer`).
+  Designed to replace DataCarrier in high-fan-out scenarios.
 * Replace DataCarrier with BatchQueue for L1 metrics aggregation, L2 metrics persistence, TopN persistence,
   all three exporters (gRPC metrics, Kafka trace, Kafka log), and gRPC remote client.
   All metric types (OAL + MAL) now share unified queues instead of separate OAL/MAL pools.
@@ -48,6 +49,34 @@
 * Replace BanyanDB Java client with native implementation.
 * Remove `bydb.dependencies.properties` and set the compatible BanyanDB API version number in `${SW_STORAGE_BANYANDB_COMPATIBLE_SERVER_API_VERSIONS}`.
 * Fix trace profiling query time range condition.
+* Add named ThreadFactory to all `Executors.newXxx()` calls to replace anonymous `pool-N-thread-M` thread names
+  with meaningful names for easier thread dump analysis. Complete OAP server thread inventory
+  (counts on an 8-core machine, exporters and JDBC are optional):
+
+  | Catalog | Thread Name | Count | Policy | Partitions |
+  |---------|-------------|-------|--------|------------|
+  | Data Pipeline | `BatchQueue-METRICS_L1_AGGREGATION-N` | 8 | `cpuCores(1.0)` | ~460 adaptive |
+  | Data Pipeline | `BatchQueue-METRICS_L2_PERSISTENCE-N` | 3 | `cpuCoresWithBase(1, 0.25)` | ~460 adaptive |
+  | Data Pipeline | `BatchQueue-TOPN_PERSISTENCE-N` | 1 | `fixed(1)` | ~4 adaptive |
+  | Data Pipeline | `BatchQueue-GRPC_REMOTE_{host}_{port}-N` | 1 per peer | `fixed(1)` | `fixed(1)` |
+  | Data Pipeline | `BatchQueue-EXPORTER_GRPC_METRICS-N` | 1 | `fixed(1)` | `fixed(1)` |
+  | Data Pipeline | `BatchQueue-EXPORTER_KAFKA_TRACE-N` | 1 | `fixed(1)` | `fixed(1)` |
+  | Data Pipeline | `BatchQueue-EXPORTER_KAFKA_LOG-N` | 1 | `fixed(1)` | `fixed(1)` |
+  | Data Pipeline | `BatchQueue-JDBC_ASYNC_BATCH_PERSISTENT-N` | 4 (configurable) | `fixed(N)` | `fixed(N)` |
+  | Scheduler | `RemoteClientManager` | 1 | scheduled | — |
+  | Scheduler | `PersistenceTimer` | 1 | scheduled | — |
+  | Scheduler | `PersistenceTimer-prepare-N` | 2 (configurable) | fixed pool | — |
+  | Scheduler | `DataTTLKeeper` | 1 | scheduled | — |
+  | Scheduler | `CacheUpdateTimer` | 1 | scheduled | — |
+  | Scheduler | `HierarchyAutoMatching` | 1 | scheduled | — |
+  | Scheduler | `WatermarkWatcher` | 1 | scheduled | — |
+  | Scheduler | `AlarmCore` | 1 | scheduled | — |
+  | Scheduler | `HealthChecker` | 1 | scheduled | — |
+  | Scheduler | `EndpointUriRecognition` | 1 (conditional) | scheduled | — |
+  | Scheduler | `FileChangeMonitor` | 1 | scheduled | — |
+  | Scheduler | `BanyanDB-ChannelManager` | 1 | scheduled | — |
+  | Scheduler | `GRPCClient-HealthCheck-{host}:{port}` | 1 per client | scheduled | — |
+  | Scheduler | `EBPFProfiling-N` | configurable | fixed pool | — |
 * Fix BanyanDB time range overflow in profile thread snapshot query.
 * `BrowserErrorLog`, OAP Server generated UUID to replace the original client side ID, because Browser scripts can't guarantee generated IDs are globally unique.
 * MQE: fix multiple labeled metric query and ensure no results are returned if no label value combinations match.
diff --git 
a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java
 
b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java
index 610ca3f30a..e20c14ec42 100644
--- 
a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java
+++ 
b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java
@@ -70,7 +70,7 @@ public class AlarmCore {
     public void start(List<AlarmCallback> allCallbacks) {
         LocalDateTime now = LocalDateTime.now();
         lastExecuteTime = now;
-        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> 
{
+        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"AlarmCore")).scheduleAtFixedRate(() -> {
             try {
                 final List<AlarmMessage> alarmMessageList = new 
ArrayList<>(30);
                 LocalDateTime checkTime = LocalDateTime.now();
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
index 1344cf7f22..06eb42e2d9 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
@@ -59,7 +59,7 @@ public enum CacheUpdateTimer {
 
         final long timeInterval = 10;
 
-        Executors.newSingleThreadScheduledExecutor()
+        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"CacheUpdateTimer"))
                  .scheduleAtFixedRate(
                      new RunnableWithExceptionProtection(() -> 
update(moduleDefineHolder), t -> log
                          .error("Cache update failure.", t)), 1, timeInterval, 
TimeUnit.SECONDS);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
index 81de84d04f..c4ff5e714a 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java
@@ -210,7 +210,7 @@ public class EndpointNameGrouping implements 
EndpointNameGroupService {
         }
         this.quickUriGroupingRule = new QuickUriGroupingRule();
         HTTPUrlRecognitionConfig config = this.httpUrlRecognitionConfig;
-        Executors.newSingleThreadScheduledExecutor()
+        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"EndpointUriRecognition"))
             .scheduleWithFixedDelay(
                 new RunnableWithExceptionProtection(
                     () -> {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java
index f1ea6b17de..783d5bc3fb 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java
@@ -136,7 +136,7 @@ public class HierarchyService implements 
org.apache.skywalking.oap.server.librar
         if (!this.isEnableHierarchy) {
             return;
         }
-        Executors.newSingleThreadScheduledExecutor()
+        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"HierarchyAutoMatching"))
                  .scheduleWithFixedDelay(
                      new 
RunnableWithExceptionProtection(this::autoMatchingServiceRelation, t -> 
log.error(
                          "Scheduled auto matching service hierarchy from 
service traffic failure.", t)), 30, 20, TimeUnit.SECONDS);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
index 379330cd40..496f1b9b8f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
@@ -40,6 +40,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -60,7 +61,10 @@ public class EBPFProfilingAnalyzer {
     public EBPFProfilingAnalyzer(ModuleManager moduleManager, int 
maxDurationOfQuery, int fetchDataThreadPoolSize) {
         this.moduleManager = moduleManager;
         this.maxQueryTimeoutInSecond = maxDurationOfQuery;
-        this.fetchDataThreadPool = 
Executors.newFixedThreadPool(fetchDataThreadPoolSize);
+        final AtomicInteger fetchThreadSeq = new AtomicInteger(0);
+        this.fetchDataThreadPool = Executors.newFixedThreadPool(
+            fetchDataThreadPoolSize,
+            r -> new Thread(r, "EBPFProfiling-" + 
fetchThreadSeq.incrementAndGet()));
     }
 
     /**
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index f25ad05d29..3f633f6bb5 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -94,7 +94,7 @@ public class RemoteClientManager implements Service, 
ClusterWatcher {
 
     public void start() {
         Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start);
-        Executors.newSingleThreadScheduledExecutor()
+        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"RemoteClientManager"))
                  .scheduleWithFixedDelay(new 
RunnableWithExceptionProtection(this::refresh, t -> log.error(
                      "Scheduled refresh Remote Clients failure.", t)), 1, 10, 
TimeUnit.SECONDS);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index f191057370..33460f4153 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import 
org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
@@ -89,9 +90,12 @@ public enum PersistenceTimer {
             0.5, 1, 3, 5, 10, 15, 20, 25, 50, 120
         );
 
-        prepareExecutorService = 
Executors.newFixedThreadPool(moduleConfig.getPrepareThreads());
+        final AtomicInteger prepareThreadSeq = new AtomicInteger(0);
+        prepareExecutorService = Executors.newFixedThreadPool(
+            moduleConfig.getPrepareThreads(),
+            r -> new Thread(r, "PersistenceTimer-prepare-" + 
prepareThreadSeq.incrementAndGet()));
         if (!isStarted) {
-            Executors.newSingleThreadScheduledExecutor()
+            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"PersistenceTimer"))
                      .scheduleWithFixedDelay(
                          new RunnableWithExceptionProtection(
                              () -> extractDataAndSave(batchDAO).join(),
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 9d7d3627ef..27dc5bef2f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -60,7 +60,7 @@ public enum DataTTLKeeperTimer {
         this.clusterNodesQuery = 
moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
         this.moduleConfig = moduleConfig;
 
-        Executors.newSingleThreadScheduledExecutor()
+        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"DataTTLKeeper"))
                  .scheduleAtFixedRate(
                      new RunnableWithExceptionProtection(
                          this::delete,
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java
index db5c75a718..8f8104f7bd 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java
@@ -80,7 +80,7 @@ public class WatermarkWatcher {
                                                           
.getService(MetricsCreator.class);
         this.addListener(WatermarkGRPCInterceptor.INSTANCE);
 
-        Executors.newSingleThreadScheduledExecutor()
+        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"WatermarkWatcher"))
                  .scheduleWithFixedDelay(this::watch, 0, 10, TimeUnit.SECONDS);
     }
 
diff --git 
a/oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java
 
b/oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java
index 6bec3853ad..13239c2475 100644
--- 
a/oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java
+++ 
b/oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java
@@ -80,7 +80,7 @@ public class HealthCheckerProvider extends ModuleProvider {
 
     @Override public void prepare() throws ServiceNotProvidedException, 
ModuleStartException {
         score.set(-1);
-        ses = Executors.newSingleThreadScheduledExecutor();
+        ses = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"HealthChecker"));
         healthQueryService = new HealthQueryService(score, details);
         this.registerServiceImplementation(HealthQueryService.class, 
healthQueryService);
     }
diff --git 
a/oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java
 
b/oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java
index 89a8d75685..7ffbbb5042 100644
--- 
a/oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java
+++ 
b/oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java
@@ -57,7 +57,8 @@ public class ChannelManager extends ManagedChannel {
 
     public static ChannelManager create(ChannelManagerSettings settings, 
ChannelFactory channelFactory)
             throws IOException {
-        return new ChannelManager(settings, channelFactory, 
Executors.newSingleThreadScheduledExecutor());
+        return new ChannelManager(settings, channelFactory,
+            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"BanyanDB-ChannelManager")));
     }
 
     ChannelManager(ChannelManagerSettings settings, ChannelFactory 
channelFactory, ScheduledExecutorService executor) throws IOException {
diff --git 
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
index 1ab0ee3e33..679a7e47d3 100644
--- 
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
@@ -167,7 +167,8 @@ public class GRPCClient implements Client, HealthCheckable {
 
     private void checkHealth() {
         if (healthCheckExecutor == null) {
-            healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();
+            healthCheckExecutor = Executors.newSingleThreadScheduledExecutor(
+                r -> new Thread(r, "GRPCClient-HealthCheck-" + host + ":" + 
port));
             healthCheckExecutor.scheduleAtFixedRate(healthCheckRunnable, 
initialDelay, period, TimeUnit.SECONDS
             );
         }
diff --git 
a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java
 
b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java
index ddf375d6c3..202ac9b0e0 100644
--- 
a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java
+++ 
b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java
@@ -134,7 +134,8 @@ public class MultipleFilesChangeMonitor {
         SCHEDULER_CHANGE_LOCK.lock();
         try {
             if (FILE_MONITOR_TASK_SCHEDULER == null) {
-                FILE_MONITOR_TASK_SCHEDULER = 
Executors.newSingleThreadScheduledExecutor()
+                FILE_MONITOR_TASK_SCHEDULER = 
Executors.newSingleThreadScheduledExecutor(
+                                                           r -> new Thread(r, 
"FileChangeMonitor"))
                                                        .scheduleAtFixedRate(
                                                            
MultipleFilesChangeMonitor::scanChanges, 1, 200,
                                                            
TimeUnit.MILLISECONDS

Reply via email to