This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch library-batch-queue in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 7eb9ed42de1695ba1e083c4d495d6cd72eed6206 Author: Wu Sheng <[email protected]> AuthorDate: Sun Feb 15 10:23:37 2026 +0800 Add named ThreadFactory to all anonymous Executors pool threads Replace default pool-N-thread-M naming with descriptive thread names across all Executors.newXxx() calls for easier thread dump analysis. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- docs/en/changes/changes.md | 31 +++++++++++++++++++++- .../oap/server/core/alarm/provider/AlarmCore.java | 2 +- .../oap/server/core/cache/CacheUpdateTimer.java | 2 +- .../core/config/group/EndpointNameGrouping.java | 2 +- .../server/core/hierarchy/HierarchyService.java | 2 +- .../ebpf/analyze/EBPFProfilingAnalyzer.java | 6 ++++- .../core/remote/client/RemoteClientManager.java | 2 +- .../oap/server/core/storage/PersistenceTimer.java | 8 ++++-- .../core/storage/ttl/DataTTLKeeperTimer.java | 2 +- .../server/core/watermark/WatermarkWatcher.java | 2 +- .../checker/provider/HealthCheckerProvider.java | 2 +- .../v1/client/grpc/channel/ChannelManager.java | 3 ++- .../oap/server/library/client/grpc/GRPCClient.java | 3 ++- .../library/util/MultipleFilesChangeMonitor.java | 3 ++- 14 files changed, 55 insertions(+), 15 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 1b067e364d..2eddb03c25 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -13,7 +13,8 @@ * Upgrade Groovy to 5.0.3 in OAP backend. * Bump up nodejs to v24.13.0 for the latest UI(booster-ui) compiling. * Add `library-batch-queue` module — a partitioned, self-draining queue with type-based dispatch, - adaptive partitioning, and idle backoff. Designed to replace DataCarrier in high-fan-out scenarios. + adaptive partitioning, idle backoff, and throughput-weighted drain rebalancing (`DrainBalancer`). + Designed to replace DataCarrier in high-fan-out scenarios. 
* Replace DataCarrier with BatchQueue for L1 metrics aggregation, L2 metrics persistence, TopN persistence, all three exporters (gRPC metrics, Kafka trace, Kafka log), and gRPC remote client. All metric types (OAL + MAL) now share unified queues instead of separate OAL/MAL pools. @@ -48,6 +49,34 @@ * Replace BanyanDB Java client with native implementation. * Remove `bydb.dependencies.properties` and set the compatible BanyanDB API version number in `${SW_STORAGE_BANYANDB_COMPATIBLE_SERVER_API_VERSIONS}`. * Fix trace profiling query time range condition. +* Add named ThreadFactory to all `Executors.newXxx()` calls to replace anonymous `pool-N-thread-M` thread names + with meaningful names for easier thread dump analysis. Complete OAP server thread inventory + (counts assume an 8-core machine; exporter and JDBC threads are optional): + + | Catalog | Thread Name | Count | Policy | Partitions | + |---------|-------------|-------|--------|------------| + | Data Pipeline | `BatchQueue-METRICS_L1_AGGREGATION-N` | 8 | `cpuCores(1.0)` | ~460 adaptive | + | Data Pipeline | `BatchQueue-METRICS_L2_PERSISTENCE-N` | 3 | `cpuCoresWithBase(1, 0.25)` | ~460 adaptive | + | Data Pipeline | `BatchQueue-TOPN_PERSISTENCE-N` | 1 | `fixed(1)` | ~4 adaptive | + | Data Pipeline | `BatchQueue-GRPC_REMOTE_{host}_{port}-N` | 1 per peer | `fixed(1)` | `fixed(1)` | + | Data Pipeline | `BatchQueue-EXPORTER_GRPC_METRICS-N` | 1 | `fixed(1)` | `fixed(1)` | + | Data Pipeline | `BatchQueue-EXPORTER_KAFKA_TRACE-N` | 1 | `fixed(1)` | `fixed(1)` | + | Data Pipeline | `BatchQueue-EXPORTER_KAFKA_LOG-N` | 1 | `fixed(1)` | `fixed(1)` | + | Data Pipeline | `BatchQueue-JDBC_ASYNC_BATCH_PERSISTENT-N` | 4 (configurable) | `fixed(N)` | `fixed(N)` | + | Scheduler | `RemoteClientManager` | 1 | scheduled | — | + | Scheduler | `PersistenceTimer` | 1 | scheduled | — | + | Scheduler | `PersistenceTimer-prepare-N` | 2 (configurable) | fixed pool | — | + | Scheduler | `DataTTLKeeper` | 1 | scheduled | — | + | Scheduler | 
`CacheUpdateTimer` | 1 | scheduled | — | + | Scheduler | `HierarchyAutoMatching` | 1 | scheduled | — | + | Scheduler | `WatermarkWatcher` | 1 | scheduled | — | + | Scheduler | `AlarmCore` | 1 | scheduled | — | + | Scheduler | `HealthChecker` | 1 | scheduled | — | + | Scheduler | `EndpointUriRecognition` | 1 (conditional) | scheduled | — | + | Scheduler | `FileChangeMonitor` | 1 | scheduled | — | + | Scheduler | `BanyanDB-ChannelManager` | 1 | scheduled | — | + | Scheduler | `GRPCClient-HealthCheck-{host}:{port}` | 1 per client | scheduled | — | + | Scheduler | `EBPFProfiling-N` | configurable | fixed pool | — | * Fix BanyanDB time range overflow in profile thread snapshot query. * `BrowserErrorLog`, OAP Server generated UUID to replace the original client side ID, because Browser scripts can't guarantee generated IDs are globally unique. * MQE: fix multiple labeled metric query and ensure no results are returned if no label value combinations match. diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java index 610ca3f30a..e20c14ec42 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java @@ -70,7 +70,7 @@ public class AlarmCore { public void start(List<AlarmCallback> allCallbacks) { LocalDateTime now = LocalDateTime.now(); lastExecuteTime = now; - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "AlarmCore")).scheduleAtFixedRate(() -> { try { final List<AlarmMessage> alarmMessageList = new ArrayList<>(30); LocalDateTime checkTime = LocalDateTime.now(); diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java index 1344cf7f22..06eb42e2d9 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java @@ -59,7 +59,7 @@ public enum CacheUpdateTimer { final long timeInterval = 10; - Executors.newSingleThreadScheduledExecutor() + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "CacheUpdateTimer")) .scheduleAtFixedRate( new RunnableWithExceptionProtection(() -> update(moduleDefineHolder), t -> log .error("Cache update failure.", t)), 1, timeInterval, TimeUnit.SECONDS); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java index 81de84d04f..c4ff5e714a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java @@ -210,7 +210,7 @@ public class EndpointNameGrouping implements EndpointNameGroupService { } this.quickUriGroupingRule = new QuickUriGroupingRule(); HTTPUrlRecognitionConfig config = this.httpUrlRecognitionConfig; - Executors.newSingleThreadScheduledExecutor() + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "EndpointUriRecognition")) .scheduleWithFixedDelay( new RunnableWithExceptionProtection( () -> { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java index f1ea6b17de..783d5bc3fb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java @@ -136,7 +136,7 @@ public class HierarchyService implements org.apache.skywalking.oap.server.librar if (!this.isEnableHierarchy) { return; } - Executors.newSingleThreadScheduledExecutor() + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "HierarchyAutoMatching")) .scheduleWithFixedDelay( new RunnableWithExceptionProtection(this::autoMatchingServiceRelation, t -> log.error( "Scheduled auto matching service hierarchy from service traffic failure.", t)), 30, 20, TimeUnit.SECONDS); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java index 379330cd40..496f1b9b8f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java @@ -40,6 +40,7 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -60,7 +61,10 @@ public class EBPFProfilingAnalyzer { public EBPFProfilingAnalyzer(ModuleManager moduleManager, int maxDurationOfQuery, int fetchDataThreadPoolSize) { this.moduleManager = moduleManager; this.maxQueryTimeoutInSecond = maxDurationOfQuery; - this.fetchDataThreadPool 
= Executors.newFixedThreadPool(fetchDataThreadPoolSize); + final AtomicInteger fetchThreadSeq = new AtomicInteger(0); + this.fetchDataThreadPool = Executors.newFixedThreadPool( + fetchDataThreadPoolSize, + r -> new Thread(r, "EBPFProfiling-" + fetchThreadSeq.incrementAndGet())); } /** diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java index f25ad05d29..3f633f6bb5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java @@ -94,7 +94,7 @@ public class RemoteClientManager implements Service, ClusterWatcher { public void start() { Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start); - Executors.newSingleThreadScheduledExecutor() + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "RemoteClientManager")) .scheduleWithFixedDelay(new RunnableWithExceptionProtection(this::refresh, t -> log.error( "Scheduled refresh Remote Clients failure.", t)), 1, 10, TimeUnit.SECONDS); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index f191057370..33460f4153 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.CoreModuleConfig; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; @@ -89,9 +90,12 @@ public enum PersistenceTimer { 0.5, 1, 3, 5, 10, 15, 20, 25, 50, 120 ); - prepareExecutorService = Executors.newFixedThreadPool(moduleConfig.getPrepareThreads()); + final AtomicInteger prepareThreadSeq = new AtomicInteger(0); + prepareExecutorService = Executors.newFixedThreadPool( + moduleConfig.getPrepareThreads(), + r -> new Thread(r, "PersistenceTimer-prepare-" + prepareThreadSeq.incrementAndGet())); if (!isStarted) { - Executors.newSingleThreadScheduledExecutor() + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "PersistenceTimer")) .scheduleWithFixedDelay( new RunnableWithExceptionProtection( () -> extractDataAndSave(batchDAO).join(), diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java index 9d7d3627ef..27dc5bef2f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java @@ -60,7 +60,7 @@ public enum DataTTLKeeperTimer { this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class); this.moduleConfig = moduleConfig; - Executors.newSingleThreadScheduledExecutor() + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "DataTTLKeeper")) .scheduleAtFixedRate( new RunnableWithExceptionProtection( this::delete, diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java index db5c75a718..8f8104f7bd 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java @@ -80,7 +80,7 @@ public class WatermarkWatcher { .getService(MetricsCreator.class); this.addListener(WatermarkGRPCInterceptor.INSTANCE); - Executors.newSingleThreadScheduledExecutor() + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "WatermarkWatcher")) .scheduleWithFixedDelay(this::watch, 0, 10, TimeUnit.SECONDS); } diff --git a/oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java b/oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java index 6bec3853ad..13239c2475 100644 --- a/oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java +++ b/oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java @@ -80,7 +80,7 @@ public class HealthCheckerProvider extends ModuleProvider { @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { score.set(-1); - ses = Executors.newSingleThreadScheduledExecutor(); + ses = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "HealthChecker")); healthQueryService = new HealthQueryService(score, details); this.registerServiceImplementation(HealthQueryService.class, healthQueryService); } diff --git a/oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java 
b/oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java index 89a8d75685..7ffbbb5042 100644 --- a/oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java +++ b/oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java @@ -57,7 +57,8 @@ public class ChannelManager extends ManagedChannel { public static ChannelManager create(ChannelManagerSettings settings, ChannelFactory channelFactory) throws IOException { - return new ChannelManager(settings, channelFactory, Executors.newSingleThreadScheduledExecutor()); + return new ChannelManager(settings, channelFactory, + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "BanyanDB-ChannelManager"))); } ChannelManager(ChannelManagerSettings settings, ChannelFactory channelFactory, ScheduledExecutorService executor) throws IOException { diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java index 1ab0ee3e33..679a7e47d3 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java @@ -167,7 +167,8 @@ public class GRPCClient implements Client, HealthCheckable { private void checkHealth() { if (healthCheckExecutor == null) { - healthCheckExecutor = Executors.newSingleThreadScheduledExecutor(); + healthCheckExecutor = Executors.newSingleThreadScheduledExecutor( + r -> new Thread(r, "GRPCClient-HealthCheck-" + host + ":" + port)); 
healthCheckExecutor.scheduleAtFixedRate(healthCheckRunnable, initialDelay, period, TimeUnit.SECONDS ); } diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java index ddf375d6c3..202ac9b0e0 100644 --- a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java +++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java @@ -134,7 +134,8 @@ public class MultipleFilesChangeMonitor { SCHEDULER_CHANGE_LOCK.lock(); try { if (FILE_MONITOR_TASK_SCHEDULER == null) { - FILE_MONITOR_TASK_SCHEDULER = Executors.newSingleThreadScheduledExecutor() + FILE_MONITOR_TASK_SCHEDULER = Executors.newSingleThreadScheduledExecutor( + r -> new Thread(r, "FileChangeMonitor")) .scheduleAtFixedRate( MultipleFilesChangeMonitor::scanChanges, 1, 200, TimeUnit.MILLISECONDS
