This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch library-batch-queue in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit a2bd9aa7c5a4ecdaae27a6ffc40d008e52e73f28 Author: Wu Sheng <[email protected]> AuthorDate: Fri Feb 13 23:27:02 2026 +0800 Simplify PartitionPolicy API and clean up redundant tests Remove resolve(int) and isAdaptive() from PartitionPolicy — the single resolve(int, int) method handles all policy types uniformly. For adaptive policies with 0 handlers, resolve now returns threadCount as a sensible initial partition count, eliminating isAdaptive() branching in BatchQueue. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../server-library/library-batch-queue/DESIGN.md | 37 +- .../oap/server/library/batchqueue/BatchQueue.java | 177 +++-- .../library/batchqueue/BatchQueueConfig.java | 4 +- .../server/library/batchqueue/PartitionPolicy.java | 103 ++- .../library/batchqueue/BatchQueueBenchmark.java | 523 +++++++------- .../library/batchqueue/BatchQueueConfigTest.java | 4 +- .../server/library/batchqueue/BatchQueueTest.java | 115 ++-- .../library/batchqueue/BenchmarkMetricTypes.java | 756 +++++---------------- .../library/batchqueue/PartitionPolicyTest.java | 76 ++- .../library/datacarrier/DataCarrierBenchmark.java | 297 +------- 10 files changed, 801 insertions(+), 1291 deletions(-) diff --git a/oap-server/server-library/library-batch-queue/DESIGN.md b/oap-server/server-library/library-batch-queue/DESIGN.md index 4094f56f29..bc107dbbaf 100644 --- a/oap-server/server-library/library-batch-queue/DESIGN.md +++ b/oap-server/server-library/library-batch-queue/DESIGN.md @@ -388,7 +388,7 @@ public class BatchQueue<T> { ScheduledExecutorService sharedScheduler = BatchQueueManager.getOrCreateSharedScheduler( config.getSharedSchedulerName(), config.getSharedSchedulerThreads()); - int partitionCount = config.getPartitions().resolve(1); + int partitionCount = config.getPartitions().resolve(1, 0); this.partitions = new ArrayBlockingQueue[partitionCount]; for (int i = 0; i < partitions.length; i++) { partitions[i] = new ArrayBlockingQueue<>(config.getBufferSize()); @@ -403,7 +403,7 @@ public class BatchQueue<T> { } else { // Dedicated scheduler mode: resolve threads and partitions. int threadCount = config.getThreads().resolve(); // cpuCores(1.0) → 8 on 8-core - int partitionCount = config.getPartitions().resolve(threadCount); + int partitionCount = config.getPartitions().resolve(threadCount, 0); // Validation: if partitions < threads, cut threads to match and warn. if (partitionCount < threadCount) { @@ -899,33 +899,34 @@ public class ThreadPolicy { * Two modes: * - fixed(N): exactly N partitions, regardless of thread count. * - threadMultiply(N): N * resolved thread count. + * - adaptive(): partition count grows with registered handlers. + * Threshold = threadCount * multiplier (default 25). + * Below threshold: 1:1 (one partition per handler). + * Above threshold: excess at 1:2 ratio. * - * At queue construction time, the resolved partition count is validated: - * if partitions < resolved thread count, thread count is reduced to match - * partitions and a warning is logged. + * All policies resolved via resolve(threadCount, handlerCount). + * At queue construction time, if partitions < resolved thread count, + * thread count is reduced to match and a warning is logged. */ public class PartitionPolicy { private final int fixedCount; // > 0 for fixed mode - private final int multiplier; // > 0 for threadMultiply mode + private final int multiplier; // > 0 for threadMultiply/adaptive + private final boolean adaptive; // true for adaptive mode - /** - * Fixed number of partitions. - * Example: fixed(8) → always 8 partitions. - */ public static PartitionPolicy fixed(int count); - - /** - * Partitions = multiplier * resolved thread count. - * Example: threadMultiply(2) with 4 resolved threads → 8 partitions. - */ public static PartitionPolicy threadMultiply(int multiplier); + public static PartitionPolicy adaptive(); + public static PartitionPolicy adaptive(int multiplier); /** * Resolve the actual partition count. - * For fixed mode, returns fixedCount. - * For threadMultiply mode, returns multiplier * resolvedThreadCount. + * - fixed: returns fixedCount (both params ignored). + * - threadMultiply: returns multiplier * resolvedThreadCount. + * - adaptive: handlerCount == 0 → resolvedThreadCount; + * handlerCount <= threshold → handlerCount (1:1); + * handlerCount > threshold → threshold + (excess / 2). */ - public int resolve(int resolvedThreadCount); + public int resolve(int resolvedThreadCount, int handlerCount); } ``` diff --git a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java index 6f2d04b79e..f6a117039e 100644 --- a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java +++ b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -32,6 +33,19 @@ import lombok.extern.slf4j.Slf4j; /** * A partitioned, self-draining queue with type-based dispatch. * + * <h3>Usage</h3> + * <pre> + * BatchQueue queue = BatchQueueManager.create(name, config); + * queue.addHandler(TypeA.class, handlerA); // register metric types + * queue.addHandler(TypeB.class, handlerB); // partitions grow adaptively + * queue.produce(data); // data flows immediately + * </pre> + * + * <p>For adaptive partition policies, each {@link #addHandler} call recalculates + * the partition count from the current number of registered handlers, growing the + * partition array as needed. The thread count is resolved at construction time + * and remains fixed. + * * <h3>Produce workflow</h3> * <pre> * produce(data) @@ -90,13 +104,14 @@ import lombok.extern.slf4j.Slf4j; * <pre> * shared scheduler, partitions=1, one consumer --> I/O queue (gRPC, Kafka, JDBC) * dedicated fixed(1), partitions=1, many handlers --> TopN (all types share 1 thread) - * dedicated cpuCores(1.0), threadMultiply(2), + * dedicated cpuCores(1.0), adaptive(), * many handlers --> metrics aggregation * </pre> */ @Slf4j public class BatchQueue<T> { private final String name; + private final BatchQueueConfig<T> config; /** The thread pool that executes drain tasks. Either dedicated or shared. */ private final ScheduledExecutorService scheduler; @@ -107,13 +122,6 @@ public class BatchQueue<T> { /** Non-null only for shared schedulers; used to release the ref count on shutdown. */ private final String sharedSchedulerName; - /** - * Fixed-size partitions. Producers select a partition via {@link PartitionSelector} - * to reduce contention. Each partition is an {@link ArrayBlockingQueue} with - * capacity = bufferSize. - */ - private final ArrayBlockingQueue<T>[] partitions; - /** * Cached partition selector from config. Only used when {@code partitions.length > 1}; * single-partition queues bypass the selector entirely. @@ -128,25 +136,30 @@ public class BatchQueue<T> { */ private final ConcurrentHashMap<Class<?>, HandlerConsumer<T>> handlerMap; - private final BatchQueueConfig<T> config; + /** + * Tracks unregistered types that have already been warned about, + * to avoid flooding the log with repeated errors. + */ + private final Set<Class<?>> warnedUnregisteredTypes; - /** Set to false on {@link #shutdown()} to stop drain loops and reject new data. */ - private volatile boolean running; + /** Resolved thread count, stored for adaptive partition recalculation. */ + private final int resolvedThreadCount; + + /** Number of drain tasks (equals thread count for dedicated, 1 for shared). */ + private final int taskCount; + + /** + * Partitions. Producers select a partition via {@link PartitionSelector}. + * For adaptive policies, this array grows via {@link #addHandler} as handlers + * are registered. Volatile for visibility to drain loop threads. + */ + private volatile ArrayBlockingQueue<T>[] partitions; /** * Which partitions each drain task is responsible for. - * {@code assignedPartitions[taskIndex] = int[] of partition indices}. - * - * <p>Dedicated mode: partitions are distributed across threads by round-robin. - * Example with 4 partitions and 2 threads: - * <pre> - * task 0 --> partitions [0, 2] - * task 1 --> partitions [1, 3] - * </pre> - * - * <p>Shared mode: a single task drains all partitions. + * Volatile for visibility when partitions grow via {@link #addHandler}. */ - private final int[][] assignedPartitions; + private volatile int[][] assignedPartitions; /** * Per-task count of consecutive idle cycles (no data drained). @@ -154,23 +167,25 @@ public class BatchQueue<T> { */ private final int[] consecutiveIdleCycles; + /** Set to false on {@link #shutdown()} to stop drain loops and reject new data. */ + private volatile boolean running; + @SuppressWarnings("unchecked") BatchQueue(final String name, final BatchQueueConfig<T> config) { this.name = name; this.config = config; this.partitionSelector = config.getPartitionSelector(); this.handlerMap = new ConcurrentHashMap<>(); + this.warnedUnregisteredTypes = ConcurrentHashMap.newKeySet(); - final int taskCount; if (config.getSharedSchedulerName() != null) { // ---- Shared scheduler mode ---- - // Borrow a scheduler from BatchQueueManager (ref-counted). - // Only 1 drain task; it drains all partitions sequentially. final ScheduledExecutorService sharedScheduler = BatchQueueManager.getOrCreateSharedScheduler( config.getSharedSchedulerName(), config.getSharedSchedulerThreads()); - final int partitionCount = config.getPartitions().resolve(1); + this.resolvedThreadCount = 1; + final int partitionCount = config.getPartitions().resolve(1, 0); this.partitions = new ArrayBlockingQueue[partitionCount]; for (int i = 0; i < partitions.length; i++) { partitions[i] = new ArrayBlockingQueue<>(config.getBufferSize()); @@ -179,18 +194,16 @@ public class BatchQueue<T> { this.scheduler = sharedScheduler; this.dedicatedScheduler = false; this.sharedSchedulerName = config.getSharedSchedulerName(); - taskCount = 1; - final int[] allPartitions = new int[partitions.length]; - for (int i = 0; i < allPartitions.length; i++) { - allPartitions[i] = i; - } - this.assignedPartitions = new int[][] {allPartitions}; + this.taskCount = 1; + this.assignedPartitions = buildAssignments(1, partitionCount); } else { // ---- Dedicated scheduler mode ---- - // Create a private ScheduledThreadPool. Each thread drains - // a fixed subset of partitions (no cross-thread contention). int threadCount = config.getThreads().resolve(); - final int partitionCount = config.getPartitions().resolve(threadCount); + this.resolvedThreadCount = threadCount; + + // For adaptive with 0 handlers, resolve returns threadCount (sensible initial). + // For fixed/threadMultiply, resolve returns the configured count. + final int partitionCount = config.getPartitions().resolve(threadCount, 0); if (partitionCount < threadCount) { log.warn("BatchQueue[{}]: partitions({}) < threads({}), reducing threads to {}", @@ -211,24 +224,8 @@ public class BatchQueue<T> { }); this.dedicatedScheduler = true; this.sharedSchedulerName = null; - taskCount = threadCount; - - // Distribute partitions across threads by round-robin - this.assignedPartitions = new int[taskCount][]; - final List<List<Integer>> assignment = new ArrayList<>(); - for (int t = 0; t < taskCount; t++) { - assignment.add(new ArrayList<>()); - } - for (int p = 0; p < partitions.length; p++) { - assignment.get(p % taskCount).add(p); - } - for (int t = 0; t < taskCount; t++) { - final List<Integer> parts = assignment.get(t); - assignedPartitions[t] = new int[parts.size()]; - for (int i = 0; i < parts.size(); i++) { - assignedPartitions[t][i] = parts.get(i); - } - } + this.taskCount = threadCount; + this.assignedPartitions = buildAssignments(threadCount, partitionCount); } this.consecutiveIdleCycles = new int[taskCount]; @@ -239,6 +236,28 @@ public class BatchQueue<T> { } } + /** + * Build round-robin partition-to-task assignments. + */ + private static int[][] buildAssignments(final int taskCount, final int partitionCount) { + final int[][] result = new int[taskCount][]; + final List<List<Integer>> assignment = new ArrayList<>(); + for (int t = 0; t < taskCount; t++) { + assignment.add(new ArrayList<>()); + } + for (int p = 0; p < partitionCount; p++) { + assignment.get(p % taskCount).add(p); + } + for (int t = 0; t < taskCount; t++) { + final List<Integer> parts = assignment.get(t); + result[t] = new int[parts.size()]; + for (int i = 0; i < parts.size(); i++) { + result[t][i] = parts.get(i); + } + } + return result; + } + /** * Schedule the next drain for the given task. The delay uses adaptive exponential * backoff: {@code minIdleMs * 2^consecutiveIdleCycles}, capped at maxIdleMs. @@ -265,10 +284,29 @@ public class BatchQueue<T> { /** * Register a type-based handler. Items whose {@code getClass()} matches the given * type will be batched together and dispatched to this handler. - * Only used when no single consumer is set in config. + * + * <p>For adaptive partition policies, adding a handler recalculates the partition + * count and grows the partition array if needed. For non-adaptive policies the + * resolved count never changes, so this is a no-op beyond the registration. + * Drain loop threads pick up new partitions on their next cycle via volatile reads. */ + @SuppressWarnings("unchecked") public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler) { handlerMap.put(type, handler); + + final int newPartitionCount = config.getPartitions() + .resolve(resolvedThreadCount, handlerMap.size()); + final ArrayBlockingQueue<T>[] currentPartitions = this.partitions; + if (newPartitionCount > currentPartitions.length) { + final ArrayBlockingQueue<T>[] grown = new ArrayBlockingQueue[newPartitionCount]; + System.arraycopy(currentPartitions, 0, grown, 0, currentPartitions.length); + for (int i = currentPartitions.length; i < newPartitionCount; i++) { + grown[i] = new ArrayBlockingQueue<>(config.getBufferSize()); + } + // Volatile writes — drain loop threads see the new assignments on next cycle + this.assignedPartitions = buildAssignments(taskCount, newPartitionCount); + this.partitions = grown; + } } /** @@ -292,18 +330,19 @@ public class BatchQueue<T> { if (!running) { return false; } - final int index = partitions.length == 1 - ? 0 : partitionSelector.select(data, partitions.length); + final ArrayBlockingQueue<T>[] currentPartitions = this.partitions; + final int index = currentPartitions.length == 1 + ? 0 : partitionSelector.select(data, currentPartitions.length); if (config.getStrategy() == BufferStrategy.BLOCKING) { try { - partitions[index].put(data); + currentPartitions[index].put(data); return true; } catch (final InterruptedException e) { Thread.currentThread().interrupt(); return false; } } else { - return partitions[index].offer(data); + return currentPartitions[index].offer(data); } } @@ -312,17 +351,21 @@ public class BatchQueue<T> { * into a single list, then dispatches. Loops until partitions are empty, * then breaks to re-schedule with backoff. * - * <p>The loop structure ensures that a burst of data is fully drained before - * the task yields. This prevents data piling up across reschedule gaps. + * <p>Reads volatile references to {@code partitions} and {@code assignedPartitions} + * at the start of each cycle, so it picks up new partitions added via + * {@link #addHandler} on the next iteration. */ void drainLoop(final int taskIndex) { - final int[] myPartitions = assignedPartitions[taskIndex]; + final ArrayBlockingQueue<T>[] currentPartitions = this.partitions; + final int[] myPartitions = this.assignedPartitions[taskIndex]; try { while (running) { // Drain all assigned partitions into one batch final List<T> combined = new ArrayList<>(); for (final int partitionIndex : myPartitions) { - partitions[partitionIndex].drainTo(combined); + if (partitionIndex < currentPartitions.length) { + currentPartitions[partitionIndex].drainTo(combined); + } } if (combined.isEmpty()) { @@ -355,7 +398,7 @@ public class BatchQueue<T> { * to one consumer, regardless of item types.</li> * <li><b>Handler map</b>: items are grouped by {@code item.getClass()}, then * each group is dispatched to its registered handler. Unregistered types - * are silently dropped.</li> + * are logged as errors and dropped.</li> * </ol> */ private void dispatch(final List<T> batch) { @@ -382,6 +425,11 @@ public class BatchQueue<T> { } catch (final Throwable t) { handleError(entry.getValue(), t); } + } else { + if (warnedUnregisteredTypes.add(entry.getKey())) { + log.error("BatchQueue[{}]: no handler for type {}, {} items abandoned", + name, entry.getKey().getName(), entry.getValue().size()); + } } } } @@ -427,8 +475,9 @@ public class BatchQueue<T> { void shutdown() { running = false; // Final drain — flush any remaining data to consumers + final ArrayBlockingQueue<T>[] currentPartitions = this.partitions; final List<T> combined = new ArrayList<>(); - for (final ArrayBlockingQueue<T> partition : partitions) { + for (final ArrayBlockingQueue<T> partition : currentPartitions) { partition.drainTo(combined); } if (!combined.isEmpty()) { diff --git a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java index 32ca86c2bf..5de0fd39d2 100644 --- a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java +++ b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java @@ -68,10 +68,10 @@ public class BatchQueueConfig<T> { private QueueErrorHandler<T> errorHandler; @Builder.Default - private long minIdleMs = 5; + private long minIdleMs = 1; @Builder.Default - private long maxIdleMs = 200; + private long maxIdleMs = 50; void validate() { final boolean hasDedicated = threads != null; diff --git a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java index ac03e9f75d..6f3428a6f3 100644 --- a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java +++ b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java @@ -21,56 +21,128 @@ package org.apache.skywalking.oap.server.library.batchqueue; /** * Determines the number of partitions for a BatchQueue. * - * Two modes: - * - fixed(N): exactly N partitions, regardless of thread count. - * - threadMultiply(N): N * resolved thread count. + * <ul> + * <li>{@link #fixed(int)}: exactly N partitions, regardless of thread count.</li> + * <li>{@link #threadMultiply(int)}: N * resolved thread count.</li> + * <li>{@link #adaptive()}: recommended for metrics aggregation. The partition + * count grows as handlers are registered via {@link BatchQueue#addHandler}. + * Uses {@code threadCount * multiplier} (default 25) as a threshold. + * Below threshold, 1:1 mapping (one partition per handler). Above threshold, + * excess handlers share partitions at 1:2 ratio.</li> + * </ul> * - * At queue construction time, the resolved partition count is validated: - * if partitions less than resolved thread count, thread count is reduced to match - * partitions and a warning is logged. + * <p>All policies are resolved via {@link #resolve(int, int)}. For non-adaptive + * policies the handlerCount parameter is ignored. At queue creation time, if the + * resolved partition count is less than the thread count, the thread count is + * reduced to match and a warning is logged. */ public class PartitionPolicy { + private static final int DEFAULT_ADAPTIVE_MULTIPLIER = 25; + private final int fixedCount; private final int multiplier; + private final boolean adaptive; - private PartitionPolicy(final int fixedCount, final int multiplier) { + private PartitionPolicy(final int fixedCount, final int multiplier, + final boolean adaptive) { this.fixedCount = fixedCount; this.multiplier = multiplier; + this.adaptive = adaptive; } /** * Fixed number of partitions. * - * @throws IllegalArgumentException if count < 1 + * @throws IllegalArgumentException if count < 1 */ public static PartitionPolicy fixed(final int count) { if (count < 1) { throw new IllegalArgumentException("Partition count must be >= 1, got: " + count); } - return new PartitionPolicy(count, 0); + return new PartitionPolicy(count, 0, false); } /** * Partitions = multiplier * resolved thread count. * - * @throws IllegalArgumentException if multiplier < 1 + * @throws IllegalArgumentException if multiplier < 1 */ public static PartitionPolicy threadMultiply(final int multiplier) { if (multiplier < 1) { throw new IllegalArgumentException("Partition multiplier must be >= 1, got: " + multiplier); } - return new PartitionPolicy(0, multiplier); + return new PartitionPolicy(0, multiplier, false); + } + + /** + * Adaptive partition count with default threshold multiplier (25). + * + * <p>The partition count grows as handlers are registered via + * {@link BatchQueue#addHandler}: + * <ul> + * <li>Threshold = {@code threadCount * 25}</li> + * <li>handlerCount <= threshold: one partition per handler (1:1)</li> + * <li>handlerCount > threshold: {@code threshold + (handlerCount - threshold) / 2}</li> + * <li>handlerCount == 0: returns {@code threadCount} as initial count</li> + * </ul> + * + * <p>Examples with 8 threads (threshold = 200): + * <pre> + * 0 handlers → 8 partitions (initial = threadCount) + * 100 handlers → 100 partitions (1:1, below threshold) + * 200 handlers → 200 partitions (1:1, at threshold) + * 500 handlers → 350 partitions (200 + 300/2) + * 1000 handlers → 600 partitions (200 + 800/2) + * 2000 handlers → 1100 partitions (200 + 1800/2) + * </pre> + */ + public static PartitionPolicy adaptive() { + return new PartitionPolicy(0, DEFAULT_ADAPTIVE_MULTIPLIER, true); + } + + /** + * Adaptive partition count with custom threshold multiplier. + * + * <p>Threshold = {@code threadCount * multiplier}. Below threshold, one + * partition per handler (1:1). Above threshold, excess handlers share + * at 1:2 ratio: {@code threshold + (handlerCount - threshold) / 2}. + * + * @param multiplier threshold per thread (default 25) + * @throws IllegalArgumentException if multiplier < 1 + */ + public static PartitionPolicy adaptive(final int multiplier) { + if (multiplier < 1) { + throw new IllegalArgumentException( + "adaptive multiplier must be >= 1, got: " + multiplier); + } + return new PartitionPolicy(0, multiplier, true); } /** * Resolve the actual partition count. - * For fixed mode, returns fixedCount. - * For threadMultiply mode, returns multiplier * resolvedThreadCount (min 1). + * <ul> + * <li>fixed: returns the pre-set count (both parameters ignored).</li> + * <li>threadMultiply: returns multiplier * resolvedThreadCount (handlerCount ignored).</li> + * <li>adaptive: when handlerCount is 0, returns resolvedThreadCount as a sensible + * initial count. Otherwise, threshold = threadCount * multiplier; if handlerCount + * <= threshold, returns handlerCount (1:1). If above, returns + * threshold + (handlerCount - threshold) / 2.</li> + * </ul> */ - public int resolve(final int resolvedThreadCount) { + public int resolve(final int resolvedThreadCount, final int handlerCount) { if (fixedCount > 0) { return fixedCount; } + if (adaptive) { + if (handlerCount == 0) { + return Math.max(1, resolvedThreadCount); + } + final int threshold = Math.max(1, multiplier * resolvedThreadCount); + if (handlerCount <= threshold) { + return handlerCount; + } + return threshold + (handlerCount - threshold) / 2; + } return Math.max(1, multiplier * resolvedThreadCount); } @@ -79,6 +151,9 @@ public class PartitionPolicy { if (fixedCount > 0) { return "fixed(" + fixedCount + ")"; } + if (adaptive) { + return "adaptive(multiplier=" + multiplier + ")"; + } return "threadMultiply(" + multiplier + ")"; } } diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java index e4e8ceee25..115a03fcad 100644 --- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java +++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java @@ -26,50 +26,56 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; /** - * Throughput benchmark for BatchQueue consuming with handler-map dispatch. + * Throughput benchmark for BatchQueue with handler-map dispatch. * - * <p>Measures how many metric items per second the queue can consume - * when there are 40 or 200 distinct metric types, each routed - * to its own handler via class-based dispatch. + * <p>Simulates production OAP metrics aggregation: 500-2000 distinct metric types, + * 32 gRPC producer threads, 8 consumer threads. Tests various partition strategies + * from fixed small counts to 1:1 partition-per-type binding. * * <p>Run with: mvn test -pl oap-server/server-library/library-batch-queue * -Dtest=BatchQueueBenchmark -DfailIfNoTests=false * * <h3>Reference results (Apple M3 Max, 128 GB RAM, macOS 26.2, JDK 17)</h3> + * + * <p><b>Fixed partitions (typeHash selector):</b> * <pre> - * Benchmark Types Threads Partitions IF_POSSIBLE BLOCKING - * ---------------------------- ------ -------- --------------- ------------- ------------- - * 40-types-1thread 40 fixed(1) fixed(1) ~33,200,000 ~32,200,000 - * 200-types-1thread 200 fixed(1) fixed(1) ~34,600,000 ~33,900,000 - * direct-consumer-4threads 1 fixed(4) multiply(8) ~19,200,000 ~18,800,000 - * 40-types-4threads 40 fixed(4) multiply(8) ~31,200,000 ~44,300,000 - * 200-types-4threads 200 fixed(4) multiply(8) ~43,900,000 ~43,400,000 - * 200reg/50hot, 100 producers 200 fixed(4) multiply(8) ~26,900,000 ~26,100,000 + * Partitions BufSize 500 types (IF/BLK) 1000 types (IF/BLK) 2000 types (IF/BLK) + * ---------- -------- ----------------------- ----------------------- ----------------------- + * fixed(16) 50,000 ~19.0M / ~12.3M ~17.9M / ~17.1M ~15.7M / ~15.2M + * fixed(64) 500,000 ~17.0M / ~18.0M ~17.7M / ~18.4M ~17.7M / ~18.0M + * fixed(128) 500,000 ~24.1M / ~23.2M ~24.9M / ~26.1M ~25.3M / ~24.8M + * </pre> * - * All runs: bufferSize=50,000, 0% drop rate. - * Single-partition queues (fixed(1)) bypass PartitionSelector entirely. - * Multi-partition queues default to PartitionSelector.typeHash() — - * same type always hits the same partition, so each consumer thread - * drains pre-grouped batches. Custom selectors can be set via config. + * <p><b>Type-aware partitions (typeId selector, 50K buffer each):</b> + * <pre> + * Ratio Partitions 500 types (IF/BLK) 1000 types (IF/BLK) 2000 types (IF/BLK) + * ---------- ------------ ----------------------- ----------------------- ----------------------- + * 1:4 types/4 ~32.1M / ~29.9M ~40.0M / ~40.5M ~47.1M / ~50.7M + * 1:2 types/2 ~38.2M / ~35.9M ~49.6M / ~35.9M ~52.6M / ~58.6M + * adaptive 350/600/1100 ~45.7M / ~46.3M ~50.5M / ~54.1M ~64.0M / ~60.3M + * 1:1 types ~51.3M / ~54.4M ~61.2M / ~62.5M ~75.7M / ~67.4M * </pre> * - * <h3>Comparison with DataCarrier — production case (4 consumer threads)</h3> + * <p><b>DataCarrier baseline (N independent carriers, raw Long values, pool(8)):</b> * <pre> - * Scenario DataCarrier BatchQueue - * (N carriers (1 queue + - * + BulkPool) handler map) - * ------------------------------- ----------- ----------- - * 40 types, 4 producers ~42,000,000 ~31,200,000 - * 200 types, 4 producers ~25,100,000 ~43,900,000 - * 200 reg / 50 hot, 100 producers ~17,600,000 ~26,900,000 (IF_POSSIBLE) - * 200 reg / 50 hot, 100 producers ~23,800,000 ~26,100,000 (BLOCKING) + * 500 types: ~33.4M IF_POSSIBLE / ~32.5M BLOCKING + * 1000 types: ~37.6M IF_POSSIBLE / ~36.0M BLOCKING + * 2000 types: ~38.0M IF_POSSIBLE / ~42.1M BLOCKING + * </pre> * - * BatchQueue uses configurable PartitionSelector (default: typeHash). - * Same-type items land on the same partition, so dispatch() grouping is - * effectively a no-op. The production-realistic case (200reg/50hot/100 - * producers) shows BatchQueue at ~27M vs DataCarrier at ~18M, a 1.5x - * advantage. + * <p><b>BatchQueue vs DataCarrier (IF_POSSIBLE):</b> + * <pre> + * 500 types 1000 types 2000 types + * 1:4 -4% +6% +24% + * 1:2 +14% +32% +38% + * adaptive +37% +34% +68% + * 1:1 +53% +63% +99% * </pre> + * + * <p>All runs: 32 producers, fixed(8) threads, minIdleMs=1, maxIdleMs=50, 0% drop rate. + * 2000 metric types generated at runtime via bytecode (see {@link BenchmarkMetricTypes}). + * Adaptive policy: {@link PartitionPolicy#adaptive()} with threshold = threadCount * 25. + * Below threshold: 1:1 (one partition per type). Above: excess at 1:2 ratio. */ @Slf4j @SuppressWarnings("all") @@ -77,274 +83,345 @@ public class BatchQueueBenchmark { private static final int WARMUP_SECONDS = 2; private static final int MEASURE_SECONDS = 5; - private static final int PRODUCER_THREADS = 4; + private static final int PRODUCER_THREADS = 32; + private static final ThreadPolicy THREADS = ThreadPolicy.fixed(8); @AfterEach public void cleanup() { BatchQueueManager.reset(); } - // ---- IF_POSSIBLE (non-blocking) benchmarks ---- + // ---- Fixed partitions, typeHash selector ---- + // fixed(16) 50K: ~19.0M/~12.3M ~17.9M/~17.1M ~15.7M/~15.2M + // fixed(64) 500K: ~17.0M/~18.0M ~17.7M/~18.4M ~17.7M/~18.0M + // fixed(128)500K: ~24.1M/~23.2M ~24.9M/~26.1M ~25.3M/~24.8M @Test - public void benchmarkHandlerMapDispatch40Types() throws Exception { - runBenchmark("40-types-4t-ifpossible", 40, ThreadPolicy.fixed(4), - PartitionPolicy.threadMultiply(8), BufferStrategy.IF_POSSIBLE); + public void benchmark500Types() throws Exception { + runBenchmark("500-types", 500, 16, 50_000, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkHandlerMapDispatch200Types() throws Exception { - runBenchmark("200-types-4t-ifpossible", 200, ThreadPolicy.fixed(4), - PartitionPolicy.threadMultiply(8), BufferStrategy.IF_POSSIBLE); + public void benchmark1000Types() throws Exception { + runBenchmark("1000-types", 1000, 16, 50_000, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkDirectConsumer() throws Exception { - runDirectBenchmark("direct-4t-ifpossible", ThreadPolicy.fixed(4), - PartitionPolicy.threadMultiply(8), BufferStrategy.IF_POSSIBLE); + public void benchmark2000Types() throws Exception { + runBenchmark("2000-types", 2000, 16, 50_000, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkHandlerMapDispatch40TypesSingleThread() throws Exception { - runBenchmark("40-types-1t-ifpossible", 40, ThreadPolicy.fixed(1), - PartitionPolicy.fixed(1), BufferStrategy.IF_POSSIBLE); + public void benchmark500TypesBlocking() throws Exception { + runBenchmark("500-types-blocking", 500, 16, 50_000, BufferStrategy.BLOCKING); } @Test - public void benchmarkHandlerMapDispatch200TypesSingleThread() throws Exception { - runBenchmark("200-types-1t-ifpossible", 200, ThreadPolicy.fixed(1), - PartitionPolicy.fixed(1), BufferStrategy.IF_POSSIBLE); + public void benchmark1000TypesBlocking() throws Exception { + runBenchmark("1000-types-blocking", 1000, 16, 50_000, BufferStrategy.BLOCKING); } - // ---- BLOCKING benchmarks ---- + @Test + public void benchmark2000TypesBlocking() throws Exception { + runBenchmark("2000-types-blocking", 2000, 16, 50_000, BufferStrategy.BLOCKING); + } @Test - public void benchmarkHandlerMapDispatch40TypesBlocking() throws Exception { - runBenchmark("40-types-4t-blocking", 40, ThreadPolicy.fixed(4), - PartitionPolicy.threadMultiply(8), BufferStrategy.BLOCKING); + public void benchmark500Types_64p() throws Exception { + runBenchmark("500-types-64p", 500, 64, 500_000, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkHandlerMapDispatch200TypesBlocking() throws Exception { - runBenchmark("200-types-4t-blocking", 200, ThreadPolicy.fixed(4), - PartitionPolicy.threadMultiply(8), BufferStrategy.BLOCKING); + public void benchmark1000Types_64p() throws Exception { + runBenchmark("1000-types-64p", 1000, 64, 500_000, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkDirectConsumerBlocking() throws Exception { - runDirectBenchmark("direct-4t-blocking", ThreadPolicy.fixed(4), - PartitionPolicy.threadMultiply(8), BufferStrategy.BLOCKING); + public void benchmark2000Types_64p() throws Exception { + runBenchmark("2000-types-64p", 2000, 64, 500_000, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkHandlerMapDispatch40TypesSingleThreadBlocking() throws Exception { - runBenchmark("40-types-1t-blocking", 40, ThreadPolicy.fixed(1), - PartitionPolicy.fixed(1), BufferStrategy.BLOCKING); + public void benchmark500TypesBlocking_64p() throws Exception { + runBenchmark("500-types-blocking-64p", 500, 64, 500_000, BufferStrategy.BLOCKING); } @Test - public void benchmarkHandlerMapDispatch200TypesSingleThreadBlocking() throws Exception { - runBenchmark("200-types-1t-blocking", 200, ThreadPolicy.fixed(1), - PartitionPolicy.fixed(1), BufferStrategy.BLOCKING); + public void benchmark1000TypesBlocking_64p() throws Exception { + runBenchmark("1000-types-blocking-64p", 1000, 64, 500_000, BufferStrategy.BLOCKING); } - // ---- Production-realistic: 200 types, 50 hot, 100 gRPC producers ---- + @Test + public void benchmark2000TypesBlocking_64p() throws Exception { + runBenchmark("2000-types-blocking-64p", 2000, 64, 500_000, BufferStrategy.BLOCKING); + } @Test - public void benchmarkHotTypes200x50x100producers() throws Exception { - runHotTypesBenchmark("200reg-50hot-100p-ifpossible", 200, 50, 100, - ThreadPolicy.fixed(4), PartitionPolicy.threadMultiply(8), - BufferStrategy.IF_POSSIBLE); + public void benchmark500Types_128p() throws Exception { + runBenchmark("500-types-128p", 500, 128, 500_000, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkHotTypes200x50x100producersBlocking() throws Exception { - runHotTypesBenchmark("200reg-50hot-100p-blocking", 200, 50, 100, - ThreadPolicy.fixed(4), PartitionPolicy.threadMultiply(8), - BufferStrategy.BLOCKING); + public void benchmark1000Types_128p() throws Exception { + runBenchmark("1000-types-128p", 1000, 128, 500_000, BufferStrategy.IF_POSSIBLE); } - private void runHotTypesBenchmark(final String label, final int totalTypes, - final int hotTypes, final int producerCount, - final ThreadPolicy threads, - final PartitionPolicy partitions, - final BufferStrategy strategy) throws Exception { - final AtomicLong consumed = new AtomicLong(0); + @Test + public void benchmark2000Types_128p() throws Exception { + runBenchmark("2000-types-128p", 2000, 128, 500_000, BufferStrategy.IF_POSSIBLE); + } - final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = BatchQueueManager.create( - "bench-" + label, - BatchQueueConfig.<BenchmarkMetricTypes.TypedMetric>builder() - .threads(threads) - .partitions(partitions) - .bufferSize(50_000) - .strategy(strategy) - .minIdleMs(1) - .maxIdleMs(50) - .build()); + @Test + public void benchmark500TypesBlocking_128p() throws Exception { + runBenchmark("500-types-blocking-128p", 500, 128, 500_000, BufferStrategy.BLOCKING); + } - // Register handlers for ALL 200 types - for (int t = 0; t < totalTypes; t++) { - queue.addHandler(BenchmarkMetricTypes.CLASSES[t], - (HandlerConsumer<BenchmarkMetricTypes.TypedMetric>) data -> - consumed.addAndGet(data.size())); - } + @Test + public void benchmark1000TypesBlocking_128p() throws Exception { + runBenchmark("1000-types-blocking-128p", 1000, 128, 500_000, BufferStrategy.BLOCKING); + } - // Warmup — only hot types get traffic - final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 1000L; - runProducersHot(queue, hotTypes, producerCount, warmupEnd); - Thread.sleep(200); - consumed.set(0); + @Test + public void benchmark2000TypesBlocking_128p() throws Exception { + runBenchmark("2000-types-blocking-128p", 2000, 128, 500_000, BufferStrategy.BLOCKING); + } - // Measure - final long measureStart = System.currentTimeMillis(); - final long measureEnd = measureStart + MEASURE_SECONDS * 1000L; - final long produced = runProducersHot(queue, hotTypes, producerCount, measureEnd); - final long measureDuration = System.currentTimeMillis() - measureStart; + // ---- Type-aware partitions, typeId selector, 50K buffer each ---- + // 1:4 types/4: ~32.1M/~29.9M ~40.0M/~40.5M ~47.1M/~50.7M + // 1:2 types/2: ~38.2M/~35.9M ~49.6M/~35.9M ~52.6M/~58.6M + // adaptive 350/600/1100: ~45.7M/~46.3M ~50.5M/~54.1M ~64.0M/~60.3M + // 1:1 types: ~51.3M/~54.4M ~61.2M/~62.5M ~75.7M/~67.4M - Thread.sleep(500); - final long totalConsumed = consumed.get(); + private static final PartitionSelector<BenchmarkMetricTypes.TypedMetric> TYPE_ID_SELECTOR = + (data, count) -> data.typeId % count; - log.info("\n=== BatchQueue Benchmark: {} ===\n" - + " Total types: {} (handlers registered)\n" - + " Hot types: {} (receiving traffic)\n" - + " Threads: {}\n" - + " Partitions: {}\n" - + " Producers: {} (simulating gRPC connections)\n" - + " Duration: {} ms\n" - + " Produced: {}\n" - + " Consumed: {}\n" - + " Produce rate: {} items/sec\n" - + " Consume rate: {} items/sec\n" - + " Drop rate: {}%\n", - label, totalTypes, hotTypes, threads, partitions, producerCount, - measureDuration, - String.format("%,d", produced), String.format("%,d", totalConsumed), - String.format("%,.0f", produced * 1000.0 / measureDuration), - String.format("%,.0f", totalConsumed * 1000.0 / measureDuration), - String.format("%.2f", produced > 0 - ? (produced - totalConsumed) * 100.0 / produced : 0)); + @Test + public void benchmark500Types_quarter() throws Exception { + runBenchmark("500-types-quarter", 500, 125, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); } - private long runProducersHot(final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue, - final int hotTypes, final int producerCount, - final long endTimeMs) throws InterruptedException { - final AtomicLong totalProduced = new AtomicLong(0); - final CountDownLatch done = new CountDownLatch(producerCount); + @Test + public void benchmark1000Types_quarter() throws Exception { + runBenchmark("1000-types-quarter", 1000, 250, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); + } - for (int p = 0; p < producerCount; p++) { - final int producerIndex = p; - final Thread thread = new Thread(() -> { - long count = 0; - int typeIndex = producerIndex; - while (System.currentTimeMillis() < endTimeMs) { - for (int batch = 0; batch < 100; batch++) { - final int type = typeIndex % hotTypes; - if (queue.produce(BenchmarkMetricTypes.FACTORIES[type].create(count))) { - count++; - } - typeIndex++; - } - } - totalProduced.addAndGet(count); - done.countDown(); - }); - thread.setName("gRPC-" + producerIndex); - thread.setDaemon(true); - thread.start(); - } + @Test + public void benchmark2000Types_quarter() throws Exception { + runBenchmark("2000-types-quarter", 2000, 500, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); + } - done.await(MEASURE_SECONDS + 10, TimeUnit.SECONDS); - return totalProduced.get(); + @Test + public void benchmark500TypesBlocking_quarter() throws Exception { + runBenchmark("500-types-blocking-quarter", 500, 125, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); } - private void runBenchmark(final String label, final int typeCount, - final ThreadPolicy threads, - final PartitionPolicy partitions, - final BufferStrategy strategy) throws Exception { - final AtomicLong consumed = new AtomicLong(0); + @Test + public void benchmark1000TypesBlocking_quarter() throws Exception { + runBenchmark("1000-types-blocking-quarter", 1000, 250, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); + } - final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = BatchQueueManager.create( - "bench-" + label, - BatchQueueConfig.<BenchmarkMetricTypes.TypedMetric>builder() - .threads(threads) - .partitions(partitions) - .bufferSize(50_000) - .strategy(strategy) - .minIdleMs(1) - .maxIdleMs(50) - .build()); + @Test + public void benchmark2000TypesBlocking_quarter() throws Exception { + runBenchmark("2000-types-blocking-quarter", 2000, 500, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); + } - for (int t = 0; t < typeCount; t++) { - queue.addHandler(BenchmarkMetricTypes.CLASSES[t], - (HandlerConsumer<BenchmarkMetricTypes.TypedMetric>) data -> - consumed.addAndGet(data.size())); - } + @Test + public void benchmark500Types_half() throws Exception { + runBenchmark("500-types-half", 500, 250, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); + } - // Warmup - final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 1000L; - runProducers(queue, typeCount, warmupEnd); - Thread.sleep(200); - final long warmupConsumed = consumed.get(); - consumed.set(0); + @Test + public void benchmark1000Types_half() throws Exception { + runBenchmark("1000-types-half", 1000, 500, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); + } - // Measure - final long measureStart = System.currentTimeMillis(); - final long measureEnd = measureStart + MEASURE_SECONDS * 1000L; - final long produced = runProducers(queue, typeCount, measureEnd); - final long measureDuration = System.currentTimeMillis() - measureStart; + @Test + public void benchmark2000Types_half() throws Exception { + runBenchmark("2000-types-half", 2000, 1000, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); + } - Thread.sleep(500); - final long totalConsumed = consumed.get(); + @Test + public void benchmark500TypesBlocking_half() throws Exception { + runBenchmark("500-types-blocking-half", 500, 250, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); + } + + @Test + public void benchmark1000TypesBlocking_half() throws Exception { + runBenchmark("1000-types-blocking-half", 1000, 500, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); + } + + @Test + public void benchmark2000TypesBlocking_half() throws Exception { + runBenchmark("2000-types-blocking-half", 2000, 1000, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); + } + + @Test + public void benchmark500Types_adaptive() throws Exception { + runAdaptiveBenchmark("500-types-adaptive", 500, BufferStrategy.IF_POSSIBLE); + } + + @Test + public void benchmark1000Types_adaptive() throws Exception { + runAdaptiveBenchmark("1000-types-adaptive", 1000, BufferStrategy.IF_POSSIBLE); + } + + @Test + public void benchmark2000Types_adaptive() throws Exception { + runAdaptiveBenchmark("2000-types-adaptive", 2000, BufferStrategy.IF_POSSIBLE); + } + + @Test + public void benchmark500TypesBlocking_adaptive() throws Exception { + runAdaptiveBenchmark("500-types-blocking-adaptive", 500, BufferStrategy.BLOCKING); + } + + @Test + public void benchmark1000TypesBlocking_adaptive() throws Exception { + runAdaptiveBenchmark("1000-types-blocking-adaptive", 1000, BufferStrategy.BLOCKING); + } + + @Test + public void benchmark2000TypesBlocking_adaptive() throws Exception { + runAdaptiveBenchmark("2000-types-blocking-adaptive", 2000, BufferStrategy.BLOCKING); + } + + @Test + public void benchmark500Types_1to1() throws Exception { + runBenchmark("500-types-1to1", 500, 500, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); + } + + @Test + public void benchmark1000Types_1to1() throws Exception { + runBenchmark("1000-types-1to1", 1000, 1000, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); + } + + @Test + public void benchmark2000Types_1to1() throws Exception { + runBenchmark("2000-types-1to1", 2000, 2000, 50_000, + BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true); + } + + @Test + public void benchmark500TypesBlocking_1to1() throws Exception { + runBenchmark("500-types-blocking-1to1", 500, 500, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); + } + + @Test + public void benchmark1000TypesBlocking_1to1() throws Exception { + runBenchmark("1000-types-blocking-1to1", 1000, 1000, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); + } - printResults(label, typeCount, threads, partitions, - produced, totalConsumed, measureDuration, warmupConsumed); + @Test + public void benchmark2000TypesBlocking_1to1() throws Exception { + runBenchmark("2000-types-blocking-1to1", 2000, 2000, 50_000, + BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true); } - private void runDirectBenchmark(final String label, - final ThreadPolicy threads, - final PartitionPolicy partitions, - final BufferStrategy strategy) throws Exception { + private void runAdaptiveBenchmark(final String label, final int typeCount, + final BufferStrategy strategy) throws Exception { + // adaptive(): threshold = threadCount * 25 = 200 + // 500 types → 350p (200 + 300/2) + // 1000 types → 600p (200 + 800/2) + // 2000 types → 1100p (200 + 1800/2) + final int partitionCount = PartitionPolicy.adaptive() + .resolve(THREADS.resolve(), typeCount); + runBenchmark(label, typeCount, partitionCount, 50_000, + strategy, TYPE_ID_SELECTOR, true); + } + + private void runBenchmark(final String label, final int typeCount, + final int partitionCount, final int bufferSize, + final BufferStrategy strategy) throws Exception { + runBenchmark(label, typeCount, partitionCount, bufferSize, strategy, null, true); + } + + private void runBenchmark(final String label, final int typeCount, + final int partitionCount, final int bufferSize, + final BufferStrategy strategy, + final PartitionSelector<BenchmarkMetricTypes.TypedMetric> selector, + final boolean ignored) throws Exception { final AtomicLong consumed = new AtomicLong(0); + final PartitionPolicy partitions = PartitionPolicy.fixed(partitionCount); - final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = BatchQueueManager.create( - "bench-" + label, + final BatchQueueConfig.BatchQueueConfigBuilder<BenchmarkMetricTypes.TypedMetric> configBuilder = BatchQueueConfig.<BenchmarkMetricTypes.TypedMetric>builder() - .threads(threads) + .threads(THREADS) .partitions(partitions) - .bufferSize(50_000) + .bufferSize(bufferSize) .strategy(strategy) - .consumer(data -> consumed.addAndGet(data.size())) .minIdleMs(1) - .maxIdleMs(50) - .build()); + .maxIdleMs(50); + if (selector != null) { + configBuilder.partitionSelector(selector); + } + + final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = BatchQueueManager.create( + "bench-" + label, configBuilder.build()); + + for (int t = 0; t < typeCount; t++) { + queue.addHandler(BenchmarkMetricTypes.CLASSES[t], + (HandlerConsumer<BenchmarkMetricTypes.TypedMetric>) data -> + consumed.addAndGet(data.size())); + } // Warmup final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 1000L; - runProducers(queue, 1, warmupEnd); + runProducers(queue, typeCount, PRODUCER_THREADS, warmupEnd); Thread.sleep(200); - final long warmupConsumed = consumed.get(); consumed.set(0); // Measure final long measureStart = System.currentTimeMillis(); final long measureEnd = measureStart + MEASURE_SECONDS * 1000L; - final long produced = runProducers(queue, 1, measureEnd); + final long produced = runProducers(queue, typeCount, PRODUCER_THREADS, measureEnd); final long measureDuration = System.currentTimeMillis() - measureStart; Thread.sleep(500); final long totalConsumed = consumed.get(); - printResults(label, 1, threads, partitions, - produced, totalConsumed, measureDuration, warmupConsumed); + log.info("\n=== BatchQueue Benchmark: {} ===\n" + + " Types: {}\n" + + " Threads: {}\n" + + " Partitions: {}\n" + + " BufferSize: {}\n" + + " Strategy: {}\n" + + " Producers: {}\n" + + " Duration: {} ms\n" + + " Produced: {}\n" + + " Consumed: {}\n" + + " Consume rate: {} items/sec\n" + + " Drop rate: {}%\n", + label, typeCount, THREADS, partitions, bufferSize, strategy, PRODUCER_THREADS, + measureDuration, + String.format("%,d", produced), String.format("%,d", totalConsumed), + String.format("%,.0f", totalConsumed * 1000.0 / measureDuration), + String.format("%.2f", produced > 0 + ? (produced - totalConsumed) * 100.0 / produced : 0)); } private long runProducers(final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue, - final int typeCount, + final int typeCount, final int producerCount, final long endTimeMs) throws InterruptedException { final AtomicLong totalProduced = new AtomicLong(0); - final CountDownLatch done = new CountDownLatch(PRODUCER_THREADS); + final CountDownLatch done = new CountDownLatch(producerCount); - for (int p = 0; p < PRODUCER_THREADS; p++) { + for (int p = 0; p < producerCount; p++) { final int producerIndex = p; final Thread thread = new Thread(() -> { long count = 0; @@ -362,39 +439,11 @@ public class BatchQueueBenchmark { done.countDown(); }); thread.setName("Producer-" + producerIndex); + thread.setDaemon(true); thread.start(); } done.await(MEASURE_SECONDS + 10, TimeUnit.SECONDS); return totalProduced.get(); } - - private void printResults(final String label, final int typeCount, - final ThreadPolicy threads, - final PartitionPolicy partitions, - final long produced, final long consumed, - final long durationMs, final long warmupConsumed) { - final double producedPerSec = produced * 1000.0 / durationMs; - final double consumedPerSec = consumed * 1000.0 / durationMs; - final double dropRate = produced > 0 ? (produced - consumed) * 100.0 / produced : 0; - - log.info("\n=== BatchQueue Benchmark: {} ===\n" - + " Types: {}\n" - + " Threads: {}\n" - + " Partitions: {}\n" - + " Producers: {}\n" - + " Duration: {} ms\n" - + " Warmup consumed: {}\n" - + " Produced: {}\n" - + " Consumed: {}\n" - + " Produce rate: {} items/sec\n" - + " Consume rate: {} items/sec\n" - + " Drop rate: {}%\n", - label, typeCount, threads, partitions, PRODUCER_THREADS, - durationMs, String.format("%,d", warmupConsumed), - String.format("%,d", produced), String.format("%,d", consumed), - String.format("%,.0f", producedPerSec), - String.format("%,.0f", consumedPerSec), - String.format("%.2f", dropRate)); - } } diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java index 0717dae4fe..f74add79f9 100644 --- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java +++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java @@ -104,7 +104,7 @@ public class BatchQueueConfigTest { .build(); assertEquals(10_000, config.getBufferSize()); assertEquals(BufferStrategy.BLOCKING, config.getStrategy()); - assertEquals(5, config.getMinIdleMs()); - assertEquals(200, config.getMaxIdleMs()); + assertEquals(1, config.getMinIdleMs()); + assertEquals(50, config.getMaxIdleMs()); } } diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java index 4f95544e40..a63bbdf400 100644 --- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java +++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java @@ -127,41 +127,6 @@ public class BatchQueueTest { assertEquals(50, receivedB.size()); } - @Test - public void testHandlerMapWithManyTypes() { - // Simulate the 40-type scenario - final int typeCount = 40; - final int itemsPerType = 100; - final AtomicInteger totalReceived = new AtomicInteger(0); - - final BatchQueue<Object> queue = BatchQueueManager.create("many-types-test", - BatchQueueConfig.<Object>builder() - .threads(ThreadPolicy.fixed(2)) - .partitions(PartitionPolicy.threadMultiply(2)) - .bufferSize(10_000) - .build()); - - // Register one handler per "type" using MetricA as representative - // In real use, each type would be a different class. - // Here we use a direct consumer to simplify. - // We'll test the actual handler-map routing with fewer types above. - // This test validates throughput with many items. - final List<Object> received = new CopyOnWriteArrayList<>(); - queue.addHandler(MetricA.class, data -> { - totalReceived.addAndGet(data.size()); - received.addAll(data); - }); - - for (int i = 0; i < typeCount * itemsPerType; i++) { - queue.produce(new MetricA("item-" + i)); - } - - Awaitility.await().atMost(10, TimeUnit.SECONDS) - .until(() -> totalReceived.get() == typeCount * itemsPerType); - - assertEquals(typeCount * itemsPerType, totalReceived.get()); - } - // --- Partition assignment --- @Test @@ -233,6 +198,43 @@ public class BatchQueueTest { } } + @Test + public void testAdaptivePartitionGrowsWithHandlers() { + final BatchQueue<Object> queue = BatchQueueManager.create("adaptive-test", + BatchQueueConfig.<Object>builder() + .threads(ThreadPolicy.fixed(8)) + .partitions(PartitionPolicy.adaptive()) + .bufferSize(100) + .build()); + + // Initial: 8 partitions (threadCount) + assertEquals(8, queue.getPartitionCount()); + + // Register 500 handlers — threshold = 8*25 = 200, so 200 + 300/2 = 350 + for (int i = 0; i < 500; i++) { + queue.addHandler(BenchmarkMetricTypes.CLASSES[i], data -> { }); + } + + assertEquals(350, queue.getPartitionCount()); + } + + @Test + public void testAdaptiveBelowThresholdIs1to1() { + final BatchQueue<Object> queue = BatchQueueManager.create("adaptive-below", + BatchQueueConfig.<Object>builder() + .threads(ThreadPolicy.fixed(8)) + .partitions(PartitionPolicy.adaptive()) + .bufferSize(100) + .build()); + + // Register 100 handlers — below threshold (200), so 1:1 + for (int i = 0; i < 100; i++) { + queue.addHandler(BenchmarkMetricTypes.CLASSES[i], data -> { }); + } + + assertEquals(100, queue.getPartitionCount()); + } + // --- Shared scheduler --- @Test @@ -298,17 +300,13 @@ public class BatchQueueTest { .build()); // Produce enough to fill the buffer while consumer is blocked - // First batch will be drained and block the consumer. - // Wait for it to be picked up, then fill the buffer. Awaitility.await().atMost(2, TimeUnit.SECONDS) .pollInterval(10, TimeUnit.MILLISECONDS) .until(() -> { - // Keep producing — once consumer is blocked, buffer fills up queue.produce("x"); return !queue.produce("overflow"); }); - // At least one produce should have returned false (dropped) blockLatch.countDown(); } @@ -339,7 +337,6 @@ public class BatchQueueTest { .threads(ThreadPolicy.fixed(1)) .consumer(data -> { if (consumeCalls.getAndIncrement() == 0) { - // Block on first consume to let items accumulate try { blockLatch.await(2, TimeUnit.SECONDS); } catch (final InterruptedException e) { @@ -352,7 +349,6 @@ public class BatchQueueTest { .strategy(BufferStrategy.IF_POSSIBLE) .build()); - // Produce items int produced = 0; for (int i = 0; i < 500; i++) { if (queue.produce("item-" + i)) { @@ -360,12 +356,10 @@ public class BatchQueueTest { } } - // Unblock consumer and immediately shutdown — should drain remaining blockLatch.countDown(); - Thread.sleep(50); // Let the drain loop pick up some + Thread.sleep(50); queue.shutdown(); - // After shutdown, all produced items should be consumed assertEquals(produced, received.size()); } @@ -381,7 +375,6 @@ public class BatchQueueTest { .consumer(new HandlerConsumer<String>() { @Override public void consume(final List<String> data) { - // no-op } @Override @@ -392,7 +385,6 @@ public class BatchQueueTest { .bufferSize(100) .build()); - // Queue starts with no data, so idle should be called quickly Awaitility.await().atMost(3, TimeUnit.SECONDS) .until(() -> idleCalls.get() > 0); @@ -461,9 +453,6 @@ public class BatchQueueTest { @Test public void testAdaptiveBackoffIncreasesDelay() throws Exception { - // When queue is idle, consecutive idle cycles increase. - // We verify by checking that with no data, the queue - // doesn't spin rapidly — idle callbacks should slow down. final List<Long> idleTimestamps = new CopyOnWriteArrayList<>(); BatchQueueManager.create("backoff-test", @@ -484,11 +473,9 @@ public class BatchQueueTest { .bufferSize(100) .build()); - // Wait for several idle cycles Awaitility.await().atMost(10, TimeUnit.SECONDS) .until(() -> idleTimestamps.size() >= 8); - // Verify that intervals increase (later gaps should be larger than early gaps) final long earlyGap = idleTimestamps.get(1) - idleTimestamps.get(0); final long laterGap = idleTimestamps.get(idleTimestamps.size() - 1) - idleTimestamps.get(idleTimestamps.size() - 2); @@ -521,21 +508,17 @@ public class BatchQueueTest { .bufferSize(100) .build()); - // Let backoff build up Awaitility.await().atMost(5, TimeUnit.SECONDS) .until(() -> idleTimestamps.size() >= 5); - // Now send data — this should reset the backoff final int beforeIdles = idleTimestamps.size(); for (int i = 0; i < 10; i++) { queue.produce("reset-" + i); } - // Wait for the data to be consumed Awaitility.await().atMost(5, TimeUnit.SECONDS) .until(() -> consumeCount.get() >= 10); - // After data is consumed, next idle should come quickly (backoff reset) final long postDataTime = System.currentTimeMillis(); Awaitility.await().atMost(3, TimeUnit.SECONDS) .until(() -> { @@ -548,26 +531,4 @@ public class BatchQueueTest { }); } - // --- Round robin produce --- - - @Test - public void testRoundRobinDistributesAcrossPartitions() { - final List<String> received = new CopyOnWriteArrayList<>(); - final BatchQueue<String> queue = BatchQueueManager.create("roundrobin-test", - BatchQueueConfig.<String>builder() - .threads(ThreadPolicy.fixed(2)) - .partitions(PartitionPolicy.fixed(4)) - .consumer(data -> received.addAll(data)) - .bufferSize(1000) - .build()); - - for (int i = 0; i < 400; i++) { - queue.produce("item-" + i); - } - - Awaitility.await().atMost(5, TimeUnit.SECONDS) - .until(() -> received.size() == 400); - - assertEquals(400, received.size()); - } } diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java index b055c36f0a..910c5b0ccf 100644 --- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java +++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java @@ -18,9 +18,22 @@ package org.apache.skywalking.oap.server.library.batchqueue; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; + /** - * 200 distinct metric subclasses for benchmark handler-map dispatch testing. - * Each class represents a distinct metric type that gets routed to its own handler. + * 2000 distinct metric subclasses generated at class-load time for benchmark + * handler-map dispatch testing. Each class represents a distinct metric type + * that gets routed to its own handler via {@code data.getClass().hashCode()}. + * + * <p>Classes are generated as bytecode at runtime using + * {@link MethodHandles.Lookup#defineClass(byte[])} to avoid a 6000+ line + * source file. Each generated class {@code Dyn0..Dyn1999} extends + * {@link TypedMetric} with a constructor that calls {@code super(typeId, v)}. */ @SuppressWarnings("all") class BenchmarkMetricTypes { @@ -40,610 +53,151 @@ class BenchmarkMetricTypes { TypedMetric create(long value); } - static final int MAX_TYPES = 200; + static final int MAX_TYPES = 2000; static final Class<? extends TypedMetric>[] CLASSES = new Class[MAX_TYPES]; static final MetricFactory[] FACTORIES = new MetricFactory[MAX_TYPES]; + private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup(); + private static final String SUPER_INTERNAL = + "org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes$TypedMetric"; + static { - CLASSES[0] = M0.class; FACTORIES[0] = v -> new M0(v); - CLASSES[1] = M1.class; FACTORIES[1] = v -> new M1(v); - CLASSES[2] = M2.class; FACTORIES[2] = v -> new M2(v); - CLASSES[3] = M3.class; FACTORIES[3] = v -> new M3(v); - CLASSES[4] = M4.class; FACTORIES[4] = v -> new M4(v); - CLASSES[5] = M5.class; FACTORIES[5] = v -> new M5(v); - CLASSES[6] = M6.class; FACTORIES[6] = v -> new M6(v); - CLASSES[7] = M7.class; FACTORIES[7] = v -> new M7(v); - CLASSES[8] = M8.class; FACTORIES[8] = v -> new M8(v); - CLASSES[9] = M9.class; FACTORIES[9] = v -> new M9(v); - CLASSES[10] = M10.class; FACTORIES[10] = v -> new M10(v); - CLASSES[11] = M11.class; FACTORIES[11] = v -> new M11(v); - CLASSES[12] = M12.class; FACTORIES[12] = v -> new M12(v); - CLASSES[13] = M13.class; FACTORIES[13] = v -> new M13(v); - CLASSES[14] = M14.class; FACTORIES[14] = v -> new M14(v); - CLASSES[15] = M15.class; FACTORIES[15] = v -> new M15(v); - CLASSES[16] = M16.class; FACTORIES[16] = v -> new M16(v); - CLASSES[17] = M17.class; FACTORIES[17] = v -> new M17(v); - CLASSES[18] = M18.class; FACTORIES[18] = v -> new M18(v); - CLASSES[19] = M19.class; FACTORIES[19] = v -> new M19(v); - CLASSES[20] = M20.class; FACTORIES[20] = v -> new M20(v); - CLASSES[21] = M21.class; FACTORIES[21] = v -> new M21(v); - CLASSES[22] = M22.class; FACTORIES[22] = v -> new M22(v); - CLASSES[23] = M23.class; FACTORIES[23] = v -> new M23(v); - CLASSES[24] = M24.class; FACTORIES[24] = v -> new M24(v); - CLASSES[25] = M25.class; FACTORIES[25] = v -> new M25(v); - CLASSES[26] = M26.class; FACTORIES[26] = v -> new M26(v); - CLASSES[27] = M27.class; FACTORIES[27] = v -> new M27(v); - CLASSES[28] = M28.class; FACTORIES[28] = v -> new M28(v); - CLASSES[29] = M29.class; FACTORIES[29] = v -> new M29(v); - CLASSES[30] = M30.class; FACTORIES[30] = v -> new M30(v); - CLASSES[31] = M31.class; FACTORIES[31] = v -> new M31(v); - CLASSES[32] = M32.class; FACTORIES[32] = v -> new M32(v); - CLASSES[33] = M33.class; FACTORIES[33] = v -> new M33(v); - CLASSES[34] = M34.class; FACTORIES[34] = v -> new M34(v); - CLASSES[35] = M35.class; FACTORIES[35] = v -> new M35(v); - CLASSES[36] = M36.class; FACTORIES[36] = v -> new M36(v); - CLASSES[37] = M37.class; FACTORIES[37] = v -> new M37(v); - CLASSES[38] = M38.class; FACTORIES[38] = v -> new M38(v); - CLASSES[39] = M39.class; FACTORIES[39] = v -> new M39(v); - CLASSES[40] = M40.class; FACTORIES[40] = v -> new M40(v); - CLASSES[41] = M41.class; FACTORIES[41] = v -> new M41(v); - CLASSES[42] = M42.class; FACTORIES[42] = v -> new M42(v); - CLASSES[43] = M43.class; FACTORIES[43] = v -> new M43(v); - CLASSES[44] = M44.class; FACTORIES[44] = v -> new M44(v); - CLASSES[45] = M45.class; FACTORIES[45] = v -> new M45(v); - CLASSES[46] = M46.class; FACTORIES[46] = v -> new M46(v); - CLASSES[47] = M47.class; FACTORIES[47] = v -> new M47(v); - CLASSES[48] = M48.class; FACTORIES[48] = v -> new M48(v); - CLASSES[49] = M49.class; FACTORIES[49] = v -> new M49(v); - CLASSES[50] = M50.class; FACTORIES[50] = v -> new M50(v); - CLASSES[51] = M51.class; FACTORIES[51] = v -> new M51(v); - CLASSES[52] = M52.class; FACTORIES[52] = v -> new M52(v); - CLASSES[53] = M53.class; FACTORIES[53] = v -> new M53(v); - CLASSES[54] = M54.class; FACTORIES[54] = v -> new M54(v); - CLASSES[55] = M55.class; FACTORIES[55] = v -> new M55(v); - CLASSES[56] = M56.class; FACTORIES[56] = v -> new M56(v); - CLASSES[57] = M57.class; FACTORIES[57] = v -> new M57(v); - CLASSES[58] = M58.class; FACTORIES[58] = v -> new M58(v); - CLASSES[59] = M59.class; FACTORIES[59] = v -> new M59(v); - CLASSES[60] = M60.class; FACTORIES[60] = v -> new M60(v); - CLASSES[61] = M61.class; FACTORIES[61] = v -> new M61(v); - CLASSES[62] = M62.class; FACTORIES[62] = v -> new M62(v); - CLASSES[63] = M63.class; FACTORIES[63] = v -> new M63(v); - CLASSES[64] = M64.class; FACTORIES[64] = v -> new M64(v); - CLASSES[65] = M65.class; FACTORIES[65] = v -> new M65(v); - CLASSES[66] = M66.class; FACTORIES[66] = v -> new M66(v); - CLASSES[67] = M67.class; FACTORIES[67] = v -> new M67(v); - CLASSES[68] = M68.class; FACTORIES[68] = v -> new M68(v); - CLASSES[69] = M69.class; FACTORIES[69] = v -> new M69(v); - CLASSES[70] = M70.class; FACTORIES[70] = v -> new M70(v); - CLASSES[71] = M71.class; FACTORIES[71] = v -> new M71(v); - CLASSES[72] = M72.class; FACTORIES[72] = v -> new M72(v); - CLASSES[73] = M73.class; FACTORIES[73] = v -> new M73(v); - CLASSES[74] = M74.class; FACTORIES[74] = v -> new M74(v); - CLASSES[75] = M75.class; FACTORIES[75] = v -> new M75(v); - CLASSES[76] = M76.class; FACTORIES[76] = v -> new M76(v); - CLASSES[77] = M77.class; FACTORIES[77] = v -> new M77(v); - CLASSES[78] = M78.class; FACTORIES[78] = v -> new M78(v); - CLASSES[79] = M79.class; FACTORIES[79] = v -> new M79(v); - CLASSES[80] = M80.class; FACTORIES[80] = v -> new M80(v); - CLASSES[81] = M81.class; FACTORIES[81] = v -> new M81(v); - CLASSES[82] = M82.class; FACTORIES[82] = v -> new M82(v); - CLASSES[83] = M83.class; FACTORIES[83] = v -> new M83(v); - CLASSES[84] = M84.class; FACTORIES[84] = v -> new M84(v); - CLASSES[85] = M85.class; FACTORIES[85] = v -> new M85(v); - CLASSES[86] = M86.class; FACTORIES[86] = v -> new M86(v); - CLASSES[87] = M87.class; FACTORIES[87] = v -> new M87(v); - CLASSES[88] = M88.class; FACTORIES[88] = v -> new M88(v); - CLASSES[89] = M89.class; FACTORIES[89] = v -> new M89(v); - CLASSES[90] = M90.class; FACTORIES[90] = v -> new M90(v); - CLASSES[91] = M91.class; FACTORIES[91] = v -> new M91(v); - CLASSES[92] = M92.class; FACTORIES[92] = v -> new M92(v); - CLASSES[93] = M93.class; FACTORIES[93] = v -> new M93(v); - CLASSES[94] = M94.class; FACTORIES[94] = v -> new M94(v); - CLASSES[95] = M95.class; FACTORIES[95] = v -> new M95(v); - CLASSES[96] = M96.class; FACTORIES[96] = v -> new M96(v); - CLASSES[97] = M97.class; FACTORIES[97] = v -> new M97(v); - CLASSES[98] = M98.class; FACTORIES[98] = v -> new M98(v); - CLASSES[99] = M99.class; FACTORIES[99] = v -> new M99(v); - CLASSES[100] = M100.class; FACTORIES[100] = v -> new M100(v); - CLASSES[101] = M101.class; FACTORIES[101] = v -> new M101(v); - CLASSES[102] = M102.class; FACTORIES[102] = v -> new M102(v); - CLASSES[103] = M103.class; FACTORIES[103] = v -> new M103(v); - CLASSES[104] = M104.class; FACTORIES[104] = v -> new M104(v); - CLASSES[105] = M105.class; FACTORIES[105] = v -> new M105(v); - CLASSES[106] = M106.class; FACTORIES[106] = v -> new M106(v); - CLASSES[107] = M107.class; FACTORIES[107] = v -> new M107(v); - CLASSES[108] = M108.class; FACTORIES[108] = v -> new M108(v); - CLASSES[109] = M109.class; FACTORIES[109] = v -> new M109(v); - CLASSES[110] = M110.class; FACTORIES[110] = v -> new M110(v); - CLASSES[111] = M111.class; FACTORIES[111] = v -> new M111(v); - CLASSES[112] = M112.class; FACTORIES[112] = v -> new M112(v); - CLASSES[113] = M113.class; FACTORIES[113] = v -> new M113(v); - CLASSES[114] = M114.class; FACTORIES[114] = v -> new M114(v); - CLASSES[115] = M115.class; FACTORIES[115] = v -> new M115(v); - CLASSES[116] = M116.class; FACTORIES[116] = v -> new M116(v); - CLASSES[117] = M117.class; FACTORIES[117] = v -> new M117(v); - CLASSES[118] = M118.class; FACTORIES[118] = v -> new M118(v); - CLASSES[119] = M119.class; FACTORIES[119] = v -> new M119(v); - CLASSES[120] = M120.class; FACTORIES[120] = v -> new M120(v); - CLASSES[121] = M121.class; FACTORIES[121] = v -> new M121(v); - CLASSES[122] = M122.class; FACTORIES[122] = v -> new M122(v); - CLASSES[123] = M123.class; FACTORIES[123] = v -> new M123(v); - CLASSES[124] = M124.class; FACTORIES[124] = v -> new M124(v); - CLASSES[125] = M125.class; FACTORIES[125] = v -> new M125(v); - CLASSES[126] = M126.class; FACTORIES[126] = v -> new M126(v); - CLASSES[127] = M127.class; FACTORIES[127] = v -> new M127(v); - CLASSES[128] = M128.class; FACTORIES[128] = v -> new M128(v); - CLASSES[129] = M129.class; FACTORIES[129] = v -> new M129(v); - CLASSES[130] = M130.class; FACTORIES[130] = v -> new M130(v); - CLASSES[131] = M131.class; FACTORIES[131] = v -> new M131(v); - CLASSES[132] = M132.class; FACTORIES[132] = v -> new M132(v); - CLASSES[133] = M133.class; FACTORIES[133] = v -> new M133(v); - CLASSES[134] = M134.class; FACTORIES[134] = v -> new M134(v); - CLASSES[135] = M135.class; FACTORIES[135] = v -> new M135(v); - CLASSES[136] = M136.class; FACTORIES[136] = v -> new M136(v); - CLASSES[137] = M137.class; FACTORIES[137] = v -> new M137(v); - CLASSES[138] = M138.class; FACTORIES[138] = v -> new M138(v); - CLASSES[139] = M139.class; FACTORIES[139] = v -> new M139(v); - CLASSES[140] = M140.class; FACTORIES[140] = v -> new M140(v); - CLASSES[141] = M141.class; FACTORIES[141] = v -> new M141(v); - CLASSES[142] = M142.class; FACTORIES[142] = v -> new M142(v); - CLASSES[143] = M143.class; FACTORIES[143] = v -> new M143(v); - CLASSES[144] = M144.class; FACTORIES[144] = v -> new M144(v); - CLASSES[145] = M145.class; FACTORIES[145] = v -> new M145(v); - CLASSES[146] = M146.class; FACTORIES[146] = v -> new M146(v); - CLASSES[147] = M147.class; FACTORIES[147] = v -> new M147(v); - CLASSES[148] = M148.class; FACTORIES[148] = v -> new M148(v); - CLASSES[149] = M149.class; FACTORIES[149] = v -> new M149(v); - CLASSES[150] = M150.class; FACTORIES[150] = v -> new M150(v); - CLASSES[151] = M151.class; FACTORIES[151] = v -> new M151(v); - CLASSES[152] = M152.class; FACTORIES[152] = v -> new M152(v); - CLASSES[153] = M153.class; FACTORIES[153] = v -> new M153(v); - CLASSES[154] = M154.class; FACTORIES[154] = v -> new M154(v); - CLASSES[155] = M155.class; FACTORIES[155] = v -> new M155(v); - CLASSES[156] = M156.class; FACTORIES[156] = v -> new M156(v); - CLASSES[157] = M157.class; FACTORIES[157] = v -> new M157(v); - CLASSES[158] = M158.class; FACTORIES[158] = v -> new M158(v); - CLASSES[159] = M159.class; FACTORIES[159] = v -> new M159(v); - CLASSES[160] = M160.class; FACTORIES[160] = v -> new M160(v); - CLASSES[161] = M161.class; FACTORIES[161] = v -> new M161(v); - CLASSES[162] = M162.class; FACTORIES[162] = v -> new M162(v); - CLASSES[163] = M163.class; FACTORIES[163] = v -> new M163(v); - CLASSES[164] = M164.class; FACTORIES[164] = v -> new M164(v); - CLASSES[165] = M165.class; FACTORIES[165] = v -> new M165(v); - CLASSES[166] = M166.class; FACTORIES[166] = v -> new M166(v); - CLASSES[167] = M167.class; FACTORIES[167] = v -> new M167(v); - CLASSES[168] = M168.class; FACTORIES[168] = v -> new M168(v); - CLASSES[169] = M169.class; FACTORIES[169] = v -> new M169(v); - CLASSES[170] = M170.class; FACTORIES[170] = v -> new M170(v); - CLASSES[171] = M171.class; FACTORIES[171] = v -> new M171(v); - CLASSES[172] = M172.class; FACTORIES[172] = v -> new M172(v); - CLASSES[173] = M173.class; FACTORIES[173] = v -> new M173(v); - CLASSES[174] = M174.class; FACTORIES[174] = v -> new M174(v); - CLASSES[175] = M175.class; FACTORIES[175] = v -> new M175(v); - CLASSES[176] = M176.class; FACTORIES[176] = v -> new M176(v); - CLASSES[177] = M177.class; FACTORIES[177] = v -> new M177(v); - CLASSES[178] = M178.class; FACTORIES[178] = v -> new M178(v); - CLASSES[179] = M179.class; FACTORIES[179] = v -> new M179(v); - CLASSES[180] = M180.class; FACTORIES[180] = v -> new M180(v); - CLASSES[181] = M181.class; FACTORIES[181] = v -> new M181(v); - CLASSES[182] = M182.class; FACTORIES[182] = v -> new M182(v); - CLASSES[183] = M183.class; FACTORIES[183] = v -> new M183(v); - CLASSES[184] = M184.class; FACTORIES[184] = v -> new M184(v); - CLASSES[185] = M185.class; FACTORIES[185] = v -> new M185(v); - CLASSES[186] = M186.class; FACTORIES[186] = v -> new M186(v); - CLASSES[187] = M187.class; FACTORIES[187] = v -> new M187(v); - CLASSES[188] = M188.class; FACTORIES[188] = v -> new M188(v); - CLASSES[189] = M189.class; FACTORIES[189] = v -> new M189(v); - CLASSES[190] = M190.class; FACTORIES[190] = v -> new M190(v); - CLASSES[191] = M191.class; FACTORIES[191] = v -> new M191(v); - CLASSES[192] = M192.class; FACTORIES[192] = v -> new M192(v); - CLASSES[193] = M193.class; FACTORIES[193] = v -> new M193(v); - CLASSES[194] = M194.class; FACTORIES[194] = v -> new M194(v); - CLASSES[195] = M195.class; FACTORIES[195] = v -> new M195(v); - CLASSES[196] = M196.class; FACTORIES[196] = v -> new M196(v); - CLASSES[197] = M197.class; FACTORIES[197] = v -> new M197(v); - CLASSES[198] = M198.class; FACTORIES[198] = v -> new M198(v); - CLASSES[199] = M199.class; FACTORIES[199] = v -> new M199(v); + try { + for (int i = 0; i < MAX_TYPES; i++) { + final String name = "org/apache/skywalking/oap/server/library/batchqueue/Dyn" + i; + final byte[] bytes = buildClassBytes(name, SUPER_INTERNAL, i); + final Class<? extends TypedMetric> cls = + (Class<? extends TypedMetric>) LOOKUP.defineClass(bytes); + CLASSES[i] = cls; + final MethodHandle mh = LOOKUP.findConstructor( + cls, MethodType.methodType(void.class, long.class)); + FACTORIES[i] = v -> { + try { + return (TypedMetric) mh.invoke(v); + } catch (final Throwable e) { + throw new RuntimeException(e); + } + }; + } + } catch (final Exception e) { + throw new ExceptionInInitializerError(e); + } } - static class M0 extends TypedMetric { M0(final long v) { super(0, v); } } - - static class M1 extends TypedMetric { M1(final long v) { super(1, v); } } - - static class M2 extends TypedMetric { M2(final long v) { super(2, v); } } - - static class M3 extends TypedMetric { M3(final long v) { super(3, v); } } - - static class M4 extends TypedMetric { M4(final long v) { super(4, v); } } - - static class M5 extends TypedMetric { M5(final long v) { super(5, v); } } - - static class M6 extends TypedMetric { M6(final long v) { super(6, v); } } - - static class M7 extends TypedMetric { M7(final long v) { super(7, v); } } - - static class M8 extends TypedMetric { M8(final long v) { super(8, v); } } - - static class M9 extends TypedMetric { M9(final long v) { super(9, v); } } - - static class M10 extends TypedMetric { M10(final long v) { super(10, v); } } - - static class M11 extends TypedMetric { M11(final long v) { super(11, v); } } - - static class M12 extends TypedMetric { M12(final long v) { super(12, v); } } - - static class M13 extends TypedMetric { M13(final long v) { super(13, v); } } - - static class M14 extends TypedMetric { M14(final long v) { super(14, v); } } - - static class M15 extends TypedMetric { M15(final long v) { super(15, v); } } - - static class M16 extends TypedMetric { M16(final long v) { super(16, v); } } - - static class M17 extends TypedMetric { M17(final long v) { super(17, v); } } - - static class M18 extends TypedMetric { M18(final long v) { super(18, v); } } - - static class M19 extends TypedMetric { M19(final long v) { super(19, v); } } - - static class M20 extends TypedMetric { M20(final long v) { super(20, v); } } - - static class M21 extends TypedMetric { M21(final long v) { super(21, v); } } - - static class M22 extends TypedMetric { M22(final long v) { super(22, v); } } - - static class M23 extends TypedMetric { M23(final long v) { super(23, v); } } - - static class M24 extends TypedMetric { M24(final long v) { super(24, v); } } - - static class M25 extends TypedMetric { M25(final long v) { super(25, v); } } - - static class M26 extends TypedMetric { M26(final long v) { super(26, v); } } - - static class M27 extends TypedMetric { M27(final long v) { super(27, v); } } - - static class M28 extends TypedMetric { M28(final long v) { super(28, v); } } - - static class M29 extends TypedMetric { M29(final long v) { super(29, v); } } - - static class M30 extends TypedMetric { M30(final long v) { super(30, v); } } - - static class M31 extends TypedMetric { M31(final long v) { super(31, v); } } - - static class M32 extends TypedMetric { M32(final long v) { super(32, v); } } - - static class M33 extends TypedMetric { M33(final long v) { super(33, v); } } - - static class M34 extends TypedMetric { M34(final long v) { super(34, v); } } - - static class M35 extends TypedMetric { M35(final long v) { super(35, v); } } - - static class M36 extends TypedMetric { M36(final long v) { super(36, v); } } - - static class M37 extends TypedMetric { M37(final long v) { super(37, v); } } - - static class M38 extends TypedMetric { M38(final long v) { super(38, v); } } - - static class M39 extends TypedMetric { M39(final long v) { super(39, v); } } - - static class M40 extends TypedMetric { M40(final long v) { super(40, v); } } - - static class M41 extends TypedMetric { M41(final long v) { super(41, v); } } - - static class M42 extends TypedMetric { M42(final long v) { super(42, v); } } - - static class M43 extends TypedMetric { M43(final long v) { super(43, v); } } - - static class M44 extends TypedMetric { M44(final long v) { super(44, v); } } - - static class M45 extends TypedMetric { M45(final long v) { super(45, v); } } - - static class M46 extends TypedMetric { M46(final long v) { super(46, v); } } - - static class M47 extends TypedMetric { M47(final long v) { super(47, v); } } - - static class M48 extends TypedMetric { M48(final long v) { super(48, v); } } - - static class M49 extends TypedMetric { M49(final long v) { super(49, v); } } - - static class M50 extends TypedMetric { M50(final long v) { super(50, v); } } - - static class M51 extends TypedMetric { M51(final long v) { super(51, v); } } - - static class M52 extends TypedMetric { M52(final long v) { super(52, v); } } - - static class M53 extends TypedMetric { M53(final long v) { super(53, v); } } - - static class M54 extends TypedMetric { M54(final long v) { super(54, v); } } - - static class M55 extends TypedMetric { M55(final long v) { super(55, v); } } - - static class M56 extends TypedMetric { M56(final long v) { super(56, v); } } - - static class M57 extends TypedMetric { M57(final long v) { super(57, v); } } - - static class M58 extends TypedMetric { M58(final long v) { super(58, v); } } - - static class M59 extends TypedMetric { M59(final long v) { super(59, v); } } - - static class M60 extends TypedMetric { M60(final long v) { super(60, v); } } - - static class M61 extends TypedMetric { M61(final long v) { super(61, v); } } - - static class M62 extends TypedMetric { M62(final long v) { super(62, v); } } - - static class M63 extends TypedMetric { M63(final long v) { super(63, v); } } - - static class M64 extends TypedMetric { M64(final long v) { super(64, v); } } - - static class M65 extends TypedMetric { M65(final long v) { super(65, v); } } - - static class M66 extends TypedMetric { M66(final long v) { super(66, v); } } - - static class M67 extends TypedMetric { M67(final long v) { super(67, v); } } - - static class M68 extends TypedMetric { M68(final long v) { super(68, v); } } - - static class M69 extends TypedMetric { M69(final long v) { super(69, v); } } - - static class M70 extends TypedMetric { M70(final long v) { super(70, v); } } - - static class M71 extends TypedMetric { M71(final long v) { super(71, v); } } - - static class M72 extends TypedMetric { M72(final long v) { super(72, v); } } - - static class M73 extends TypedMetric { M73(final long v) { super(73, v); } } - - static class M74 extends TypedMetric { M74(final long v) { super(74, v); } } - - static class M75 extends TypedMetric { M75(final long v) { super(75, v); } } - - static class M76 extends TypedMetric { M76(final long v) { super(76, v); } } - - static class M77 extends TypedMetric { M77(final long v) { super(77, v); } } - - static class M78 extends TypedMetric { M78(final long v) { super(78, v); } } - - static class M79 extends TypedMetric { M79(final long v) { super(79, v); } } - - static class M80 extends TypedMetric { M80(final long v) { super(80, v); } } - - static class M81 extends TypedMetric { M81(final long v) { super(81, v); } } - - static class M82 extends TypedMetric { M82(final long v) { super(82, v); } } - - static class M83 extends TypedMetric { M83(final long v) { super(83, v); } } - - static class M84 extends TypedMetric { M84(final long v) { super(84, v); } } - - static class M85 extends TypedMetric { M85(final long v) { super(85, v); } } - - static class M86 extends TypedMetric { M86(final long v) { super(86, v); } } - - static class M87 extends TypedMetric { M87(final long v) { super(87, v); } } - - static class M88 extends TypedMetric { M88(final long v) { super(88, v); } } - - static class M89 extends TypedMetric { M89(final long v) { super(89, v); } } - - static class M90 extends TypedMetric { M90(final long v) { super(90, v); } } - - static class M91 extends TypedMetric { M91(final long v) { super(91, v); } } - - static class M92 extends TypedMetric { M92(final long v) { super(92, v); } } - - static class M93 extends TypedMetric { M93(final long v) { super(93, v); } } - - static class M94 extends TypedMetric { M94(final long v) { super(94, v); } } - - static class M95 extends TypedMetric { M95(final long v) { super(95, v); } } - - static class M96 extends TypedMetric { M96(final long v) { super(96, v); } } - - static class M97 extends TypedMetric { M97(final long v) { super(97, v); } } - - static class M98 extends TypedMetric { M98(final long v) { super(98, v); } } - - static class M99 extends TypedMetric { M99(final long v) { super(99, v); } } - - static class M100 extends TypedMetric { M100(final long v) { super(100, v); } } - - static class M101 extends TypedMetric { M101(final long v) { super(101, v); } } - - static class M102 extends TypedMetric { M102(final long v) { super(102, v); } } - - static class M103 extends TypedMetric { M103(final long v) { super(103, v); } } - - static class M104 extends TypedMetric { M104(final long v) { super(104, v); } } - - static class M105 extends TypedMetric { M105(final long v) { super(105, v); } } - - static class M106 extends TypedMetric { M106(final long v) { super(106, v); } } - - static class M107 extends TypedMetric { M107(final long v) { super(107, v); } } - - static class M108 extends TypedMetric { M108(final long v) { super(108, v); } } - - static class M109 extends TypedMetric { M109(final long v) { super(109, v); } } - - static class M110 extends TypedMetric { M110(final long v) { super(110, v); } } - - static class M111 extends TypedMetric { M111(final long v) { super(111, v); } } - - static class M112 extends TypedMetric { M112(final long v) { super(112, v); } } - - static class M113 extends TypedMetric { M113(final long v) { super(113, v); } } - - static class M114 extends TypedMetric { M114(final long v) { super(114, v); } } - - static class M115 extends TypedMetric { M115(final long v) { super(115, v); } } - - static class M116 extends TypedMetric { M116(final long v) { super(116, v); } } - - static class M117 extends TypedMetric { M117(final long v) { super(117, v); } } - - static class M118 extends TypedMetric { M118(final long v) { super(118, v); } } - - static class M119 extends TypedMetric { M119(final long v) { super(119, v); } } - - static class M120 extends TypedMetric { M120(final long v) { super(120, v); } } - - static class M121 extends TypedMetric { M121(final long v) { super(121, v); } } - - static class M122 extends TypedMetric { M122(final long v) { super(122, v); } } - - static class M123 extends TypedMetric { M123(final long v) { super(123, v); } } - - static class M124 extends TypedMetric { M124(final long v) { super(124, v); } } - - static class M125 extends TypedMetric { M125(final long v) { super(125, v); } } - - static class M126 extends TypedMetric { M126(final long v) { super(126, v); } } - - static class M127 extends TypedMetric { M127(final long v) { super(127, v); } } - - static class M128 extends TypedMetric { M128(final long v) { super(128, v); } } - - static class M129 extends TypedMetric { M129(final long v) { super(129, v); } } - - static class M130 extends TypedMetric { M130(final long v) { super(130, v); } } - - static class M131 extends TypedMetric { M131(final long v) { super(131, v); } } - - static class M132 extends TypedMetric { M132(final long v) { super(132, v); } } - - static class M133 extends TypedMetric { M133(final long v) { super(133, v); } } - - static class M134 extends TypedMetric { M134(final long v) { super(134, v); } } - - static class M135 extends TypedMetric { M135(final long v) { super(135, v); } } - - static class M136 extends TypedMetric { M136(final long v) { super(136, v); } } - - static class M137 extends TypedMetric { M137(final long v) { super(137, v); } } - - static class M138 extends TypedMetric { M138(final long v) { super(138, v); } } - - static class M139 extends TypedMetric { M139(final long v) { super(139, v); } } - - static class M140 extends TypedMetric { M140(final long v) { super(140, v); } } - - static class M141 extends TypedMetric { M141(final long v) { super(141, v); } } - - static class M142 extends TypedMetric { M142(final long v) { super(142, v); } } - - static class M143 extends TypedMetric { M143(final long v) { super(143, v); } } - - static class M144 extends TypedMetric { M144(final long v) { super(144, v); } } - - static class M145 extends TypedMetric { M145(final long v) { super(145, v); } } - - static class M146 extends TypedMetric { M146(final long v) { super(146, v); } } - - static class M147 extends TypedMetric { M147(final long v) { super(147, v); } } - - static class M148 extends TypedMetric { M148(final long v) { super(148, v); } } - - static class M149 extends TypedMetric { M149(final long v) { super(149, v); } } - - static class M150 extends TypedMetric { M150(final long v) { super(150, v); } } - - static class M151 extends TypedMetric { M151(final long v) { super(151, v); } } - - static class M152 extends TypedMetric { M152(final long v) { super(152, v); } } - - static class M153 extends TypedMetric { M153(final long v) { super(153, v); } } - - static class M154 extends TypedMetric { M154(final long v) { super(154, v); } } - - static class M155 extends TypedMetric { M155(final long v) { super(155, v); } } - - static class M156 extends TypedMetric { M156(final long v) { super(156, v); } } - - static class M157 extends TypedMetric { M157(final long v) { super(157, v); } } - - static class M158 extends TypedMetric { M158(final long v) { super(158, v); } } - - static class M159 extends TypedMetric { M159(final long v) { super(159, v); } } - - static class M160 extends TypedMetric { M160(final long v) { super(160, v); } } - - static class M161 extends TypedMetric { M161(final long v) { super(161, v); } } - - static class M162 extends TypedMetric { M162(final long v) { super(162, v); } } - - static class M163 extends TypedMetric { M163(final long v) { super(163, v); } } - - static class M164 extends TypedMetric { M164(final long v) { super(164, v); } } - - static class M165 extends TypedMetric { M165(final long v) { super(165, v); } } - - static class M166 extends TypedMetric { M166(final long v) { super(166, v); } } - - static class M167 extends TypedMetric { M167(final long v) { super(167, v); } } - - static class M168 extends TypedMetric { M168(final long v) { super(168, v); } } - - static class M169 extends TypedMetric { M169(final long v) { super(169, v); } } - - static class M170 extends TypedMetric { M170(final long v) { super(170, v); } } - - static class M171 extends TypedMetric { M171(final long v) { super(171, v); } } - - static class M172 extends TypedMetric { M172(final long v) { super(172, v); } } - - static class M173 extends TypedMetric { M173(final long v) { super(173, v); } } - - static class M174 extends TypedMetric { M174(final long v) { super(174, v); } } - - static class M175 extends TypedMetric { M175(final long v) { super(175, v); } } - - static class M176 extends TypedMetric { M176(final long v) { super(176, v); } } - - static class M177 extends TypedMetric { M177(final long v) { super(177, v); } } - - static class M178 extends TypedMetric { M178(final long v) { super(178, v); } } - - static class M179 extends TypedMetric { M179(final long v) { super(179, v); } } - - static class M180 extends TypedMetric { M180(final long v) { super(180, v); } } - - static class M181 extends TypedMetric { M181(final long v) { super(181, v); } } - - static class M182 extends TypedMetric { M182(final long v) { super(182, v); } } - - static class M183 extends TypedMetric { M183(final long v) { super(183, v); } } - - static class M184 extends TypedMetric { M184(final long v) { super(184, v); } } - - static class M185 extends TypedMetric { M185(final long v) { super(185, v); } } - - static class M186 extends TypedMetric { M186(final long v) { super(186, v); } } - - static class M187 extends TypedMetric { M187(final long v) { super(187, v); } } - - static class M188 extends TypedMetric { M188(final long v) { super(188, v); } } - - static class M189 extends TypedMetric { M189(final long v) { super(189, v); } } - - static class M190 extends TypedMetric { M190(final long v) { super(190, v); } } - - static class M191 extends TypedMetric { M191(final long v) { super(191, v); } } - - static class M192 extends TypedMetric { M192(final long v) { super(192, v); } } - - static class M193 extends TypedMetric { M193(final long v) { super(193, v); } } - - static class M194 extends TypedMetric { M194(final long v) { super(194, v); } } - - static class M195 extends TypedMetric { M195(final long v) { super(195, v); } } - - static class M196 extends TypedMetric { M196(final long v) { super(196, v); } } - - static class M197 extends TypedMetric { M197(final long v) { super(197, v); } } - - static class M198 extends TypedMetric { M198(final long v) { super(198, v); } } + /** + * Build minimal class bytecode: + * <pre> + * class {name} extends TypedMetric { + * {name}(long v) { super(typeId, v); } + * } + * </pre> + */ + private static byte[] buildClassBytes(final String thisClass, final String superClass, + final int typeId) throws IOException { + final ByteArrayOutputStream buf = new ByteArrayOutputStream(256); + final DataOutputStream out = new DataOutputStream(buf); + + // Magic + version (Java 11 = 55) + out.writeInt(0xCAFEBABE); + out.writeShort(0); + out.writeShort(55); + + // Constant pool (10 entries, 1-indexed => count = 11) + out.writeShort(11); + // #1 Methodref -> #3.#7 (superClass.<init>:(IJ)V) + out.writeByte(10); + out.writeShort(3); + out.writeShort(7); + // #2 Class -> #8 (this class) + out.writeByte(7); + out.writeShort(8); + // #3 Class -> #9 (super class) + out.writeByte(7); + out.writeShort(9); + // #4 Utf8 "<init>" + out.writeByte(1); + out.writeUTF("<init>"); + // #5 Utf8 "(J)V" + out.writeByte(1); + out.writeUTF("(J)V"); + // #6 Utf8 "Code" + out.writeByte(1); + out.writeUTF("Code"); + // #7 NameAndType -> #4:#10 (<init>:(IJ)V) + out.writeByte(12); + out.writeShort(4); + out.writeShort(10); + // #8 Utf8 this class name + out.writeByte(1); + out.writeUTF(thisClass); + // #9 Utf8 super class name + out.writeByte(1); + out.writeUTF(superClass); + // #10 Utf8 "(IJ)V" + out.writeByte(1); + out.writeUTF("(IJ)V"); + + // Access flags: ACC_SUPER (0x0020) + out.writeShort(0x0020); + // This class (#2), Super class (#3) + out.writeShort(2); + out.writeShort(3); + // Interfaces: 0 + out.writeShort(0); + // Fields: 0 + out.writeShort(0); + + // Methods: 1 (constructor) + out.writeShort(1); + // Method access_flags: 0 (package-private) + out.writeShort(0); + // Method name: #4 (<init>) + out.writeShort(4); + // Method descriptor: #5 ((J)V) + out.writeShort(5); + // Method attributes: 1 (Code) + out.writeShort(1); + + // Code attribute + out.writeShort(6); // attribute_name_index -> "Code" + + // Build bytecode first to know length + final byte[] code = buildConstructorCode(typeId); + final int codeAttrLen = 2 + 2 + 4 + code.length + 2 + 2; // max_stack + max_locals + code_length + code + exception_table_length + attributes_count + out.writeInt(codeAttrLen); + out.writeShort(4); // max_stack (this + int + long[2 slots]) + out.writeShort(3); // max_locals (this + long[2 slots]) + out.writeInt(code.length); + out.write(code); + out.writeShort(0); // exception_table_length + out.writeShort(0); // code attributes_count + + // Class attributes: 0 + out.writeShort(0); + + out.flush(); + return buf.toByteArray(); + } - static class M199 extends TypedMetric { M199(final long v) { super(199, v); } } + /** + * Constructor bytecode: aload_0, sipush typeId, lload_1, invokespecial #1, return + */ + private static byte[] buildConstructorCode(final int typeId) { + final ByteArrayOutputStream code = new ByteArrayOutputStream(16); + // aload_0 + code.write(0x2A); + // sipush typeId (works for 0..32767) + code.write(0x11); + code.write((typeId >> 8) & 0xFF); + code.write(typeId & 0xFF); + // lload_1 + code.write(0x1F); + // invokespecial #1 + code.write(0xB7); + code.write(0x00); + code.write(0x01); + // return + code.write(0xB1); + return code.toByteArray(); + } } diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java index 0f95402c56..9cdb5d883b 100644 --- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java +++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java @@ -26,16 +26,16 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class PartitionPolicyTest { @Test - public void testFixedReturnsExactCount() { - assertEquals(1, PartitionPolicy.fixed(1).resolve(4)); - assertEquals(8, PartitionPolicy.fixed(8).resolve(4)); - } + public void testFixedResolve() { + // Returns exact count, ignores both threadCount and handlerCount + assertEquals(1, PartitionPolicy.fixed(1).resolve(4, 0)); + assertEquals(8, PartitionPolicy.fixed(8).resolve(4, 0)); + assertEquals(4, PartitionPolicy.fixed(4).resolve(8, 500)); - @Test - public void testFixedIgnoresThreadCount() { + // Same result regardless of threadCount final PartitionPolicy policy = PartitionPolicy.fixed(5); - assertEquals(5, policy.resolve(1)); - assertEquals(5, policy.resolve(100)); + assertEquals(5, policy.resolve(1, 0)); + assertEquals(5, policy.resolve(100, 0)); } @Test @@ -49,15 +49,14 @@ public class PartitionPolicyTest { } @Test - public void testThreadMultiply() { - assertEquals(8, PartitionPolicy.threadMultiply(2).resolve(4)); - assertEquals(12, PartitionPolicy.threadMultiply(3).resolve(4)); - } + public void testThreadMultiplyResolve() { + // multiplier * threadCount, ignores handlerCount + assertEquals(8, PartitionPolicy.threadMultiply(2).resolve(4, 0)); + assertEquals(12, PartitionPolicy.threadMultiply(3).resolve(4, 0)); + assertEquals(16, PartitionPolicy.threadMultiply(2).resolve(8, 500)); - @Test - public void testThreadMultiplyMinOne() { // Even with 0 thread count, should return at least 1 - assertEquals(1, PartitionPolicy.threadMultiply(1).resolve(0)); + assertEquals(1, PartitionPolicy.threadMultiply(1).resolve(0, 0)); } @Test @@ -65,9 +64,56 @@ public class PartitionPolicyTest { assertThrows(IllegalArgumentException.class, () -> PartitionPolicy.threadMultiply(0)); } + @Test + public void testAdaptiveZeroHandlersReturnsThreadCount() { + assertEquals(8, PartitionPolicy.adaptive().resolve(8, 0)); + assertEquals(4, PartitionPolicy.adaptive().resolve(4, 0)); + assertEquals(1, PartitionPolicy.adaptive().resolve(0, 0)); + } + + @Test + public void testAdaptiveBelowThreshold() { + // 8 threads * 25 = 200 threshold, handlerCount <= 200 -> 1:1 + assertEquals(50, PartitionPolicy.adaptive().resolve(8, 50)); + assertEquals(100, PartitionPolicy.adaptive().resolve(8, 100)); + assertEquals(200, PartitionPolicy.adaptive().resolve(8, 200)); + } + + @Test + public void testAdaptiveAboveThreshold() { + // 8 threads * 25 = 200 threshold, excess at 1:2 ratio + assertEquals(350, PartitionPolicy.adaptive().resolve(8, 500)); // 200 + 300/2 + assertEquals(600, PartitionPolicy.adaptive().resolve(8, 1000)); // 200 + 800/2 + assertEquals(1100, PartitionPolicy.adaptive().resolve(8, 2000)); // 200 + 1800/2 + } + + @Test + public void testAdaptiveCustomMultiplier() { + // 8 threads * 50 = 400 threshold + assertEquals(100, PartitionPolicy.adaptive(50).resolve(8, 100)); // 1:1 + assertEquals(400, PartitionPolicy.adaptive(50).resolve(8, 400)); // 1:1 at threshold + assertEquals(500, PartitionPolicy.adaptive(50).resolve(8, 600)); // 400 + 200/2 + } + + @Test + public void testAdaptiveWithDifferentThreadCounts() { + // 4 threads * 25 = 100 threshold + assertEquals(50, PartitionPolicy.adaptive().resolve(4, 50)); // 1:1 + assertEquals(100, PartitionPolicy.adaptive().resolve(4, 100)); // 1:1 + assertEquals(350, PartitionPolicy.adaptive().resolve(4, 600)); // 100 + 500/2 + } + + @Test + public void testAdaptiveRejectsInvalidMultiplier() { + assertThrows(IllegalArgumentException.class, + () -> PartitionPolicy.adaptive(0)); + } + @Test public void testToString() { assertEquals("fixed(4)", PartitionPolicy.fixed(4).toString()); assertEquals("threadMultiply(2)", PartitionPolicy.threadMultiply(2).toString()); + assertEquals("adaptive(multiplier=25)", PartitionPolicy.adaptive().toString()); + assertEquals("adaptive(multiplier=50)", PartitionPolicy.adaptive(50).toString()); } } diff --git a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierBenchmark.java b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierBenchmark.java index f6efad4171..ad80a94761 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierBenchmark.java +++ b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierBenchmark.java @@ -28,137 +28,84 @@ import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.junit.jupiter.api.Test; /** - * Throughput benchmark for DataCarrier, comparable to BatchQueueBenchmark. + * Throughput benchmark for DataCarrier as a baseline for BatchQueue comparison. * * <p>Simulates the real production pattern: N DataCarrier instances (one per * metric type), each with 1 channel, all sharing a {@link BulkConsumePool} - * with 4 {@link org.apache.skywalking.oap.server.library.datacarrier.consumer.MultipleChannelsConsumer} threads. - * Producers round-robin across the N carriers by type. + * with 8 consumer threads. 32 producer threads simulate gRPC connections. * * <p>Run with: mvn test -pl oap-server/server-library/library-datacarrier-queue * -Dtest=DataCarrierBenchmark -DfailIfNoTests=false * * <h3>Reference results (Apple M3 Max, 128 GB RAM, macOS 26.2, JDK 17)</h3> * <pre> - * Benchmark Config IF_POSSIBLE BLOCKING - * ------------------------------- ---------------------- ------------- ------------- - * direct-1ch-1consumer 1ch, 1 consumer ~35,300,000 ~35,300,000 - * direct-8ch-4consumer 8ch, 4 consumers ~31,100,000 ~21,900,000 - * 40-types-pool4t (prod-like) 40 carriers, pool(4) ~42,000,000 ~40,600,000 - * 200-types-pool4t (prod-like) 200 carriers, pool(4) ~25,100,000 ~23,900,000 - * 200reg/50hot, 100 producers 200 carriers, pool(4) ~17,600,000 ~23,800,000 + * Types Producers Pool threads IF_POSSIBLE BLOCKING + * ------ --------- -------------- ------------- ------------- + * 500 32 pool(8) ~33,400,000 ~32,500,000 + * 1000 32 pool(8) ~37,600,000 ~36,000,000 + * 2000 32 pool(8) ~38,000,000 ~42,100,000 * - * All runs: 4 producer threads (except hot-types: 100), bufferSize=50,000, - * consumeCycle=1ms, 0% drop rate. + * All runs: 1 channel per carrier, bufferSize=50,000, consumeCycle=1ms, 0% drop rate. * </pre> * - * <h3>Comparison with BatchQueue — production case (4 consumer threads)</h3> + * <p><b>BatchQueue comparison (type-aware partitions, typed objects):</b> * <pre> - * Scenario DataCarrier BatchQueue - * (N carriers (1 queue + - * + BulkPool) handler map) - * ------------------------------- ----------- ----------- - * 40 types, 4 producers ~42,000,000 ~31,200,000 - * 200 types, 4 producers ~25,100,000 ~43,900,000 - * 200 reg / 50 hot, 100 producers ~17,600,000 ~26,900,000 (IF_POSSIBLE) - * 200 reg / 50 hot, 100 producers ~23,800,000 ~26,100,000 (BLOCKING) - * - * BatchQueue uses configurable PartitionSelector (default: typeHash). - * Same-type items always land on the same partition, so dispatch grouping - * is effectively a no-op. In the 200-type case, BatchQueue reaches ~44M/s - * (1.75x DataCarrier's 25M) because 200 types spread across 32 partitions - * with minimal collision, while DataCarrier's consumer threads must iterate - * all 200 groups per cycle. In the production-realistic hot-types scenario - * (200reg/50hot/100 producers), BatchQueue leads ~27M vs ~18M (IF_POSSIBLE). - * DataCarrier wins the 40-type case (~42M vs ~31M) where fewer types mean - * less iteration overhead for its per-carrier polling model. + * 500 types 1000 types 2000 types + * 1:4 -4% +6% +24% + * 1:2 +14% +32% +38% + * adaptive +37% +34% +68% + * 1:1 +53% +63% +99% * </pre> + * + * <p>BatchQueue adaptive() = threshold(threadCount * 25), 1:1 below, 1:2 + * above. Consistently outperforms DataCarrier across all type counts. + * See BatchQueueBenchmark for full details. */ @SuppressWarnings("all") public class DataCarrierBenchmark { private static final int WARMUP_SECONDS = 2; private static final int MEASURE_SECONDS = 5; - private static final int PRODUCER_THREADS = 4; + private static final int PRODUCER_THREADS = 32; + private static final int POOL_THREADS = 8; private static final int BUFFER_SIZE = 50_000; - // ---- Production-like: N types sharing BulkConsumePool ---- - - @Test - public void benchmark40TypesSharedPool4Threads() throws Exception { - runSharedPoolBenchmark("40-types-pool4t-ifpossible", - 40, 4, 1, BufferStrategy.IF_POSSIBLE); - } - - @Test - public void benchmark200TypesSharedPool4Threads() throws Exception { - runSharedPoolBenchmark("200-types-pool4t-ifpossible", - 200, 4, 1, BufferStrategy.IF_POSSIBLE); - } - - @Test - public void benchmark40TypesSharedPool4ThreadsBlocking() throws Exception { - runSharedPoolBenchmark("40-types-pool4t-blocking", - 40, 4, 1, BufferStrategy.BLOCKING); - } - - @Test - public void benchmark200TypesSharedPool4ThreadsBlocking() throws Exception { - runSharedPoolBenchmark("200-types-pool4t-blocking", - 200, 4, 1, BufferStrategy.BLOCKING); - } - - // ---- Production-realistic: 200 types, 50 hot, 100 gRPC producers ---- - @Test - public void benchmarkHotTypes200x50x100producers() throws Exception { - runHotTypesPoolBenchmark("200reg-50hot-100p-ifpossible", - 200, 50, 100, 4, 1, BufferStrategy.IF_POSSIBLE); + public void benchmark500Types() throws Exception { + runSharedPoolBenchmark("500-types", 500, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkHotTypes200x50x100producersBlocking() throws Exception { - runHotTypesPoolBenchmark("200reg-50hot-100p-blocking", - 200, 50, 100, 4, 1, BufferStrategy.BLOCKING); + public void benchmark1000Types() throws Exception { + runSharedPoolBenchmark("1000-types", 1000, BufferStrategy.IF_POSSIBLE); } - // ---- Single carrier baselines ---- - @Test - public void benchmarkDirectConsumer1Thread() throws Exception { - runSingleBenchmark("direct-1ch-1consumer-ifpossible", - 1, 1, 1, BufferStrategy.IF_POSSIBLE); + public void benchmark2000Types() throws Exception { + runSharedPoolBenchmark("2000-types", 2000, BufferStrategy.IF_POSSIBLE); } @Test - public void benchmarkDirectConsumer4Threads() throws Exception { - runSingleBenchmark("direct-8ch-4consumer-ifpossible", - 8, 4, 1, BufferStrategy.IF_POSSIBLE); + public void benchmark500TypesBlocking() throws Exception { + runSharedPoolBenchmark("500-types-blocking", 500, BufferStrategy.BLOCKING); } @Test - public void benchmarkDirectConsumer1ThreadBlocking() throws Exception { - runSingleBenchmark("direct-1ch-1consumer-blocking", - 1, 1, 1, BufferStrategy.BLOCKING); + public void benchmark1000TypesBlocking() throws Exception { + runSharedPoolBenchmark("1000-types-blocking", 1000, BufferStrategy.BLOCKING); } @Test - public void benchmarkDirectConsumer4ThreadsBlocking() throws Exception { - runSingleBenchmark("direct-8ch-4consumer-blocking", - 8, 4, 1, BufferStrategy.BLOCKING); + public void benchmark2000TypesBlocking() throws Exception { + runSharedPoolBenchmark("2000-types-blocking", 2000, BufferStrategy.BLOCKING); } - /** - * Production-like benchmark: N DataCarrier instances sharing one BulkConsumePool. - * Each carrier has 1 channel. Producers round-robin across carriers by type. - */ private void runSharedPoolBenchmark(final String label, final int typeCount, - final int poolThreads, final long consumeCycleMs, final BufferStrategy strategy) throws Exception { final AtomicLong consumed = new AtomicLong(0); final BulkConsumePool pool = new BulkConsumePool( - "bench-pool", poolThreads, consumeCycleMs, false); + "bench-pool", POOL_THREADS, 1, false); final DataCarrier<Long>[] carriers = new DataCarrier[typeCount]; for (int i = 0; i < typeCount; i++) { @@ -181,7 +128,6 @@ public class DataCarrierBenchmark { final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 1000L; runProducers(carriers, warmupEnd); Thread.sleep(200); - final long warmupConsumed = consumed.get(); consumed.set(0); // Measure @@ -198,188 +144,16 @@ public class DataCarrierBenchmark { System.out.printf("%n=== DataCarrier Benchmark: %s ===%n" + " Types: %d (1 DataCarrier per type, 1 channel each)%n" + " Pool threads:%d%n" - + " ConsumeCycle:%d ms%n" - + " Strategy: %s%n" - + " Producers: %d%n" - + " Duration: %d ms%n" - + " Warmup consumed: %,d%n" - + " Produced: %,d%n" - + " Consumed: %,d%n" - + " Produce rate: %,.0f items/sec%n" - + " Consume rate: %,.0f items/sec%n" - + " Drop rate: %.2f%%%n", - label, typeCount, poolThreads, consumeCycleMs, strategy, - PRODUCER_THREADS, measureDuration, warmupConsumed, - produced, totalConsumed, - produced * 1000.0 / measureDuration, - totalConsumed * 1000.0 / measureDuration, - produced > 0 ? (produced - totalConsumed) * 100.0 / produced : 0); - } - - /** - * Production-realistic: 200 DataCarrier instances created, but only 50 hot types - * get traffic from 100 producer threads (simulating gRPC connections). - * Consumer threads must iterate all 200 groups each cycle, including 150 idle ones. - */ - private void runHotTypesPoolBenchmark(final String label, final int totalTypes, - final int hotTypes, final int producerCount, - final int poolThreads, final long consumeCycleMs, - final BufferStrategy strategy) throws Exception { - final AtomicLong consumed = new AtomicLong(0); - - final BulkConsumePool pool = new BulkConsumePool( - "bench-hot-pool", poolThreads, consumeCycleMs, false); - - // Create ALL carriers (200), each registered with the shared pool - final DataCarrier<Long>[] carriers = new DataCarrier[totalTypes]; - for (int i = 0; i < totalTypes; i++) { - carriers[i] = new DataCarrier<>( - "type-" + i, "bench", 1, BUFFER_SIZE, strategy); - carriers[i].consume(pool, new IConsumer<Long>() { - @Override - public void consume(final List<Long> data) { - consumed.addAndGet(data.size()); - } - - @Override - public void onError(final List<Long> data, final Throwable t) { - t.printStackTrace(); - } - }); - } - - // Only hot carriers get traffic - final DataCarrier<Long>[] hotCarriers = new DataCarrier[hotTypes]; - System.arraycopy(carriers, 0, hotCarriers, 0, hotTypes); - - // Warmup - final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 1000L; - runProducersN(hotCarriers, producerCount, warmupEnd); - Thread.sleep(200); - consumed.set(0); - - // Measure - final long measureStart = System.currentTimeMillis(); - final long measureEnd = measureStart + MEASURE_SECONDS * 1000L; - final long produced = runProducersN(hotCarriers, producerCount, measureEnd); - final long measureDuration = System.currentTimeMillis() - measureStart; - - Thread.sleep(500); - final long totalConsumed = consumed.get(); - - pool.close(null); - - System.out.printf("%n=== DataCarrier Benchmark: %s ===%n" - + " Total types: %d (all registered with pool)%n" - + " Hot types: %d (receiving traffic)%n" - + " Pool threads:%d%n" - + " ConsumeCycle:%d ms%n" - + " Strategy: %s%n" - + " Producers: %d (simulating gRPC connections)%n" - + " Duration: %d ms%n" - + " Produced: %,d%n" - + " Consumed: %,d%n" - + " Produce rate: %,.0f items/sec%n" - + " Consume rate: %,.0f items/sec%n" - + " Drop rate: %.2f%%%n", - label, totalTypes, hotTypes, poolThreads, consumeCycleMs, strategy, - producerCount, measureDuration, - produced, totalConsumed, - produced * 1000.0 / measureDuration, - totalConsumed * 1000.0 / measureDuration, - produced > 0 ? (produced - totalConsumed) * 100.0 / produced : 0); - } - - private long runProducersN(final DataCarrier<Long>[] carriers, - final int producerCount, - final long endTimeMs) throws InterruptedException { - final int carrierCount = carriers.length; - final AtomicLong totalProduced = new AtomicLong(0); - final CountDownLatch done = new CountDownLatch(producerCount); - - for (int p = 0; p < producerCount; p++) { - final int producerIndex = p; - final Thread thread = new Thread(() -> { - long count = 0; - int typeIndex = producerIndex; - while (System.currentTimeMillis() < endTimeMs) { - for (int batch = 0; batch < 100; batch++) { - final int type = typeIndex % carrierCount; - if (carriers[type].produce(count)) { - count++; - } - typeIndex++; - } - } - totalProduced.addAndGet(count); - done.countDown(); - }); - thread.setName("gRPC-" + producerIndex); - thread.setDaemon(true); - thread.start(); - } - - done.await(MEASURE_SECONDS + 10, TimeUnit.SECONDS); - return totalProduced.get(); - } - - private void runSingleBenchmark(final String label, final int channelSize, - final int consumerThreads, final long consumeCycleMs, - final BufferStrategy strategy) throws Exception { - final AtomicLong consumed = new AtomicLong(0); - - final DataCarrier<Long> carrier = new DataCarrier<>( - "bench-" + label, "bench", channelSize, BUFFER_SIZE, strategy); - - carrier.consume(new IConsumer<Long>() { - @Override - public void consume(final List<Long> data) { - consumed.addAndGet(data.size()); - } - - @Override - public void onError(final List<Long> data, final Throwable t) { - t.printStackTrace(); - } - }, consumerThreads, consumeCycleMs); - - final DataCarrier<Long>[] carriers = new DataCarrier[] {carrier}; - - // Warmup - final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 1000L; - runProducers(carriers, warmupEnd); - Thread.sleep(200); - final long warmupConsumed = consumed.get(); - consumed.set(0); - - // Measure - final long measureStart = System.currentTimeMillis(); - final long measureEnd = measureStart + MEASURE_SECONDS * 1000L; - final long produced = runProducers(carriers, measureEnd); - final long measureDuration = System.currentTimeMillis() - measureStart; - - Thread.sleep(500); - final long totalConsumed = consumed.get(); - - carrier.shutdownConsumers(); - - System.out.printf("%n=== DataCarrier Benchmark: %s ===%n" - + " Channels: %d%n" - + " Consumers: %d%n" - + " ConsumeCycle:%d ms%n" + " Strategy: %s%n" + " Producers: %d%n" + " Duration: %d ms%n" - + " Warmup consumed: %,d%n" + " Produced: %,d%n" + " Consumed: %,d%n" - + " Produce rate: %,.0f items/sec%n" + " Consume rate: %,.0f items/sec%n" + " Drop rate: %.2f%%%n", - label, channelSize, consumerThreads, consumeCycleMs, strategy, - PRODUCER_THREADS, measureDuration, warmupConsumed, + label, typeCount, POOL_THREADS, strategy, + PRODUCER_THREADS, measureDuration, produced, totalConsumed, - produced * 1000.0 / measureDuration, totalConsumed * 1000.0 / measureDuration, produced > 0 ? (produced - totalConsumed) * 100.0 / produced : 0); } @@ -408,6 +182,7 @@ public class DataCarrierBenchmark { done.countDown(); }); thread.setName("Producer-" + producerIndex); + thread.setDaemon(true); thread.start(); }
