This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch library-batch-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 9bfc79d2baa8e867edd2700ce0ae8e9533244b43 Author: Wu Sheng <[email protected]> AuthorDate: Sun Feb 15 09:21:07 2026 +0800 Add CLAUDE.md and DESIGN.md for library-batch-queue module CLAUDE.md documents the module's design principles, architecture, dispatch modes, scheduler modes, key classes, and current usage across the codebase. DESIGN.md describes the throughput-weighted partition rebalancing feature: two-phase handoff protocol with cycle-count fencing for safe concurrent-free reassignment, targeting L2 persistence (primary) and L1 aggregation (secondary) queues. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../server-library/library-batch-queue/CLAUDE.md | 166 +++ .../server-library/library-batch-queue/DESIGN.md | 1318 +++----------------- 2 files changed, 352 insertions(+), 1132 deletions(-) diff --git a/oap-server/server-library/library-batch-queue/CLAUDE.md b/oap-server/server-library/library-batch-queue/CLAUDE.md new file mode 100644 index 0000000000..4b13a2999a --- /dev/null +++ b/oap-server/server-library/library-batch-queue/CLAUDE.md @@ -0,0 +1,166 @@ +# library-batch-queue + +A partitioned, self-draining queue with type-based dispatch. Replaces the legacy `DataCarrier` across the OAP server. + +## Core Design Principles + +1. **Describe workload, not threads.** Callers declare intent (`cpuCores(1.0)`, `adaptive()`) and the queue resolves concrete thread/partition counts at runtime. +2. **One queue per concern, many types per queue.** Metrics aggregation, persistence, and export each get one shared queue. Individual metric types register handlers and share the queue's thread pool. +3. **Partition-level isolation.** Each partition is an independent `ArrayBlockingQueue`. The default `typeHash` selector routes all items of the same class to the same partition, so drain-loop dispatch grouping is effectively free. +4. **Adaptive backoff.** Idle drain loops double their sleep interval (`minIdleMs * 2^idleCount`, capped at `maxIdleMs`), resetting on first non-empty drain. No busy-waiting. + +## Architecture + +``` +Producer threads Drain threads (scheduler) + | | + | produce(data) | drainLoop(taskIndex) + | | | | + | +-- select partition (typeHash) | +-- drainTo(combined) from assigned partitions + | +-- put/offer into partition | +-- dispatch(combined) + | | | + | | +-- single consumer? -> consumer.consume(batch) + | | +-- handler map? -> group by class, handler.consume(subBatch) + | | +-- loop until empty, then re-schedule with backoff +``` + +## Two Dispatch Modes + +### Single consumer mode +Set `config.consumer(handler)`. The entire drained batch goes to one callback. No class-based grouping. +Use for: homogeneous queues where all items are the same type (JDBC batch, single exporter). + +### Handler map mode +Call `queue.addHandler(TypeA.class, handlerA)` per type. Drained items are grouped by `getClass()` and dispatched to matching handlers. Unregistered types are logged and dropped. +Use for: shared queues where many metric types coexist (L1 aggregation, L2 persistence, TopN). + +## Scheduler Modes + +### Dedicated scheduler +The queue owns a `ScheduledThreadPool`. Each thread is assigned a fixed subset of partitions (round-robin). Multiple threads drain concurrently. + +```java +BatchQueueConfig.builder() + .threads(ThreadPolicy.cpuCores(1.0)) // own thread pool + .partitions(PartitionPolicy.adaptive()) + ... +``` + +### Shared scheduler +Multiple queues share one `ScheduledThreadPool` (reference-counted, auto-shutdown). 
Each queue gets 1 drain task on the shared pool. Useful for low-throughput I/O queues. + +```java +BatchQueueConfig.builder() + .sharedScheduler("exporter", ThreadPolicy.fixed(1)) // shared pool + .partitions(PartitionPolicy.fixed(1)) + ... +``` + +## Key Classes + +| Class | Role | +|-------|------| +| `BatchQueue<T>` | The queue itself. Holds partitions, runs drain loops, dispatches to consumers/handlers. | +| `BatchQueueManager` | Global registry. Creates queues by name, manages shared schedulers with ref-counting. | +| `BatchQueueConfig<T>` | Builder for queue configuration (threads, partitions, buffer, strategy, consumer). | +| `ThreadPolicy` | Resolves thread count: `fixed(N)`, `cpuCores(mult)`, `cpuCoresWithBase(base, mult)`. | +| `PartitionPolicy` | Resolves partition count: `fixed(N)`, `threadMultiply(N)`, `adaptive()`. | +| `PartitionSelector<T>` | Routes items to partitions. Default `typeHash()` groups by class. | +| `HandlerConsumer<T>` | Callback for processing a batch. Has optional `onIdle()` for flush-on-idle. | +| `BufferStrategy` | `BLOCKING` (put, waits) or `IF_POSSIBLE` (offer, drops if full). | +| `BatchQueueStats` | Point-in-time snapshot of queue usage. `totalUsed()`, `topN(n)`, per-partition stats. | +| `QueueErrorHandler<T>` | Optional error callback. If absent, errors are logged. | + +## ThreadPolicy + +```java +ThreadPolicy.fixed(4) // exactly 4 threads +ThreadPolicy.cpuCores(1.0) // 1 thread per CPU core +ThreadPolicy.cpuCoresWithBase(1, 0.25) // 1 + 0.25 * cores (e.g., 3 on 8-core) +``` + +Always resolves to >= 1. + +## PartitionPolicy + +```java +PartitionPolicy.fixed(4) // exactly 4 partitions +PartitionPolicy.threadMultiply(2) // 2 * thread count +PartitionPolicy.adaptive() // grows with addHandler() calls +``` + +Adaptive growth (default multiplier 25, with 8 threads -> threshold 200): +- 0 handlers -> 8 partitions (= thread count) +- 100 handlers -> 100 partitions (1:1) +- 500 handlers -> 350 partitions (200 + 300/2) + +## Usage in the Codebase + +### L1 Metrics Aggregation (`MetricsAggregateWorker`) +``` +threads: cpuCores(1.0) -- 8 threads on 8-core +partitions: adaptive() -- grows with metric types (~460 for typical OAL+MAL) +bufferSize: 20,000 per partition +strategy: IF_POSSIBLE +idleMs: 1..50 +mode: handler map (one handler per metric class) +``` + +### L2 Metrics Persistence (`MetricsPersistentMinWorker`) +``` +threads: cpuCoresWithBase(1, 0.25) -- 3 threads on 8-core +partitions: adaptive() -- grows with metric types +bufferSize: 2,000 per partition +strategy: IF_POSSIBLE +idleMs: 1..50 +mode: handler map (one handler per metric class) +``` + +### TopN Persistence (`TopNWorker`) +``` +threads: fixed(1) +partitions: adaptive() -- grows with TopN types +bufferSize: 1,000 per partition +strategy: IF_POSSIBLE +idleMs: 10..100 +mode: handler map (one handler per TopN class) +``` + +### gRPC Remote Client (`GRPCRemoteClient`) +``` +threads: fixed(1) +partitions: fixed(1) +bufferSize: configurable (channelSize * bufferSize) +strategy: BLOCKING +idleMs: 1..50 +mode: single consumer (sends over gRPC stream) +``` + +### Exporters (gRPC metrics, Kafka trace, Kafka log) +``` +threads: fixed(1) each +partitions: fixed(1) each +bufferSize: configurable (default 20,000) +strategy: BLOCKING (gRPC) / IF_POSSIBLE (Kafka) +idleMs: 1..200 +mode: single consumer +``` + +### JDBC Batch DAO (`JDBCBatchDAO`) +``` +threads: fixed(N) where N = asyncBatchPersistentPoolSize (default 4) +partitions: fixed(N) (1 partition per thread) +bufferSize: 10,000 per partition +strategy: 
BLOCKING +idleMs: 1..20 +mode: single consumer (JDBC batch flush) +``` + +## Lifecycle + +1. `BatchQueueManager.create(name, config)` -- creates and starts drain loops immediately +2. `queue.addHandler(type, handler)` -- registers type handler (adaptive: may grow partitions) +3. `queue.produce(data)` -- routes to partition, blocks or drops per strategy +4. Drain loops run continuously, dispatching batches to consumers/handlers +5. `BatchQueueManager.shutdown(name)` -- stops drain, final flush, releases scheduler +6. `BatchQueueManager.shutdownAll()` -- called during OAP server shutdown diff --git a/oap-server/server-library/library-batch-queue/DESIGN.md b/oap-server/server-library/library-batch-queue/DESIGN.md index bc107dbbaf..e5525c9d0b 100644 --- a/oap-server/server-library/library-batch-queue/DESIGN.md +++ b/oap-server/server-library/library-batch-queue/DESIGN.md @@ -1,1205 +1,259 @@ -# library-batch-queue Design Proposal +# Throughput-Weighted Partition Rebalancing -## Goal +## Problem -Replace `library-datacarrier-queue` with a unified, simpler batch queue library that reduces thread -usage while preserving all required capabilities. +`BatchQueue` assigns partitions to drain threads with a static round-robin mapping +(`buildAssignments`). Combined with `typeHash()` partition selection, each metric +class is pinned to exactly one partition and one drain thread. -## Problem Statement +In a typical SkyWalking deployment, OAL metrics generate far more data than MAL +metrics. With hundreds of metric types of varying throughput, the static assignment +creates **unbalanced drain threads**: some threads are overloaded with hot OAL +partitions while others are nearly idle draining cold MAL partitions. -DataCarrier has two consumption modes with separate code paths: +The imbalance is invisible for low-throughput queues (exporters, TopN, JDBC) but +significant for **L1 aggregation** and **L2 persistence** queues, which have +`cpuCores(1.0)` or more threads and `adaptive()` partitions scaling to hundreds. -1. **Simple mode** (`ConsumeDriver`): Each DataCarrier gets dedicated consumer thread(s). - Used by TopNWorker, GRPCRemoteClient, JDBCBatchDAO, exporters. -2. **Pool mode** (`BulkConsumePool`): Multiple DataCarriers share a thread pool. - Used by MetricsAggregateWorker (L1) and MetricsPersistentMinWorker (L2). +## Design: Periodic Throughput-Weighted Reassignment -On an 8-core production machine, this creates **47+ DataCarrier consumer threads**: +A background rebalancer periodically measures per-partition throughput, then +reassigns partitions to threads to equalize total load per thread. -| Source | Threads | Mode | -|--------------------------------|---------|--------| -| L1 OAL aggregation pool | 24 | Pool | -| L2 OAL persistent pool | 2 | Pool | -| L1 MAL aggregation pool | 2 | Pool | -| L2 MAL persistent pool | 1 | Pool | -| TopNWorker (per type) | 5-10 | Simple | -| GRPCRemoteClient (per peer) | 2-4 | Simple | -| JDBCBatchDAO | 2-4 | Simple | -| Exporters (gRPC/Kafka) | 0-3 | Simple | -| **Total** | **~38-48** | | - -Key issues: -- Simple mode wastes threads: each queue gets a dedicated thread even though most are idle - (sleeping in 200ms polling loop). -- Pool mode creates one DataCarrier (with Channels) per metric type. With 100+ metric types, - pool threads must iterate through all assigned channels even when most are empty — wasted CPU. -- Two completely separate code paths for the same produce-consume pattern. 
-- Pool mode has static assignment — no rebalancing after initial allocation. - -## Design - -### Architecture Overview +### Data flow overview ``` - BatchQueueManager (global singleton registry + lazy shared schedulers) - │ - │ Shared schedulers (created lazily on first queue reference): - │ ├── "IO_POOL" ──> ScheduledExecutorService (cpuCores(0.5) → 4 threads on 8-core) - │ │ Created when first queue calls sharedScheduler("IO_POOL", cpuCores(0.5)). - │ │ Shared by all I/O queues: - │ │ - GRPCRemoteClient.* (gRPC to peer OAP nodes) - │ │ - GRPCMetricsExporter (gRPC metrics export) - │ │ - KafkaLogExporter (Kafka log export) - │ │ - KafkaTraceExporter (Kafka trace export) - │ │ - JDBCBatchDAO (JDBC batch writes) - │ │ - │ - │ Queues: - │ - ├── "METRICS_L1_AGGREGATION" ──> BatchQueue<Metrics> - │ │ threads: cpuCores(1.0) → 8 on 8-core - │ │ partitions: threadMultiply(2) → 16 on 8-core - │ │ strategy: IF_POSSIBLE - │ │ handlerMap: { ServiceRespTimeMetrics.class -> handler-A (OAL), - │ │ ServiceCpmMetrics.class -> handler-B (OAL), - │ │ MeterMetrics_xxx.class -> handler-C (MAL), ... } - │ └── OAL and MAL metrics share the same L1 queue and thread pool. - │ - ├── "METRICS_L2_PERSISTENT" ──> BatchQueue<Metrics> - │ │ threads: cpuCores(0.25) → 2 on 8-core - │ │ partitions: threadMultiply(2) → 4 on 8-core - │ │ strategy: BLOCKING - │ │ handlerMap: { ServiceRespTimeMetrics.class -> handler-D (OAL), - │ │ MeterMetrics_xxx.class -> handler-E (MAL), ... } - │ └── OAL and MAL metrics share the same L2 queue and thread pool. - │ - ├── "TOPN_WORKER" ──> BatchQueue<TopN> - │ │ threads: fixed(1) - │ │ partitions: fixed(1) - │ │ strategy: BLOCKING - │ │ handlerMap: { DatabaseSlowStatement.class -> handler-F, - │ │ DatabaseSlowSql.class -> handler-G, ... } - │ └── drain: drainTo → groupBy(class) → dispatch to handler - │ (TopN is in-memory ranking computation — all types share one thread) - │ - ├── "GRPCRemoteClient.peer1" ──> BatchQueue<RemoteMessage> (shared="IO_POOL", partitions=1, BLOCKING) - │ │ scheduler: shared "IO_POOL" - │ │ consumer: direct consumer for RemoteMessage - │ └── drain: drainTo → direct consumer - │ - └── "JDBCBatchDAO" ──> BatchQueue<PrepareRequest> (shared="IO_POOL", partitions=1, BLOCKING) - │ scheduler: shared "IO_POOL" - │ consumer: direct consumer for PrepareRequest - └── same +Producer threads Drain threads Rebalancer (periodic) + | | | + | produce(data) | drainLoop(taskIndex) | every rebalanceIntervalMs: + | | | | | 1. snapshot throughput counters + | +-- typeHash() | +-- read partitionOwner 2. reset counters + | +-- put/offer | | skip if != me | 3. LPT assign partitions + | +-- bump counter | +-- drainTo + dispatch | 4. two-phase handoff + | | +-- bump cycleCount | ``` -### Core Concept - -**One queue type, one config, two scheduler modes.** - -- **`BatchQueueManager`** is the global singleton registry. It also manages named shared - schedulers for low-throughput queues. Users call `createIfAbsent(name, config)` to - get a named `BatchQueue`. -- **`BatchQueue<T>`** has N partitions (configurable, default 1) and a handler map. - Producers round-robin data into partitions. On drain, each batch is **grouped by - message class** and dispatched to the handler registered for that class. -- **Handler registration** via `queue.addHandler(Class, HandlerConsumer)`. - Each worker provides its own handler instance for its specific type. 
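A condensed sketch of the registration pattern, mirroring the worker examples later in this proposal (`config`, `metricsClass`, and the `L1Handler` inner class are defined there):

```java
// Inside each worker's constructor (one worker per metric class):
BatchQueue<Metrics> queue = BatchQueueManager.createIfAbsent("METRICS_L1_AGGREGATION", config);
queue.addHandler(metricsClass, new L1Handler());   // this worker's own handler instance

// Producers never name a handler; dispatch is resolved from item.getClass() on drain.
queue.produce(metrics);
```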
- -The handler map pattern works the same way regardless of partition count: -- `threadMultiply(2)` with 100+ handlers → metrics aggregation (many types, shared partitions) -- `partitions=1` with N handlers → TopN (multiple types, low throughput, shared 1 thread) -- `partitions=1` with 1 consumer → I/O queue (gRPC client, exporter, JDBC) +### Throughput counters -No need for separate queue classes. The difference is just configuration. - -### Why Shared Partitions + Handler Map - -In the old BulkConsumePool model with 100+ metric types: - -``` -Pool Thread-0 assigned: [service_resp_time channels, service_cpm channels, ...] -Pool Thread-1 assigned: [endpoint_resp_time channels, endpoint_cpm channels, ...] -... -Each thread iterates ALL assigned channels per cycle, even if most are empty. -``` - -In the new model: - -``` -Partition-0: mixed data from all metric types (round-robin) -Partition-1: mixed data from all metric types -... -Partition-N: mixed data from all metric types - -On drain of Partition-K: - batch = drainTo(list) // all data, mixed types - grouped = batch.groupBy(item.getClass()) // group by metric class - for each (class, items) in grouped: - handler = handlerMap.get(class) // lookup registered handler - handler.consume(items) // dispatch to the right worker -``` - -Benefits: -- Partitions are created based on parallelism needs, not metric count. - 16 partitions (8 threads * 2) serve 100+ metric types. -- No empty channel iteration — every partition gets data. -- Handlers are registered on-demand. Adding a new metric type is just - `addHandler(NewMetrics.class, handlerInstance)`. -- Each handler still processes only its own metric type's data in isolation. -- I/O queues use the same structure with `partitions=1` and a direct consumer. - -### API +Each partition has an `AtomicLong` counter, incremented on every `produce()` call. +The rebalancer snapshots and resets all counters each interval. ```java -// ── Metrics aggregation (dedicated pool, many types, handler map dispatch) ── - -BatchQueue<Metrics> l1Queue = BatchQueueManager.createIfAbsent( - "METRICS_L1_AGGREGATION", - BatchQueueConfig.<Metrics>builder() - .threads(ThreadPolicy.cpuCores(1.0)) // 1x CPU cores (e.g. 8 on 8-core) - .partitions(PartitionPolicy.threadMultiply(2)) // 2x resolved threads = 16 on 8-core - .bufferSize(10_000) - .strategy(BufferStrategy.IF_POSSIBLE) - .errorHandler((data, t) -> log.error(t.getMessage(), t)) - .build() -); - -// Each MetricsAggregateWorker registers its inner class handler for its metric class. -// Called per metric type in MetricsStreamProcessor.create() (100+ times): -l1Queue.addHandler(metricsClass, new L1Handler()); // L1Handler is worker's inner class - -// Produce — data goes into a partition by round-robin -// Adaptive backoff ensures fast re-poll (minIdleMs) when data is flowing. 
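// With IF_POSSIBLE (set above), produce() never blocks: it returns false and the
// sample is dropped when the selected partition is full; BLOCKING would wait instead.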
-l1Queue.produce(metricsData); +AtomicLongArray partitionThroughput; // one slot per partition -// ── TopN (shared queue — all TopN types share one thread, handler map dispatch) ── - -BatchQueue<TopN> topNQueue = BatchQueueManager.createIfAbsent( - "TOPN_WORKER", - BatchQueueConfig.<TopN>builder() - .threads(ThreadPolicy.fixed(1)) // all TopN types share 1 thread - .partitions(PartitionPolicy.fixed(1)) - .bufferSize(1000) - .strategy(BufferStrategy.BLOCKING) - .errorHandler((data, t) -> log.error(t.getMessage(), t)) - .build() -); - -// Each TopNWorker registers its handler for its specific TopN class (5-10 types): -topNQueue.addHandler(topNClass, new TopNHandler()); // TopNHandler is worker's inner class - -topNQueue.produce(topNData); - -// ── I/O queues (shared scheduler, single consumer) ── -// sharedScheduler() specifies both name and ThreadPolicy. BatchQueueManager creates the -// shared ScheduledExecutorService on first reference, reuses it for subsequent queues. -// No separate createSharedScheduler() call needed — no startup ordering dependency. -// -// All these queues share the same "IO_POOL" scheduler: -// GRPCRemoteClient.* — gRPC streaming to peer OAP nodes -// GRPCMetricsExporter — gRPC metrics export -// KafkaLogExporter — Kafka log export -// KafkaTraceExporter — Kafka trace export -// JDBCBatchDAO — JDBC batch writes to database - -BatchQueue<RemoteMessage> grpcQueue = BatchQueueManager.createIfAbsent( - "GRPCRemoteClient.peer1", - BatchQueueConfig.<RemoteMessage>builder() - .sharedScheduler("IO_POOL", ThreadPolicy.cpuCores(0.5)) // creates IO_POOL on first use - .partitions(PartitionPolicy.fixed(1)) - .bufferSize(10_000) - .strategy(BufferStrategy.BLOCKING) - .consumer(new RemoteMessageHandler()) - .errorHandler((data, t) -> log.error(t.getMessage(), t)) - .build() -); - -// Another queue referencing the same "IO_POOL" — reuses existing scheduler. -// If ThreadPolicy differs from the first creator, logs a warning (first one wins). -BatchQueue<PrepareRequest> jdbcQueue = BatchQueueManager.createIfAbsent( - "JDBCBatchDAO", - BatchQueueConfig.<PrepareRequest>builder() - .sharedScheduler("IO_POOL", ThreadPolicy.cpuCores(0.5)) // reuses existing IO_POOL - .partitions(PartitionPolicy.fixed(1)) - .bufferSize(10_000) - .strategy(BufferStrategy.BLOCKING) - .consumer(new JDBCBatchHandler()) - .errorHandler((data, t) -> log.error(t.getMessage(), t)) - .build() -); +// In produce(), after the put/offer: +partitionThroughput.incrementAndGet(index); +``` -// ── Lifecycle ── +The counter is on the produce path, which is already doing an `ArrayBlockingQueue.put/offer`. +A single `incrementAndGet` adds negligible overhead (no contention — each metric type +hashes to a fixed partition, so each partition's counter is written by a predictable +set of producer threads). -BatchQueueManager.shutdown("METRICS_L1_AGGREGATION"); -BatchQueueManager.shutdownAll(); // shuts down all queues + shared schedulers -``` +### Rebalance algorithm (LPT — Longest Processing Time) -### Classes +The rebalancer runs on the queue's scheduler (one extra scheduled task). 
It uses the +classic **LPT multiprocessor scheduling** heuristic: ``` -library-batch-queue/ - src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ - BatchQueueManager.java — Global singleton registry + named shared schedulers - BatchQueue.java — Partitioned queue with dedicated or shared scheduler + handler map dispatch - BatchQueueConfig.java — Builder: threads/sharedScheduler, partitions, bufferSize, strategy - ThreadPolicy.java — Fixed or CPU-relative thread count: fixed(N) / cpuCores(double) - PartitionPolicy.java — Fixed or thread-relative partition count: fixed(N) / threadMultiply(N) - HandlerConsumer.java — Interface: void consume(List<T>), default void onIdle() - BufferStrategy.java — BLOCKING / IF_POSSIBLE - QueueErrorHandler.java — Functional interface: void onError(List<T>, Throwable) +1. snapshot = partitionThroughput[0..N-1] +2. reset all counters to 0 +3. sort partitions by snapshot[p] descending +4. threadLoad = long[taskCount], all zeros +5. newAssignment = List<Integer>[taskCount] +6. for each partition p in sorted order: + t = argmin(threadLoad) // thread with least total load + newAssignment[t].add(p) + threadLoad[t] += snapshot[p] +7. two-phase handoff (see below) ``` -### BatchQueueManager - -```java -/** - * Global registry for batch queues and shared schedulers. - * Thread-safe. Queues are created by name and shared across modules. - * - * Two scheduler modes: - * - Dedicated: queue creates its own ScheduledExecutorService (for high-throughput queues). - * Configured via BatchQueueConfig.threads(ThreadPolicy). - * - Shared: queue uses a named shared scheduler managed by this manager (for low-throughput queues). - * Configured via BatchQueueConfig.sharedScheduler(name, ThreadPolicy). - * Multiple queues referencing the same name share one ScheduledExecutorService. - * - * Shared schedulers are created lazily on first queue reference — no separate - * setup step needed. This eliminates startup ordering dependencies. - */ -public class BatchQueueManager { - private static final ConcurrentHashMap<String, BatchQueue<?>> queues = new ConcurrentHashMap<>(); - private static final ConcurrentHashMap<String, ScheduledExecutorService> sharedSchedulers = new ConcurrentHashMap<>(); - - /** - * Get or create a shared scheduler. Called internally by BatchQueue constructor - * when config specifies sharedScheduler(name, threads). - * - * - First call with a given name: creates the ScheduledExecutorService using - * threads.resolve() and caches it. - * - Subsequent calls with the same name: returns the cached scheduler. - * If the ThreadPolicy differs, logs a warning (first one wins). - * - * Thread-safe (ConcurrentHashMap.computeIfAbsent). - * - * Shared schedulers are owned by BatchQueueManager, NOT by any individual queue. - * They are destroyed only by shutdownAll() — never by individual queue shutdown. - */ - static ScheduledExecutorService getOrCreateSharedScheduler(String name, ThreadPolicy threads); +LPT is O(P log P) for sorting + O(P log T) for assignment (with a min-heap for +threadLoad). For 500 partitions and 8 threads, this is sub-millisecond. - /** - * Create a new named queue. Throws if name already exists. - */ - public static <T> BatchQueue<T> create(String name, BatchQueueConfig<T> config); +If a partition has zero throughput in the last interval, it keeps its previous +assignment (no unnecessary moves). - /** - * Create if not present. Returns existing queue if name is taken. 
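     * Thread-safe: workers racing to create the same name during bootstrap all
     * receive the same instance (the registry is a ConcurrentHashMap).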
- * - * If the queue already exists, validates consistency: - * - Consumption mode conflict: throws IllegalStateException if the existing queue - * uses direct consumer mode (config.consumer set) but the new config does not, - * or vice versa. These two modes are mutually exclusive per queue. - * - Infrastructure settings: logs a warning if threads, partitions, - * bufferSize, or strategy differ between the existing and new config. - */ - public static <T> BatchQueue<T> createIfAbsent(String name, BatchQueueConfig<T> config); +### Two-phase handoff protocol - /** - * Get an existing queue by name. Returns null if not found. - */ - public static <T> BatchQueue<T> get(String name); +Reassigning a partition from Thread-A to Thread-B while Thread-A is mid-dispatch +creates a **concurrent handler invocation** — two threads calling the same +`HandlerConsumer.consume()` on different batches simultaneously. For L1 aggregation, +`MergableBufferedData` is not thread-safe, so this corrupts state. - /** - * Shutdown and remove a single queue by name. - * - Dedicated scheduler: shut down together with the queue. - * - Shared scheduler: NOT shut down. It is owned by BatchQueueManager - * and may still be used by other queues. - */ - public static void shutdown(String name); +The race condition: - /** - * Shutdown all queues and all shared schedulers. Called during OAP server shutdown. - * - * Order: - * 1. Set running=false on all queues (stops drain loops from rescheduling) - * 2. Final drain of remaining data in each queue - * 3. Shut down all dedicated schedulers - * 4. Shut down all shared schedulers - * 5. Clear both registries - */ - public static void shutdownAll(); -} ``` - -### BatchQueue - -```java -/** - * A partitioned queue with handler-map-based dispatch. - * - * The scheduler is either dedicated (owned by this queue) or shared - * (managed by BatchQueueManager, shared with other queues). - * - * Partitions are created based on configured parallelism (default 1). - * Producers round-robin data across partitions. - * On drain, each batch is grouped by message class and dispatched to the - * registered handler for that class. - * - * Works uniformly for all use cases: - * - shared scheduler, partitions=1, one consumer → I/O queue (gRPC, Kafka, JDBC) - * - dedicated fixed(1), partitions=1, many handlers → TopN (all types share 1 thread) - * - dedicated cpuCores(1.0), threadMultiply(2), many handlers → metrics aggregation - */ -public class BatchQueue<T> { - private final String name; - private final ScheduledExecutorService scheduler; - private final boolean dedicatedScheduler; // true = owned by this queue, false = shared - private final ArrayBlockingQueue<T>[] partitions; - private final ConcurrentHashMap<Class<? extends T>, HandlerConsumer<T>> handlerMap; - private final BatchQueueConfig<T> config; - private final AtomicInteger roundRobinIndex = new AtomicInteger(0); - private volatile boolean running; - - /** - * Partition assignment per drain task. Each drain task owns a set of partition indices. - * - * Dedicated mode: one drain task per thread, partitions assigned round-robin. - * threads=4, partitions=8: task[0]→[0,4], task[1]→[1,5], task[2]→[2,6], task[3]→[3,7] - * - * Shared mode: single drain task covering ALL partitions (partitions typically = 1). - * assignedPartitions = { [0] } (one task, one partition) - */ - private final int[][] assignedPartitions; - - /** - * Per-task count of consecutive idle cycles (all assigned partitions empty). 
- * Used to compute adaptive backoff sleep interval. - */ - private final int[] consecutiveIdleCycles; - - BatchQueue(String name, BatchQueueConfig<T> config) { - this.name = name; - this.config = config; - this.handlerMap = new ConcurrentHashMap<>(); - - int taskCount; - if (config.getSharedSchedulerName() != null) { - // Shared scheduler mode: get-or-create shared scheduler from BatchQueueManager. - ScheduledExecutorService sharedScheduler = BatchQueueManager.getOrCreateSharedScheduler( - config.getSharedSchedulerName(), config.getSharedSchedulerThreads()); - - int partitionCount = config.getPartitions().resolve(1, 0); - this.partitions = new ArrayBlockingQueue[partitionCount]; - for (int i = 0; i < partitions.length; i++) { - partitions[i] = new ArrayBlockingQueue<>(config.getBufferSize()); - } - - this.scheduler = sharedScheduler; - this.dedicatedScheduler = false; - taskCount = 1; - this.assignedPartitions = new int[][] { - java.util.stream.IntStream.range(0, partitions.length).toArray() - }; - } else { - // Dedicated scheduler mode: resolve threads and partitions. - int threadCount = config.getThreads().resolve(); // cpuCores(1.0) → 8 on 8-core - int partitionCount = config.getPartitions().resolve(threadCount, 0); - - // Validation: if partitions < threads, cut threads to match and warn. - if (partitionCount < threadCount) { - log.warn("BatchQueue[{}]: partitions({}) < threads({}), " - + "reducing threads to {}", - name, partitionCount, threadCount, partitionCount); - threadCount = partitionCount; - } - - this.partitions = new ArrayBlockingQueue[partitionCount]; - for (int i = 0; i < partitions.length; i++) { - partitions[i] = new ArrayBlockingQueue<>(config.getBufferSize()); - } - - this.scheduler = Executors.newScheduledThreadPool( - threadCount, - new ThreadFactoryBuilder().setNameFormat("BatchQueue-" + name + "-%d").build() - ); - this.dedicatedScheduler = true; - taskCount = threadCount; - - // Assign partitions to threads by round-robin. - // threads=4, partitions=8: task[0]→[0,4], task[1]→[1,5], ... - // threads=4, threadMultiply(2)=8: same - // threads=8, partitions=8: task[0]→[0], task[1]→[1], ... - this.assignedPartitions = new int[taskCount][]; - List<List<Integer>> assignment = new ArrayList<>(); - for (int t = 0; t < taskCount; t++) { - assignment.add(new ArrayList<>()); - } - for (int p = 0; p < partitions.length; p++) { - assignment.get(p % taskCount).add(p); - } - for (int t = 0; t < taskCount; t++) { - assignedPartitions[t] = assignment.get(t).stream().mapToInt(Integer::intValue).toArray(); - } - } - - // Kick off one self-rescheduling drain task per assignment. - this.consecutiveIdleCycles = new int[taskCount]; - this.running = true; - for (int t = 0; t < taskCount; t++) { - scheduleDrain(t); - } - } - - /** - * Schedule the next drain with adaptive backoff. - * - * Idle count 0 (just had data): sleep = minIdleMs (e.g. 5ms) - * Idle count 1: sleep = minIdleMs * 2 (e.g. 10ms) - * Idle count 2: sleep = minIdleMs * 4 (e.g. 20ms) - * ... - * Idle count N: sleep = min(minIdleMs * 2^N, maxIdleMs) - */ - private void scheduleDrain(int taskIndex) { - int idleCount = consecutiveIdleCycles[taskIndex]; - long delay = Math.min( - config.getMinIdleMs() * (1L << Math.min(idleCount, 20)), - config.getMaxIdleMs() - ); - scheduler.schedule( - () -> drainLoop(taskIndex), - delay, - TimeUnit.MILLISECONDS - ); - } - - /** - * Register a handler for a specific message class type. - * Multiple metric types can each register their own handler instance. 
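     * Registration is thread-safe (the handler map is a ConcurrentHashMap), so
     * handlers can be added while drain loops are already running.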
- */ - public void addHandler(Class<? extends T> type, HandlerConsumer<T> handler); - - /** - * Produce data into a partition (round-robin). - * BLOCKING: waits if the selected partition is full (queue.put). - * IF_POSSIBLE: returns false if full (queue.offer). - */ - public boolean produce(T data) { - int index = Math.abs(roundRobinIndex.getAndIncrement() % partitions.length); - if (config.getStrategy() == BufferStrategy.BLOCKING) { - partitions[index].put(data); // blocks until space available - return true; - } else { - return partitions[index].offer(data); // returns false if full - } - } - - /** - * Drain loop for one thread: iterates ALL assigned partitions in a round-robin loop. - * Only stops when ALL assigned partitions are empty in a full cycle. - * - * Each cycle: - * 1. Drain ALL assigned partitions into one combined batch - * 2. If combined batch is empty → all partitions empty → onIdle, break - * 3. dispatch(combined batch) → handlers get ALL data of their type as one list - * 4. Loop back to step 1 (more data may have arrived during dispatch) - * - * Example: Thread-0 owns partitions [0, 4] - * cycle 1: drain(0)→[A1,B1,A2] drain(4)→[C1,A3] - * combined = [A1,B1,A2,C1,A3] - * dispatch → groupBy class: - * handlerA.consume([A1,A2,A3]) ← all A's in one call - * handlerB.consume([B1]) - * handlerC.consume([C1]) - * cycle 2: drain(0)→[A4] drain(4)→[] - * combined = [A4] (not empty → dispatch) - * cycle 3: drain(0)→[] drain(4)→[] - * combined = [] → onIdle, reschedule - */ - void drainLoop(int taskIndex) { - int[] myPartitions = assignedPartitions[taskIndex]; - try { - while (running) { - // Step 1: drain ALL assigned partitions into one batch - List<T> combined = new ArrayList<>(); - for (int partitionIndex : myPartitions) { - partitions[partitionIndex].drainTo(combined); - } - - // Step 2: if nothing across all partitions, we are idle - if (combined.isEmpty()) { - consecutiveIdleCycles[taskIndex]++; - notifyIdle(); - break; // reschedule with backoff - } - - // Had data → reset backoff so next reschedule is fast - consecutiveIdleCycles[taskIndex] = 0; - - // Step 3: dispatch the combined batch - dispatch(combined); - // Step 4: loop immediately — more data may have arrived - } - } finally { - if (running) { - scheduleDrain(taskIndex); - } - } - } - - void shutdown() { - running = false; - // Final drain of remaining data across all partitions - List<T> combined = new ArrayList<>(); - for (int i = 0; i < partitions.length; i++) { - partitions[i].drainTo(combined); - } - if (!combined.isEmpty()) { - dispatch(combined); - } - // Only shut down the scheduler if this queue owns it. - // Shared schedulers are shut down by BatchQueueManager.shutdownAll(). - if (dedicatedScheduler) { - scheduler.shutdown(); - } - } -} +Thread-A Rebalancer Thread-B +──────── ────────── ──────── +drainTo(P3) → 500 items +dispatch(batch): + handler_X.consume(500) owner[P3] = B + ← still running drainTo(P3) → 200 new items + dispatch(batch): + handler_X.consume(200) + ← CONCURRENT! handler_X corrupted ``` -**Dispatch and idle notification:** - -```java -private void dispatch(List<T> batch) { - // Direct consumer mode: pass the whole batch, no groupBy overhead. - if (config.getConsumer() != null) { - try { - config.getConsumer().consume(batch); - } catch (Throwable t) { - config.getErrorHandler().onError(batch, t); - } - return; - } - - // Handler map mode: group by class type and dispatch to registered handlers. 
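    // Items whose class has no registered handler are silently skipped by the null check below.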
- Map<Class<?>, List<T>> grouped = new HashMap<>(); - for (T item : batch) { - grouped.computeIfAbsent(item.getClass(), k -> new ArrayList<>()).add(item); - } +A simple ownership gate (`if partitionOwner[p] != me: skip`) prevents the new owner +from **draining** the partition. But the old owner already drained the data and is +still **dispatching** to the handler. The new owner would drain new items and call +the same handler concurrently. - for (Map.Entry<Class<?>, List<T>> entry : grouped.entrySet()) { - HandlerConsumer<T> handler = handlerMap.get(entry.getKey()); - if (handler != null) { - try { - handler.consume(entry.getValue()); - } catch (Throwable t) { - config.getErrorHandler().onError(entry.getValue(), t); - } - } - } -} +The fix is a two-phase handoff with a cycle-count fence: -private void notifyIdle() { - if (config.getConsumer() != null) { - config.getConsumer().onIdle(); - } else { - handlerMap.values().forEach(HandlerConsumer::onIdle); - } -} ``` - -**Consumer workflow (end-to-end):** - +Phase 1 — Revoke: + partitionOwner[p] = UNOWNED (-1) + // Thread-A sees UNOWNED on next drainTo check, skips P3. + // But Thread-A may be mid-dispatch right now — handler still in use. + + Wait: spin until cycleCount[oldTask] > snapshot + // Once the counter increments, Thread-A has finished its current + // drain+dispatch cycle. The handler is no longer being called. + +Phase 2 — Assign: + partitionOwner[p] = newTaskIndex + // Thread-B picks up P3 on its next drain cycle. Safe — no concurrent handler call. ``` -Producer threads Consumer thread (Thread-0, owns partitions [0, 4]) -────────────── ───────────────────────────────────────────────── -produce(A1) ──offer/put──► Partition-0: [A1, C1, B1] -produce(B1) ──offer/put──► Partition-4: [A2, A3] -produce(C1) ──┘ (round-robin) +During the gap between phases, new items accumulate in partition P3 but are not lost. +Thread-B drains them once Phase 2 completes. - ┌─── scheduleDrain(0) after adaptive backoff delay - │ - ▼ - drainLoop(taskIndex=0) - │ - ┌────────┴───────────────────────────────────────────┐ - │ while (running): │ - │ │ - │ ── Step 1: drain ALL assigned partitions ── │ - │ combined = [] │ - │ Partition-0.drainTo(combined) → [A1, C1, B1] │ - │ Partition-4.drainTo(combined) → [A1,C1,B1,A2,A3]│ - │ │ - │ ── Step 2: check if empty ── │ - │ combined not empty → dispatch │ - │ │ - │ ── Step 3: dispatch(combined) ── │ - │ ┌─ config.consumer set? │ - │ │ YES → consumer.consume(combined) ─── done │ - │ │ NO → groupBy class: │ - │ │ MetricA.class → [A1, A2, A3] │ - │ │ MetricB.class → [B1] │ - │ │ MetricC.class → [C1] │ - │ │ for each group: │ - │ │ handler = handlerMap.get(class) │ - │ │ handler.consume(group) │ - │ │ ↓ │ - │ │ L1Handler_A.consume([A1, A2, A3]) │ - │ │ → workerA.onWork([A1, A2, A3]) │ - │ │ → mergeDataCache.accept(A1) │ - │ │ → mergeDataCache.accept(A2) │ - │ │ → mergeDataCache.accept(A3) │ - │ │ → flush() if period elapsed │ - │ │ L1Handler_B.consume([B1]) │ - │ │ → workerB.onWork([B1]) │ - │ │ L1Handler_C.consume([C1]) │ - │ │ → workerC.onWork([C1]) │ - │ └───────────────────────────────────────────── │ - │ │ - │ ── Step 4: loop immediately (more may have come)──│ - │ │ - │ ── next cycle ── │ - │ combined = [] │ - │ Partition-0.drainTo(combined) → [] │ - │ Partition-4.drainTo(combined) → [] │ - │ combined is empty │ - │ → notifyIdle() │ - │ handlerMap.values().forEach(::onIdle) │ - │ ↓ │ - │ L1Handler_A.onIdle() │ - │ → workerA.flush() (force flush cache) │ - │ L1Handler_B.onIdle() │ - │ → workerB.flush() │ - │ ... 
│ - │ consecutiveIdleCycles[0]++ (e.g. now = 1) │ - │ break │ - └────────────────────────────────────────────────────┘ - │ - ▼ (finally block) - scheduleDrain(0) - │ - ├─ idleCount=0 (just had data): wait 5ms ← fast re-poll - ├─ idleCount=1: wait 10ms - ├─ idleCount=2: wait 20ms - ├─ idleCount=3: wait 40ms - ├─ idleCount=4: wait 80ms - ├─ idleCount=5: wait 160ms - ├─ idleCount=6+: wait 200ms ← capped at maxIdleMs - │ - ▼ - drainLoop again - │ - (if data found → idleCount resets to 0 → back to fast polling) -``` - -**Key points:** -- Each cycle drains ALL assigned partitions into one combined batch before dispatching. -- `dispatch()` is called once per cycle with all data from all partitions combined. -- In handler map mode, `groupBy(class)` collects all items of the same type across all - partitions into one list. The handler receives ALL available data of its type in a single - `consume()` call — e.g., `[A1, A2, A3]` not three separate calls. -- The handler's `consume()` runs **synchronously** inside the drain thread. The handler - (an inner class of the worker) directly accesses the worker's fields — merge cache, - counters, flush logic — with no extra threading. -- If any partition had data, loop immediately to check all partitions again. -- `onIdle()` fires only when ALL assigned partitions are empty in a full cycle, giving - handlers a chance to flush periodic caches (e.g., L1 aggregation merge cache → nextWorker). -- **Adaptive backoff**: after data, re-poll in `minIdleMs` (5ms). Each consecutive empty - cycle doubles the sleep, capping at `maxIdleMs` (200ms). Data resets to fast polling. - -**Two consumption modes, same queue class:** -- **Direct consumer** (`config.consumer` set) — whole batch goes to one handler, no groupBy. - Use for I/O queues where all data is the same type (gRPC, Kafka, JDBC). -- **Handler map** (`addHandler` called) — batch grouped by class, dispatched per type. - Use for metrics aggregation (L1/L2) and TopN with many types sharing partitions. +### Cycle counter -If both are set, direct consumer takes priority (handler map is ignored). -If neither is set, data is drained but silently dropped. - -### BatchQueueConfig +Each drain task increments its cycle counter at the end of every drain cycle (after +dispatch completes, before re-scheduling). The rebalancer reads this counter to know +when a task has finished any in-flight work. ```java -@Builder -public class BatchQueueConfig<T> { - /** - * Thread policy for a dedicated ScheduledExecutorService. - * Resolved at queue construction time. Examples: - * ThreadPolicy.fixed(8) → always 8 threads - * ThreadPolicy.cpuCores(1.0) → 1x available CPU cores (8 on 8-core) - * ThreadPolicy.cpuCores(0.25) → 0.25x CPU cores (2 on 8-core, min 1) - * - * When set, the queue creates its own scheduler. - * When null, sharedScheduler must be set — the queue uses a shared scheduler. - * - * Use dedicated pools for high-throughput queues (metrics L1/L2 aggregation) - * where you need guaranteed thread capacity. - */ - private ThreadPolicy threads; - - /** - * Shared scheduler name and its ThreadPolicy. Set via the builder method - * sharedScheduler(name, threads) which populates both fields together. - * Mutually exclusive with the threads field above. - * - * When set, the queue registers its drain tasks on a shared - * ScheduledExecutorService managed by BatchQueueManager. - * The shared scheduler is created lazily on first queue reference — - * no separate setup step needed. 
Subsequent queues with the same name - * reuse the existing scheduler (ThreadPolicy mismatch logs a warning). - * - * Use for low-throughput I/O queues (gRPC, Kafka, JDBC) to reduce OS thread count. - * Multiple queues sharing the same scheduler name share the same thread pool. - * - * Exactly one of threads or sharedScheduler must be set. - */ - private String sharedSchedulerName; - private ThreadPolicy sharedSchedulerThreads; - - /** - * Number of partitions, or a policy to derive it from resolved thread count. - * - * Can be set as: - * - Absolute: fixed(8) → exactly 8 partitions. - * - Relative: threadMultiply(2) → 2x resolved thread count. - * e.g. cpuCores(0.5) on 8-core = 4 threads, threadMultiply(2) → 8 partitions. - * - * Default: fixed(1). - * - * Validation (applied at construction time for dedicated scheduler mode): - * - If partitions < resolved thread count, thread count is reduced to match - * partitions and a warning is logged. No point having more threads than partitions. - */ - @Builder.Default - private PartitionPolicy partitions = PartitionPolicy.fixed(1); - - /** - * Buffer size per partition. - */ - @Builder.Default - private int bufferSize = 10_000; - - /** - * BLOCKING: producer waits when buffer full. - * IF_POSSIBLE: producer gets false when buffer full. - */ - @Builder.Default - private BufferStrategy strategy = BufferStrategy.BLOCKING; - - /** - * Direct consumer for the whole batch. When set, all drained data goes to this - * handler without class-based grouping. Takes priority over handler map. - * - * Use this for I/O queues where all data is the same type (gRPC, Kafka, JDBC). - * Leave null to use handler map dispatch via addHandler(). - */ - private HandlerConsumer<T> consumer; - - /** - * Called when a handler throws during consume. - */ - private QueueErrorHandler<T> errorHandler; +AtomicLongArray cycleCount; // one slot per drain task - /** - * Minimum idle sleep in milliseconds. Default 5ms. - * Used as the base interval when data was recently consumed. - * The thread re-polls quickly to catch new data with low latency. - */ - @Builder.Default - private long minIdleMs = 5; - - /** - * Maximum idle sleep in milliseconds. Default 200ms. - * After several consecutive empty cycles, the sleep interval backs off - * exponentially from minIdleMs up to this cap. - * - * Backoff: sleep = min(minIdleMs * 2^consecutiveIdleCycles, maxIdleMs) - * idle 0: 5ms → idle 1: 10ms → idle 2: 20ms → idle 3: 40ms - * → idle 4: 80ms → idle 5: 160ms → idle 6+: 200ms (capped) - */ - @Builder.Default - private long maxIdleMs = 200; +void drainLoop(int taskIndex) { + // ... drain assigned partitions, dispatch ... + cycleCount.incrementAndGet(taskIndex); + // re-schedule } ``` -### HandlerConsumer +The rebalancer uses it in the revoke phase: ```java -/** - * Handler for processing a batch of data for a specific type. - * Each metric type (or I/O queue user) provides its own handler instance. - */ -public interface HandlerConsumer<T> { - /** - * Process a batch of data belonging to this handler's type. - */ - void consume(List<T> data); +void movePartition(int p, int oldTask, int newTask) { + partitionOwner.set(p, UNOWNED); - /** - * Called when there is nothing to consume. Can be used as a timer trigger - * (e.g. flush L1 aggregation cache periodically). 
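     * Runs on the drain thread itself, at most once per idle cycle (after all
     * assigned partitions drained empty).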
- */ - default void onIdle() { + long snapshot = cycleCount.get(oldTask); + while (cycleCount.get(oldTask) <= snapshot) { + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); } -} -``` - -### ThreadPolicy - -```java -/** - * Determines the number of threads for a BatchQueue's dedicated scheduler - * or for a shared scheduler created via BatchQueueManager. - * - * Two modes: - * - fixed(N): exactly N threads, regardless of hardware. - * - cpuCores(multiplier): multiplier * Runtime.availableProcessors(), rounded. - * - * Resolved value is always >= 1 — every pool must have at least one thread. - * fixed() requires count >= 1 at construction. cpuCores() applies max(1, ...) at resolution. - */ -public class ThreadPolicy { - private final int fixedCount; // > 0 for fixed mode, 0 for cpuCores mode - private final double cpuMultiplier; // > 0 for cpuCores mode, 0 for fixed mode - - /** - * Fixed number of threads. Count must be >= 1. - * Example: fixed(1) → always 1 thread. - * fixed(8) → always 8 threads. - * Throws IllegalArgumentException if count < 1. - */ - public static ThreadPolicy fixed(int count); - - /** - * Threads = multiplier * available CPU cores, rounded, min 1. - * Multiplier must be > 0. - * Example on 8-core machine: - * cpuCores(1.0) → 8 threads - * cpuCores(0.5) → 4 threads - * cpuCores(0.25) → 2 threads - * cpuCores(2.0) → 16 threads - * Example on 2-core machine: - * cpuCores(0.25) → 1 thread (min 1, never 0) - */ - public static ThreadPolicy cpuCores(double multiplier); - - /** - * Resolve the actual thread count. Always returns >= 1. - * For fixed mode, returns fixedCount. - * For cpuCores mode, returns max(1, round(cpuMultiplier * availableProcessors())). - */ - public int resolve(); -} -``` - -### PartitionPolicy - -```java -/** - * Determines the number of partitions for a BatchQueue. - * - * Two modes: - * - fixed(N): exactly N partitions, regardless of thread count. - * - threadMultiply(N): N * resolved thread count. - * - adaptive(): partition count grows with registered handlers. - * Threshold = threadCount * multiplier (default 25). - * Below threshold: 1:1 (one partition per handler). - * Above threshold: excess at 1:2 ratio. - * - * All policies resolved via resolve(threadCount, handlerCount). - * At queue construction time, if partitions < resolved thread count, - * thread count is reduced to match and a warning is logged. - */ -public class PartitionPolicy { - private final int fixedCount; // > 0 for fixed mode - private final int multiplier; // > 0 for threadMultiply/adaptive - private final boolean adaptive; // true for adaptive mode - - public static PartitionPolicy fixed(int count); - public static PartitionPolicy threadMultiply(int multiplier); - public static PartitionPolicy adaptive(); - public static PartitionPolicy adaptive(int multiplier); - /** - * Resolve the actual partition count. - * - fixed: returns fixedCount (both params ignored). - * - threadMultiply: returns multiplier * resolvedThreadCount. - * - adaptive: handlerCount == 0 → resolvedThreadCount; - * handlerCount <= threshold → handlerCount (1:1); - * handlerCount > threshold → threshold + (excess / 2). - */ - public int resolve(int resolvedThreadCount, int handlerCount); + partitionOwner.set(p, newTask); } ``` -### Implementing HandlerConsumer +**Worst-case wait:** one `maxIdleMs` (the drain task may be sleeping in backoff). +For L1 aggregation this is 50ms, for L2 persistence 50ms. Since rebalancing runs +every few minutes, this latency is negligible. 
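To keep that bound per rebalance rather than per moved partition, the fences can be batched: revoke every moved partition first, then wait once per affected old owner, as the full rebalance cycle below spells out. A sketch reusing the `partitionOwner` and `cycleCount` fields above (imports elided as in the snippets above; the `moves` triple encoding is an assumption, not the implementation):

```java
// Batched two-phase handoff: one cycle-count fence per affected old owner.
void moveAll(List<int[]> moves) {                      // each move: {partition, oldTask, newTask}
    for (int[] m : moves) {
        partitionOwner.set(m[0], UNOWNED);             // Phase 1: revoke everything up front
    }
    Map<Integer, Long> fence = new HashMap<>();
    for (int[] m : moves) {
        fence.putIfAbsent(m[1], cycleCount.get(m[1])); // snapshot each old owner once
    }
    fence.forEach((task, snap) -> {                    // one completed cycle after revocation
        while (cycleCount.get(task) <= snap) {         // means no dispatch is still in flight
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
        }
    });
    for (int[] m : moves) {
        partitionOwner.set(m[0], m[2]);                // Phase 2: publish the new owners
    }
}
```

Because every revocation happens before any wait, the owners' cycle clocks advance concurrently, so a rebalance that moves dozens of partitions still pays at most one drain cycle in total.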
-Each worker creates a handler as an **inner class** that directly accesses the worker's -fields (merge cache, telemetry counters, flush logic, etc.). The handler instance is -registered per metric class — one handler per worker, one worker per metric type. - -#### L1 Aggregation (MetricsAggregateWorker) - -Current code uses an inner `AggregatorConsumer` that calls `onWork()` and `flush()`: +### Partition ownership array ```java -// Current: inner class IConsumer accesses outer worker fields -private class AggregatorConsumer implements IConsumer<Metrics> { - public void consume(List<Metrics> data) { - MetricsAggregateWorker.this.onWork(data); // accesses mergeDataCache - } - public void nothingToConsume() { - flush(); // accesses lastSendTime, nextWorker - } -} +AtomicIntegerArray partitionOwner; // partitionOwner[p] = taskIndex that owns it, or -1 ``` -New code — same pattern, just implements `HandlerConsumer` instead of `IConsumer`. -Each `MetricsAggregateWorker` instance creates its own handler and registers it for -its specific metric class: +The drain loop checks ownership before draining each partition: ```java -public class MetricsAggregateWorker extends AbstractWorker<Metrics> { - private final MergableBufferedData<Metrics> mergeDataCache; - private final AbstractWorker<Metrics> nextWorker; - private final BatchQueue<Metrics> l1Queue; - - MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, - AbstractWorker<Metrics> nextWorker, - String modelName, - Class<? extends Metrics> metricsClass, - ...) { - this.nextWorker = nextWorker; - this.mergeDataCache = new MergableBufferedData<>(); - - // Get or create the shared L1 queue (idempotent) - this.l1Queue = BatchQueueManager.createIfAbsent( - "METRICS_L1_AGGREGATION", - BatchQueueConfig.<Metrics>builder() - .threads(ThreadPolicy.cpuCores(1.0)) // 1x CPU cores - .partitions(PartitionPolicy.threadMultiply(2)) // 2x resolved threads - .bufferSize(10_000) - .strategy(BufferStrategy.IF_POSSIBLE) - .errorHandler((data, t) -> log.error(t.getMessage(), t)) - .build() - ); - - // Register this worker's handler for its specific metric class. - // The inner class directly accesses mergeDataCache, nextWorker, etc. - l1Queue.addHandler(metricsClass, new L1Handler()); - } - - @Override - public void in(Metrics metrics) { - l1Queue.produce(metrics); - } - - private void onWork(List<Metrics> data) { - data.forEach(mergeDataCache::accept); - flush(); - } - - private void flush() { - if (System.currentTimeMillis() - lastSendTime > l1FlushPeriod) { - mergeDataCache.read().forEach(nextWorker::in); - lastSendTime = System.currentTimeMillis(); - } - } - - // Inner class handler — accesses worker fields directly - private class L1Handler implements HandlerConsumer<Metrics> { - @Override - public void consume(List<Metrics> data) { - MetricsAggregateWorker.this.onWork(data); - } - - @Override - public void onIdle() { - MetricsAggregateWorker.this.flush(); +void drainLoop(int taskIndex) { + for (int p : assignedPartitions[taskIndex]) { + if (partitionOwner.get(p) != taskIndex) { + continue; // revoked or not yet assigned } + partitions[p].drainTo(combined); } + dispatch(combined); + cycleCount.incrementAndGet(taskIndex); } ``` -Key point: 100+ `MetricsAggregateWorker` instances are created (one per metric type, -both OAL and MAL), each registers its own `L1Handler` inner class instance on the -same shared queue. 
Handler map dispatch routes each metric class to its own worker's -`mergeDataCache` — OAL and MAL handlers coexist in the same queue without interference. +The `partitionOwner` check is one `AtomicIntegerArray.get()` per partition per drain +cycle — a volatile read with no CAS. This is the only overhead on the hot path. -#### L2 Persistent (MetricsPersistentMinWorker) +### Configuration -Same pattern — inner class handler accesses the worker's `onWork()` method: +Rebalancing is opt-in via `BatchQueueConfig`: ```java -public class MetricsPersistentMinWorker extends MetricsPersistentWorker { - private final BatchQueue<Metrics> l2Queue; - - MetricsPersistentMinWorker(..., Class<? extends Metrics> metricsClass, ...) { - super(...); - - this.l2Queue = BatchQueueManager.createIfAbsent( - "METRICS_L2_PERSISTENT", - BatchQueueConfig.<Metrics>builder() - .threads(ThreadPolicy.cpuCores(0.25)) // 0.25x CPU cores (2 on 8-core) - .partitions(PartitionPolicy.threadMultiply(2)) // 2x resolved threads - .bufferSize(10_000) - .strategy(BufferStrategy.BLOCKING) - .errorHandler((data, t) -> log.error(t.getMessage(), t)) - .build() - ); - - l2Queue.addHandler(metricsClass, new L2Handler()); - } - - @Override - public void in(Metrics metrics) { - l2Queue.produce(metrics); - } - - // Inner class handler — accesses worker's onWork, queuePercentageGauge, etc. - private class L2Handler implements HandlerConsumer<Metrics> { - @Override - public void consume(List<Metrics> data) { - queuePercentageGauge.setValue(...); - MetricsPersistentMinWorker.this.onWork(data); - } - } -} +BatchQueueConfig.<Metrics>builder() + .threads(ThreadPolicy.cpuCores(1.0)) + .partitions(PartitionPolicy.adaptive()) + .rebalanceIntervalMs(300_000) // every 5 minutes, 0 = disabled (default) + .build(); ``` -#### TopN (TopNWorker — shared queue, handler map dispatch) - -All TopN types share one queue. Same handler map pattern as L1/L2 — each TopNWorker -registers its inner class handler for its specific TopN class: - -```java -public class TopNWorker extends PersistenceWorker<TopN> { - private final BatchQueue<TopN> topNQueue; - - TopNWorker(..., Class<? extends TopN> topNClass, ...) { - // Get or create the shared TopN queue (idempotent) - this.topNQueue = BatchQueueManager.createIfAbsent( - "TOPN_WORKER", - BatchQueueConfig.<TopN>builder() - .threads(ThreadPolicy.fixed(1)) // all TopN types share 1 thread - .partitions(PartitionPolicy.fixed(1)) - .bufferSize(1000) - .strategy(BufferStrategy.BLOCKING) - .errorHandler((data, t) -> log.error(t.getMessage(), t)) - .build() - ); - - // Register this worker's handler for its specific TopN class - topNQueue.addHandler(topNClass, new TopNHandler()); - } - - @Override - public void in(TopN topN) { - topNQueue.produce(topN); - } +Only meaningful for dedicated-scheduler queues with multiple threads and partitions. +Silently ignored for single-thread or shared-scheduler queues. - // Inner class — accesses worker's limitedSizeBufferedData, reportPeriod, etc. - private class TopNHandler implements HandlerConsumer<TopN> { - @Override - public void consume(List<TopN> data) { - TopNWorker.this.onWork(data); - } +### Full rebalance cycle - @Override - public void onIdle() { - TopNWorker.this.flushIfNeeded(); - } - } -} +``` +1. rebalanceTask fires (scheduled every rebalanceIntervalMs) + | +2. snapshot all partitionThroughput[p], reset to 0 + | +3. skip rebalance if throughput is uniform (max/min ratio < 1.5) + | +4. 
LPT assignment: sort partitions by throughput desc, + assign each to the least-loaded thread + | +5. diff against current assignedPartitions + | only partitions that changed owner need handoff + | +6. for each moved partition: + | Phase 1: partitionOwner[p] = UNOWNED + | +7. for each moved partition: + | wait: cycleCount[oldTask] > snapshot_before_revoke + | +8. for each moved partition: + | Phase 2: partitionOwner[p] = newTask + | +9. update assignedPartitions (volatile write) + | +10. log summary: "rebalanced N partitions, max thread load delta: X%" ``` -### Thread Reduction - -Thread counts scale with CPU cores via ThreadPolicy. OAL and MAL share the same L1/L2 pools. -Example on 8-core machine: - -| Before (DataCarrier) | Threads | After (BatchQueue) | Threads (8-core) | -|---------------------------------|---------|-------------------------------------------------|------------------| -| L1 OAL pool | 24 | METRICS_L1_AGGREGATION (cpuCores(1.0)) | 8 | -| L1 MAL pool | 2 | *(shared with L1 above)* | *(shared)* | -| L2 OAL pool | 2 | METRICS_L2_PERSISTENT (cpuCores(0.25)) | 2 | -| L2 MAL pool | 1 | *(shared with L2 above)* | *(shared)* | -| TopNWorker (5-10 types) | 5-10 | TOPN_WORKER (fixed(1), handler map) | 1 | -| GRPCRemoteClient (2-4 peers) | 2-4 | GRPCRemoteClient.* (shared IO_POOL) | | -| JDBCBatchDAO | 2-4 | JDBCBatchDAO (shared IO_POOL) | | -| Exporters (gRPC/Kafka) | 0-3 | Exporter.* (shared IO_POOL) | | -| | | **IO_POOL shared scheduler (cpuCores(0.5))** | **4** | -| **Total** | **~38-48** | **Total** | **~15** | - -On different hardware: - -| Machine | L1 (OAL+MAL) | L2 (OAL+MAL) | TopN | IO_POOL | Total | -|-----------|---------------|---------------|------|---------|-------| -| 2-core | 2 | 1 | 1 | 1 | 5 | -| 4-core | 4 | 1 | 1 | 2 | 8 | -| 8-core | 8 | 2 | 1 | 4 | 15 | -| 16-core | 16 | 4 | 1 | 8 | 29 | - -Savings (8-core): -- L1: 24+2 → 8 threads (OAL+MAL share one pool, CPU-relative, no empty channel iteration) -- L2: 2+1 → 2 threads (OAL+MAL share one pool) -- TopN: 5-10 → 1 thread (all types share one queue with handler map dispatch) -- I/O queues: 4-11 → 4 threads (shared IO_POOL for gRPC, Kafka, JDBC) -- Total: from ~38-48 OS threads down to ~15 - -IO_POOL queues (all do network or database I/O, low-throughput, bursty): - -| Queue Name | Current Source | I/O Type | Threads Before | -|-------------------------|------------------------|---------------|----------------| -| GRPCRemoteClient.* | GRPCRemoteClient.java | gRPC network | 2-4 (per peer) | -| GRPCMetricsExporter | GRPCMetricsExporter.java| gRPC network | 1 | -| KafkaLogExporter | KafkaLogExporter.java | Kafka network | 1 | -| KafkaTraceExporter | KafkaTraceExporter.java| Kafka network | 1 | -| JDBCBatchDAO | JDBCBatchDAO.java | JDBC database | 2-4 | -| **Subtotal** | | | **7-14** | -| **After (IO_POOL)** | | | **cpuCores(0.5)** | - -Number of partitions/buffers: - -| Before | Count | After | Count | -|--------------------------------|----------|------------------------------------|-------| -| L1 OAL channels (100+ * 2ch) | 200+ | L1 partitions (8 threads * 2) | 16 | -| L1 MAL channels (N * 1ch) | N | *(shared with L1 above)* | *(0)* | -| L2 OAL channels (100+ * 1ch) | 100+ | L2 partitions (2 threads * 2) | 4 | -| L2 MAL channels (N * 1ch) | N | *(shared with L2 above)* | *(0)* | -| TopN buffers (5-10 types) | 5-10 | TOPN_WORKER partitions | 1 | -| I/O buffers (gRPC, JDBC, etc.) 
| 5-8 | I/O queue partitions | 5-8 | -| **Total buffers** | **300+** | **Total buffers** | **~28** | - -### What Gets Dropped - -| DataCarrier Feature | Status | Reason | -|-------------------------------------|-----------|-------------------------------------------------------| -| One queue per metric type | Dropped | Shared partitions + handler map instead | -| Separate OAL / MAL pools | Dropped | OAL and MAL share L1/L2 queues (handler map dispatch) | -| One thread per TopN type | Dropped | All TopN types share one TOPN_WORKER queue | -| Multi-channel per DataCarrier | Dropped | Single partition array replaces multi-channel | -| IDataPartitioner | Dropped | Simple round-robin on partition array | -| Consumer instantiation by class | Dropped | All callers use instance-based handlers | -| Consumer init(Properties) | Dropped | Not used by any production consumer | -| EnvUtil override | Dropped | Configuration via application.yml | -| Two separate queue classes | Dropped | One `BatchQueue` with configurable scheduler modes | -| BulkConsumePool / ConsumerPoolFactory | Dropped | Dedicated/shared ScheduledExecutorService replaces pool| -| Fixed thread counts | Dropped | ThreadPolicy: CPU-relative (cpuCores) or fixed | -| Signal-driven consumption | Dropped | Adaptive backoff replaces explicit notify | -| Separate createSharedScheduler step | Dropped | Shared schedulers created lazily on first queue ref | - -### What Gets Preserved - -| Feature | How | -|-----------------------|--------------------------------------------------------------| -| Named queue management| `BatchQueueManager.create/createIfAbsent/get` by name | -| Per-type isolation | `handlerMap` dispatches each class to its own handler | -| Bounded buffer | ArrayBlockingQueue per partition | -| BLOCKING strategy | `queue.put()` — producer blocks when full | -| IF_POSSIBLE strategy | `queue.offer()` — returns false when full, data dropped | -| Batch consumption | `drainTo(list)` — same as current | -| Error handling | `errorHandler.onError(batch, throwable)` | -| Nothing-to-consume | `handler.onIdle()` — called when all partitions empty | -| Fast data response | Adaptive backoff (minIdleMs=5ms) replaces signal-driven mode | -| Drain on shutdown | Manager shutdown drains all queues, then schedulers | -| Produce-gate | `produce()` returns false if queue is shut down | -| Hardware scaling | ThreadPolicy.cpuCores() scales threads with available cores | +Steps 6 and 7 batch all revocations first, then wait for all old owners in parallel. +This bounds the total handoff latency to one drain cycle (the slowest old owner), +rather than one cycle per moved partition sequentially. + +### Safety guarantees + +| Property | Mechanism | +|----------|-----------| +| No concurrent handler calls | Two-phase handoff: revoke + cycle-count fence + assign | +| No data loss | Items stay in `ArrayBlockingQueue` during the UNOWNED gap | +| No data duplication | `drainTo` atomically moves items out of the queue | +| Lock-free hot path | Only `AtomicIntegerArray.get()` added to drain loop | +| Lock-free produce path | Only `AtomicLongArray.incrementAndGet()` added | +| Bounded handoff latency | At most one `maxIdleMs` wait per rebalance | + +### Scope + +| Queue | Rebalance? 
| Reason |
|-------|------------|--------|
| L1 Aggregation (`METRICS_L1_AGGREGATION`) | Yes | Hundreds of metric types, `cpuCores(1.0)` threads, high throughput variance |
| L2 Persistence (`METRICS_L2_PERSISTENT`) | Yes | Same type distribution; fewer threads, but still benefits |
| TopN (`TOPN_WORKER`) | No | Single thread, nothing to rebalance |
| Exporters / gRPC Remote / JDBC | No | Single thread or fixed(1) partition, nothing to rebalance |

### Complexity budget

| Component | Lines (est.) | Hot-path cost |
|-----------|-------------|---------------|
| `partitionThroughput` counter | ~5 | 1 `AtomicLongArray.incrementAndGet` per produce |
| `partitionOwner` check in drain loop | ~5 | 1 `AtomicIntegerArray.get` per partition per cycle |
| `cycleCount` bump | ~2 | 1 `AtomicLongArray.incrementAndGet` per drain cycle |
| Rebalance task (LPT + handoff) | ~80 | 0 (runs on scheduler, not on hot path) |
| Config field + validation | ~10 | 0 |
| **Total** | **~100** | **1 atomic add per produce; 1 volatile read per partition + 1 add per drain cycle** |
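For reference against the "Rebalance task (LPT + handoff)" row above, a compact sketch of the LPT step in plain Java (method and local names are illustrative; `java.util` imports and the keep-cold-partitions rule are omitted for brevity):

```java
// LPT: heaviest partitions first, each placed on the currently least-loaded thread.
static List<List<Integer>> lptAssign(long[] throughput, int taskCount) {
    Integer[] parts = new Integer[throughput.length];
    for (int i = 0; i < parts.length; i++) {
        parts[i] = i;
    }
    Arrays.sort(parts, (a, b) -> Long.compare(throughput[b], throughput[a])); // descending

    List<List<Integer>> assignment = new ArrayList<>();
    PriorityQueue<long[]> byLoad = new PriorityQueue<>((x, y) -> Long.compare(x[0], y[0]));
    for (int t = 0; t < taskCount; t++) {
        assignment.add(new ArrayList<>());
        byLoad.add(new long[] {0L, t});                // {accumulated load, task index}
    }

    for (int p : parts) {                              // O(P log T) after the O(P log P) sort
        long[] least = byLoad.poll();                  // least-loaded thread so far
        assignment.get((int) least[1]).add(p);
        least[0] += throughput[p];
        byLoad.add(least);
    }
    return assignment;
}
```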
