This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch library-batch-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit a2bd9aa7c5a4ecdaae27a6ffc40d008e52e73f28
Author: Wu Sheng <[email protected]>
AuthorDate: Fri Feb 13 23:27:02 2026 +0800

    Simplify PartitionPolicy API and clean up redundant tests
    
    Remove resolve(int) and isAdaptive() from PartitionPolicy — the single
    resolve(int, int) method handles all policy types uniformly. For adaptive
    policies with 0 handlers, resolve now returns threadCount as a sensible
    initial partition count, eliminating isAdaptive() branching in BatchQueue.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../server-library/library-batch-queue/DESIGN.md   |  37 +-
 .../oap/server/library/batchqueue/BatchQueue.java  | 177 +++--
 .../library/batchqueue/BatchQueueConfig.java       |   4 +-
 .../server/library/batchqueue/PartitionPolicy.java | 103 ++-
 .../library/batchqueue/BatchQueueBenchmark.java    | 523 +++++++-------
 .../library/batchqueue/BatchQueueConfigTest.java   |   4 +-
 .../server/library/batchqueue/BatchQueueTest.java  | 115 ++--
 .../library/batchqueue/BenchmarkMetricTypes.java   | 756 +++++----------------
 .../library/batchqueue/PartitionPolicyTest.java    |  76 ++-
 .../library/datacarrier/DataCarrierBenchmark.java  | 297 +-------
 10 files changed, 801 insertions(+), 1291 deletions(-)

diff --git a/oap-server/server-library/library-batch-queue/DESIGN.md 
b/oap-server/server-library/library-batch-queue/DESIGN.md
index 4094f56f29..bc107dbbaf 100644
--- a/oap-server/server-library/library-batch-queue/DESIGN.md
+++ b/oap-server/server-library/library-batch-queue/DESIGN.md
@@ -388,7 +388,7 @@ public class BatchQueue<T> {
             ScheduledExecutorService sharedScheduler = 
BatchQueueManager.getOrCreateSharedScheduler(
                 config.getSharedSchedulerName(), 
config.getSharedSchedulerThreads());
 
-            int partitionCount = config.getPartitions().resolve(1);
+            int partitionCount = config.getPartitions().resolve(1, 0);
             this.partitions = new ArrayBlockingQueue[partitionCount];
             for (int i = 0; i < partitions.length; i++) {
                 partitions[i] = new 
ArrayBlockingQueue<>(config.getBufferSize());
@@ -403,7 +403,7 @@ public class BatchQueue<T> {
         } else {
             // Dedicated scheduler mode: resolve threads and partitions.
             int threadCount = config.getThreads().resolve();  // cpuCores(1.0) 
→ 8 on 8-core
-            int partitionCount = config.getPartitions().resolve(threadCount);
+            int partitionCount = config.getPartitions().resolve(threadCount, 
0);
 
             // Validation: if partitions < threads, cut threads to match and 
warn.
             if (partitionCount < threadCount) {
@@ -899,33 +899,34 @@ public class ThreadPolicy {
- * Two modes:
+ * Three modes:
  * - fixed(N): exactly N partitions, regardless of thread count.
  * - threadMultiply(N): N * resolved thread count.
+ * - adaptive(): partition count grows with registered handlers.
+ *   Threshold = threadCount * multiplier (default 25).
+ *   Below threshold: 1:1 (one partition per handler).
+ *   Above threshold: excess at 1:2 ratio.
  *
- * At queue construction time, the resolved partition count is validated:
- * if partitions < resolved thread count, thread count is reduced to match
- * partitions and a warning is logged.
+ * All policies resolved via resolve(threadCount, handlerCount).
+ * At queue construction time, if partitions < resolved thread count,
+ * thread count is reduced to match and a warning is logged.
  */
 public class PartitionPolicy {
     private final int fixedCount;     // > 0 for fixed mode
-    private final int multiplier;     // > 0 for threadMultiply mode
+    private final int multiplier;     // > 0 for threadMultiply/adaptive
+    private final boolean adaptive;   // true for adaptive mode
 
-    /**
-     * Fixed number of partitions.
-     * Example: fixed(8) → always 8 partitions.
-     */
     public static PartitionPolicy fixed(int count);
-
-    /**
-     * Partitions = multiplier * resolved thread count.
-     * Example: threadMultiply(2) with 4 resolved threads → 8 partitions.
-     */
     public static PartitionPolicy threadMultiply(int multiplier);
+    public static PartitionPolicy adaptive();
+    public static PartitionPolicy adaptive(int multiplier);
 
     /**
      * Resolve the actual partition count.
-     * For fixed mode, returns fixedCount.
-     * For threadMultiply mode, returns multiplier * resolvedThreadCount.
+     * - fixed: returns fixedCount (both params ignored).
+     * - threadMultiply: returns multiplier * resolvedThreadCount.
+     * - adaptive: handlerCount == 0 → resolvedThreadCount;
+     *   handlerCount <= threshold → handlerCount (1:1);
+     *   handlerCount > threshold → threshold + (excess / 2).
      */
-    public int resolve(int resolvedThreadCount);
+    public int resolve(int resolvedThreadCount, int handlerCount);
 }
 ```
 
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
index 6f2d04b79e..f6a117039e 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -32,6 +33,19 @@ import lombok.extern.slf4j.Slf4j;
 /**
  * A partitioned, self-draining queue with type-based dispatch.
  *
+ * <h3>Usage</h3>
+ * <pre>
+ * BatchQueue queue = BatchQueueManager.create(name, config);
+ * queue.addHandler(TypeA.class, handlerA);   // register metric types
+ * queue.addHandler(TypeB.class, handlerB);   // partitions grow adaptively
+ * queue.produce(data);                        // data flows immediately
+ * </pre>
+ *
+ * <p>For adaptive partition policies, each {@link #addHandler} call 
recalculates
+ * the partition count from the current number of registered handlers, growing 
the
+ * partition array as needed. The thread count is resolved at construction time
+ * and remains fixed.
+ *
  * <h3>Produce workflow</h3>
  * <pre>
  * produce(data)
@@ -90,13 +104,14 @@ import lombok.extern.slf4j.Slf4j;
  * <pre>
  * shared scheduler, partitions=1, one consumer    --> I/O queue (gRPC, Kafka, 
JDBC)
  * dedicated fixed(1), partitions=1, many handlers --> TopN (all types share 1 
thread)
- * dedicated cpuCores(1.0), threadMultiply(2),
+ * dedicated cpuCores(1.0), adaptive(),
  *           many handlers                         --> metrics aggregation
  * </pre>
  */
 @Slf4j
 public class BatchQueue<T> {
     private final String name;
+    private final BatchQueueConfig<T> config;
 
     /** The thread pool that executes drain tasks. Either dedicated or shared. 
*/
     private final ScheduledExecutorService scheduler;
@@ -107,13 +122,6 @@ public class BatchQueue<T> {
     /** Non-null only for shared schedulers; used to release the ref count on 
shutdown. */
     private final String sharedSchedulerName;
 
-    /**
-     * Fixed-size partitions. Producers select a partition via {@link 
PartitionSelector}
-     * to reduce contention. Each partition is an {@link ArrayBlockingQueue} 
with
-     * capacity = bufferSize.
-     */
-    private final ArrayBlockingQueue<T>[] partitions;
-
     /**
      * Cached partition selector from config. Only used when {@code 
partitions.length > 1};
      * single-partition queues bypass the selector entirely.
@@ -128,25 +136,30 @@ public class BatchQueue<T> {
      */
     private final ConcurrentHashMap<Class<?>, HandlerConsumer<T>> handlerMap;
 
-    private final BatchQueueConfig<T> config;
+    /**
+     * Tracks unregistered types that have already been warned about,
+     * to avoid flooding the log with repeated errors.
+     */
+    private final Set<Class<?>> warnedUnregisteredTypes;
 
-    /** Set to false on {@link #shutdown()} to stop drain loops and reject new 
data. */
-    private volatile boolean running;
+    /** Resolved thread count, stored for adaptive partition recalculation. */
+    private final int resolvedThreadCount;
+
+    /** Number of drain tasks (equals thread count for dedicated, 1 for 
shared). */
+    private final int taskCount;
+
+    /**
+     * Partitions. Producers select a partition via {@link PartitionSelector}.
+     * For adaptive policies, this array grows via {@link #addHandler} as 
handlers
+     * are registered. Volatile for visibility to drain loop threads.
+     */
+    private volatile ArrayBlockingQueue<T>[] partitions;
 
     /**
      * Which partitions each drain task is responsible for.
-     * {@code assignedPartitions[taskIndex] = int[] of partition indices}.
-     *
-     * <p>Dedicated mode: partitions are distributed across threads by 
round-robin.
-     * Example with 4 partitions and 2 threads:
-     * <pre>
-     *   task 0 --> partitions [0, 2]
-     *   task 1 --> partitions [1, 3]
-     * </pre>
-     *
-     * <p>Shared mode: a single task drains all partitions.
+     * Volatile for visibility when partitions grow via {@link #addHandler}.
      */
-    private final int[][] assignedPartitions;
+    private volatile int[][] assignedPartitions;
 
     /**
      * Per-task count of consecutive idle cycles (no data drained).
@@ -154,23 +167,25 @@ public class BatchQueue<T> {
      */
     private final int[] consecutiveIdleCycles;
 
+    /** Set to false on {@link #shutdown()} to stop drain loops and reject new 
data. */
+    private volatile boolean running;
+
     @SuppressWarnings("unchecked")
     BatchQueue(final String name, final BatchQueueConfig<T> config) {
         this.name = name;
         this.config = config;
         this.partitionSelector = config.getPartitionSelector();
         this.handlerMap = new ConcurrentHashMap<>();
+        this.warnedUnregisteredTypes = ConcurrentHashMap.newKeySet();
 
-        final int taskCount;
         if (config.getSharedSchedulerName() != null) {
             // ---- Shared scheduler mode ----
-            // Borrow a scheduler from BatchQueueManager (ref-counted).
-            // Only 1 drain task; it drains all partitions sequentially.
             final ScheduledExecutorService sharedScheduler =
                 BatchQueueManager.getOrCreateSharedScheduler(
                     config.getSharedSchedulerName(), 
config.getSharedSchedulerThreads());
 
-            final int partitionCount = config.getPartitions().resolve(1);
+            this.resolvedThreadCount = 1;
+            final int partitionCount = config.getPartitions().resolve(1, 0);
             this.partitions = new ArrayBlockingQueue[partitionCount];
             for (int i = 0; i < partitions.length; i++) {
                 partitions[i] = new 
ArrayBlockingQueue<>(config.getBufferSize());
@@ -179,18 +194,16 @@ public class BatchQueue<T> {
             this.scheduler = sharedScheduler;
             this.dedicatedScheduler = false;
             this.sharedSchedulerName = config.getSharedSchedulerName();
-            taskCount = 1;
-            final int[] allPartitions = new int[partitions.length];
-            for (int i = 0; i < allPartitions.length; i++) {
-                allPartitions[i] = i;
-            }
-            this.assignedPartitions = new int[][] {allPartitions};
+            this.taskCount = 1;
+            this.assignedPartitions = buildAssignments(1, partitionCount);
         } else {
             // ---- Dedicated scheduler mode ----
-            // Create a private ScheduledThreadPool. Each thread drains
-            // a fixed subset of partitions (no cross-thread contention).
             int threadCount = config.getThreads().resolve();
-            final int partitionCount = 
config.getPartitions().resolve(threadCount);
+            this.resolvedThreadCount = threadCount;
+
+            // For adaptive with 0 handlers, resolve returns threadCount 
(sensible initial).
+            // For fixed/threadMultiply, resolve returns the configured count.
+            final int partitionCount = 
config.getPartitions().resolve(threadCount, 0);
 
             if (partitionCount < threadCount) {
                 log.warn("BatchQueue[{}]: partitions({}) < threads({}), 
reducing threads to {}",
@@ -211,24 +224,8 @@ public class BatchQueue<T> {
             });
             this.dedicatedScheduler = true;
             this.sharedSchedulerName = null;
-            taskCount = threadCount;
-
-            // Distribute partitions across threads by round-robin
-            this.assignedPartitions = new int[taskCount][];
-            final List<List<Integer>> assignment = new ArrayList<>();
-            for (int t = 0; t < taskCount; t++) {
-                assignment.add(new ArrayList<>());
-            }
-            for (int p = 0; p < partitions.length; p++) {
-                assignment.get(p % taskCount).add(p);
-            }
-            for (int t = 0; t < taskCount; t++) {
-                final List<Integer> parts = assignment.get(t);
-                assignedPartitions[t] = new int[parts.size()];
-                for (int i = 0; i < parts.size(); i++) {
-                    assignedPartitions[t][i] = parts.get(i);
-                }
-            }
+            this.taskCount = threadCount;
+            this.assignedPartitions = buildAssignments(threadCount, 
partitionCount);
         }
 
         this.consecutiveIdleCycles = new int[taskCount];
@@ -239,6 +236,28 @@ public class BatchQueue<T> {
         }
     }
 
+    /**
+     * Build round-robin partition-to-task assignments.
+     */
+    private static int[][] buildAssignments(final int taskCount, final int 
partitionCount) {
+        final int[][] result = new int[taskCount][];
+        final List<List<Integer>> assignment = new ArrayList<>();
+        for (int t = 0; t < taskCount; t++) {
+            assignment.add(new ArrayList<>());
+        }
+        for (int p = 0; p < partitionCount; p++) {
+            assignment.get(p % taskCount).add(p);
+        }
+        for (int t = 0; t < taskCount; t++) {
+            final List<Integer> parts = assignment.get(t);
+            result[t] = new int[parts.size()];
+            for (int i = 0; i < parts.size(); i++) {
+                result[t][i] = parts.get(i);
+            }
+        }
+        return result;
+    }
+
     /**
      * Schedule the next drain for the given task. The delay uses adaptive 
exponential
      * backoff: {@code minIdleMs * 2^consecutiveIdleCycles}, capped at 
maxIdleMs.
@@ -265,10 +284,29 @@ public class BatchQueue<T> {
     /**
      * Register a type-based handler. Items whose {@code getClass()} matches 
the given
      * type will be batched together and dispatched to this handler.
-     * Only used when no single consumer is set in config.
+     *
+     * <p>For adaptive partition policies, adding a handler recalculates the 
partition
+     * count and grows the partition array if needed. For non-adaptive 
policies the
+     * resolved count never changes, so this is a no-op beyond the 
registration.
+     * Drain loop threads pick up new partitions on their next cycle via 
volatile reads.
      */
+    @SuppressWarnings("unchecked")
     public void addHandler(final Class<? extends T> type, final 
HandlerConsumer<T> handler) {
         handlerMap.put(type, handler);
+
+        final int newPartitionCount = config.getPartitions()
+            .resolve(resolvedThreadCount, handlerMap.size());
+        final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
+        if (newPartitionCount > currentPartitions.length) {
+            final ArrayBlockingQueue<T>[] grown = new 
ArrayBlockingQueue[newPartitionCount];
+            System.arraycopy(currentPartitions, 0, grown, 0, 
currentPartitions.length);
+            for (int i = currentPartitions.length; i < newPartitionCount; i++) 
{
+                grown[i] = new ArrayBlockingQueue<>(config.getBufferSize());
+            }
+            // Volatile writes — drain loop threads see the new assignments on 
next cycle
+            this.assignedPartitions = buildAssignments(taskCount, 
newPartitionCount);
+            this.partitions = grown;
+        }
     }
 
     /**
@@ -292,18 +330,19 @@ public class BatchQueue<T> {
         if (!running) {
             return false;
         }
-        final int index = partitions.length == 1
-            ? 0 : partitionSelector.select(data, partitions.length);
+        final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
+        final int index = currentPartitions.length == 1
+            ? 0 : partitionSelector.select(data, currentPartitions.length);
         if (config.getStrategy() == BufferStrategy.BLOCKING) {
             try {
-                partitions[index].put(data);
+                currentPartitions[index].put(data);
                 return true;
             } catch (final InterruptedException e) {
                 Thread.currentThread().interrupt();
                 return false;
             }
         } else {
-            return partitions[index].offer(data);
+            return currentPartitions[index].offer(data);
         }
     }
 
@@ -312,17 +351,21 @@ public class BatchQueue<T> {
      * into a single list, then dispatches. Loops until partitions are empty,
      * then breaks to re-schedule with backoff.
      *
-     * <p>The loop structure ensures that a burst of data is fully drained 
before
-     * the task yields. This prevents data piling up across reschedule gaps.
+     * <p>Reads volatile references to {@code partitions} and {@code 
assignedPartitions}
+     * at the start of each cycle, so it picks up new partitions added via
+     * {@link #addHandler} on the next iteration.
      */
     void drainLoop(final int taskIndex) {
-        final int[] myPartitions = assignedPartitions[taskIndex];
+        final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
+        final int[] myPartitions = this.assignedPartitions[taskIndex];
         try {
             while (running) {
                 // Drain all assigned partitions into one batch
                 final List<T> combined = new ArrayList<>();
                 for (final int partitionIndex : myPartitions) {
-                    partitions[partitionIndex].drainTo(combined);
+                    if (partitionIndex < currentPartitions.length) {
+                        currentPartitions[partitionIndex].drainTo(combined);
+                    }
                 }
 
                 if (combined.isEmpty()) {
@@ -355,7 +398,7 @@ public class BatchQueue<T> {
      *       to one consumer, regardless of item types.</li>
      *   <li><b>Handler map</b>: items are grouped by {@code item.getClass()}, 
then
      *       each group is dispatched to its registered handler. Unregistered 
types
-     *       are silently dropped.</li>
+     *       are logged as errors and dropped.</li>
      * </ol>
      */
     private void dispatch(final List<T> batch) {
@@ -382,6 +425,11 @@ public class BatchQueue<T> {
                 } catch (final Throwable t) {
                     handleError(entry.getValue(), t);
                 }
+            } else {
+                if (warnedUnregisteredTypes.add(entry.getKey())) {
+                    log.error("BatchQueue[{}]: no handler for type {}, {} 
items abandoned",
+                        name, entry.getKey().getName(), 
entry.getValue().size());
+                }
             }
         }
     }
@@ -427,8 +475,9 @@ public class BatchQueue<T> {
     void shutdown() {
         running = false;
         // Final drain — flush any remaining data to consumers
+        final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
         final List<T> combined = new ArrayList<>();
-        for (final ArrayBlockingQueue<T> partition : partitions) {
+        for (final ArrayBlockingQueue<T> partition : currentPartitions) {
             partition.drainTo(combined);
         }
         if (!combined.isEmpty()) {
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java
index 32ca86c2bf..5de0fd39d2 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java
@@ -68,10 +68,10 @@ public class BatchQueueConfig<T> {
     private QueueErrorHandler<T> errorHandler;
 
     @Builder.Default
-    private long minIdleMs = 5;
+    private long minIdleMs = 1;
 
     @Builder.Default
-    private long maxIdleMs = 200;
+    private long maxIdleMs = 50;
 
     void validate() {
         final boolean hasDedicated = threads != null;
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
index ac03e9f75d..6f3428a6f3 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
@@ -21,56 +21,128 @@ package 
org.apache.skywalking.oap.server.library.batchqueue;
 /**
  * Determines the number of partitions for a BatchQueue.
  *
- * Two modes:
- * - fixed(N): exactly N partitions, regardless of thread count.
- * - threadMultiply(N): N * resolved thread count.
+ * <ul>
+ *   <li>{@link #fixed(int)}: exactly N partitions, regardless of thread 
count.</li>
+ *   <li>{@link #threadMultiply(int)}: N * resolved thread count.</li>
+ *   <li>{@link #adaptive()}: recommended for metrics aggregation. The 
partition
+ *       count grows as handlers are registered via {@link 
BatchQueue#addHandler}.
+ *       Uses {@code threadCount * multiplier} (default 25) as a threshold.
+ *       Below threshold, 1:1 mapping (one partition per handler). Above 
threshold,
+ *       excess handlers share partitions at 1:2 ratio.</li>
+ * </ul>
  *
- * At queue construction time, the resolved partition count is validated:
- * if partitions less than resolved thread count, thread count is reduced to 
match
- * partitions and a warning is logged.
+ * <p>All policies are resolved via {@link #resolve(int, int)}. For 
non-adaptive
+ * policies the handlerCount parameter is ignored. At queue creation time, if 
the
+ * resolved partition count is less than the thread count, the thread count is
+ * reduced to match and a warning is logged.
  */
 public class PartitionPolicy {
+    private static final int DEFAULT_ADAPTIVE_MULTIPLIER = 25;
+
     private final int fixedCount;
     private final int multiplier;
+    private final boolean adaptive;
 
-    private PartitionPolicy(final int fixedCount, final int multiplier) {
+    private PartitionPolicy(final int fixedCount, final int multiplier,
+                            final boolean adaptive) {
         this.fixedCount = fixedCount;
         this.multiplier = multiplier;
+        this.adaptive = adaptive;
     }
 
     /**
      * Fixed number of partitions.
      *
-     * @throws IllegalArgumentException if count < 1
+     * @throws IllegalArgumentException if count &lt; 1
      */
     public static PartitionPolicy fixed(final int count) {
         if (count < 1) {
             throw new IllegalArgumentException("Partition count must be >= 1, 
got: " + count);
         }
-        return new PartitionPolicy(count, 0);
+        return new PartitionPolicy(count, 0, false);
     }
 
     /**
      * Partitions = multiplier * resolved thread count.
      *
-     * @throws IllegalArgumentException if multiplier < 1
+     * @throws IllegalArgumentException if multiplier &lt; 1
      */
     public static PartitionPolicy threadMultiply(final int multiplier) {
         if (multiplier < 1) {
             throw new IllegalArgumentException("Partition multiplier must be 
>= 1, got: " + multiplier);
         }
-        return new PartitionPolicy(0, multiplier);
+        return new PartitionPolicy(0, multiplier, false);
+    }
+
+    /**
+     * Adaptive partition count with default threshold multiplier (25).
+     *
+     * <p>The partition count grows as handlers are registered via
+     * {@link BatchQueue#addHandler}:
+     * <ul>
+     *   <li>Threshold = {@code threadCount * 25}</li>
+     *   <li>handlerCount &lt;= threshold: one partition per handler (1:1)</li>
+     *   <li>handlerCount &gt; threshold: {@code threshold + (handlerCount - 
threshold) / 2}</li>
+     *   <li>handlerCount == 0: returns {@code threadCount} as initial 
count</li>
+     * </ul>
+     *
+     * <p>Examples with 8 threads (threshold = 200):
+     * <pre>
+     *     0 handlers →   8 partitions  (initial = threadCount)
+     *   100 handlers → 100 partitions  (1:1, below threshold)
+     *   200 handlers → 200 partitions  (1:1, at threshold)
+     *   500 handlers → 350 partitions  (200 + 300/2)
+     *  1000 handlers → 600 partitions  (200 + 800/2)
+     *  2000 handlers → 1100 partitions (200 + 1800/2)
+     * </pre>
+     */
+    public static PartitionPolicy adaptive() {
+        return new PartitionPolicy(0, DEFAULT_ADAPTIVE_MULTIPLIER, true);
+    }
+
+    /**
+     * Adaptive partition count with custom threshold multiplier.
+     *
+     * <p>Threshold = {@code threadCount * multiplier}. Below threshold, one
+     * partition per handler (1:1). Above threshold, excess handlers share
+     * at 1:2 ratio: {@code threshold + (handlerCount - threshold) / 2}.
+     *
+     * @param multiplier threshold per thread (default 25)
+     * @throws IllegalArgumentException if multiplier &lt; 1
+     */
+    public static PartitionPolicy adaptive(final int multiplier) {
+        if (multiplier < 1) {
+            throw new IllegalArgumentException(
+                "adaptive multiplier must be >= 1, got: " + multiplier);
+        }
+        return new PartitionPolicy(0, multiplier, true);
     }
 
     /**
      * Resolve the actual partition count.
-     * For fixed mode, returns fixedCount.
-     * For threadMultiply mode, returns multiplier * resolvedThreadCount (min 
1).
+     * <ul>
+     *   <li>fixed: returns the pre-set count (both parameters ignored).</li>
+     *   <li>threadMultiply: returns multiplier * resolvedThreadCount 
(handlerCount ignored).</li>
+     *   <li>adaptive: when handlerCount is 0, returns resolvedThreadCount as 
a sensible
+     *       initial count. Otherwise, threshold = threadCount * multiplier; 
if handlerCount
+     *       &lt;= threshold, returns handlerCount (1:1). If above, returns
+     *       threshold + (handlerCount - threshold) / 2.</li>
+     * </ul>
      */
-    public int resolve(final int resolvedThreadCount) {
+    public int resolve(final int resolvedThreadCount, final int handlerCount) {
         if (fixedCount > 0) {
             return fixedCount;
         }
+        if (adaptive) {
+            if (handlerCount == 0) {
+                return Math.max(1, resolvedThreadCount);
+            }
+            final int threshold = Math.max(1, multiplier * 
resolvedThreadCount);
+            if (handlerCount <= threshold) {
+                return handlerCount;
+            }
+            return threshold + (handlerCount - threshold) / 2;
+        }
         return Math.max(1, multiplier * resolvedThreadCount);
     }
 
@@ -79,6 +151,9 @@ public class PartitionPolicy {
         if (fixedCount > 0) {
             return "fixed(" + fixedCount + ")";
         }
+        if (adaptive) {
+            return "adaptive(multiplier=" + multiplier + ")";
+        }
         return "threadMultiply(" + multiplier + ")";
     }
 }
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java
index e4e8ceee25..115a03fcad 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java
@@ -26,50 +26,56 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 /**
- * Throughput benchmark for BatchQueue consuming with handler-map dispatch.
+ * Throughput benchmark for BatchQueue with handler-map dispatch.
  *
- * <p>Measures how many metric items per second the queue can consume
- * when there are 40 or 200 distinct metric types, each routed
- * to its own handler via class-based dispatch.
+ * <p>Simulates production OAP metrics aggregation: 500-2000 distinct metric 
types,
+ * 32 gRPC producer threads, 8 consumer threads. Tests various partition 
strategies
+ * from fixed small counts to 1:1 partition-per-type binding.
  *
  * <p>Run with: mvn test -pl oap-server/server-library/library-batch-queue
  *           -Dtest=BatchQueueBenchmark -DfailIfNoTests=false
  *
  * <h3>Reference results (Apple M3 Max, 128 GB RAM, macOS 26.2, JDK 17)</h3>
+ *
+ * <p><b>Fixed partitions (typeHash selector):</b>
  * <pre>
- * Benchmark                    Types  Threads  Partitions       IF_POSSIBLE   
    BLOCKING
- * ---------------------------- ------ -------- --------------- ------------- 
-------------
- * 40-types-1thread               40   fixed(1)  fixed(1)        ~33,200,000   
~32,200,000
- * 200-types-1thread             200   fixed(1)  fixed(1)        ~34,600,000   
~33,900,000
- * direct-consumer-4threads        1   fixed(4)  multiply(8)     ~19,200,000   
~18,800,000
- * 40-types-4threads              40   fixed(4)  multiply(8)     ~31,200,000   
~44,300,000
- * 200-types-4threads            200   fixed(4)  multiply(8)     ~43,900,000   
~43,400,000
- * 200reg/50hot, 100 producers   200   fixed(4)  multiply(8)     ~26,900,000   
~26,100,000
+ * Partitions  BufSize   500 types (IF/BLK)      1000 types (IF/BLK)     2000 
types (IF/BLK)
+ * ---------- -------- ----------------------- ----------------------- 
-----------------------
+ * fixed(16)   50,000   ~19.0M / ~12.3M         ~17.9M / ~17.1M         ~15.7M 
/ ~15.2M
+ * fixed(64)  500,000   ~17.0M / ~18.0M         ~17.7M / ~18.4M         ~17.7M 
/ ~18.0M
+ * fixed(128) 500,000   ~24.1M / ~23.2M         ~24.9M / ~26.1M         ~25.3M 
/ ~24.8M
+ * </pre>
  *
- * All runs: bufferSize=50,000, 0% drop rate.
- * Single-partition queues (fixed(1)) bypass PartitionSelector entirely.
- * Multi-partition queues default to PartitionSelector.typeHash() —
- * same type always hits the same partition, so each consumer thread
- * drains pre-grouped batches. Custom selectors can be set via config.
+ * <p><b>Type-aware partitions (typeId selector, 50K buffer each):</b>
+ * <pre>
+ * Ratio       Partitions   500 types (IF/BLK)      1000 types (IF/BLK)     
2000 types (IF/BLK)
+ * ---------- ------------ ----------------------- ----------------------- 
-----------------------
+ * 1:4         types/4      ~32.1M / ~29.9M         ~40.0M / ~40.5M         
~47.1M / ~50.7M
+ * 1:2         types/2      ~38.2M / ~35.9M         ~49.6M / ~35.9M         
~52.6M / ~58.6M
+ * adaptive    350/600/1100 ~45.7M / ~46.3M         ~50.5M / ~54.1M         
~64.0M / ~60.3M
+ * 1:1         types        ~51.3M / ~54.4M         ~61.2M / ~62.5M         
~75.7M / ~67.4M
  * </pre>
  *
- * <h3>Comparison with DataCarrier — production case (4 consumer threads)</h3>
+ * <p><b>DataCarrier baseline (N independent carriers, raw Long values, 
pool(8)):</b>
  * <pre>
- * Scenario                        DataCarrier     BatchQueue
- *                                 (N carriers     (1 queue +
- *                                 + BulkPool)     handler map)
- * ------------------------------- -----------     -----------
- * 40 types, 4 producers            ~42,000,000    ~31,200,000
- * 200 types, 4 producers           ~25,100,000    ~43,900,000
- * 200 reg / 50 hot, 100 producers  ~17,600,000    ~26,900,000  (IF_POSSIBLE)
- * 200 reg / 50 hot, 100 producers  ~23,800,000    ~26,100,000  (BLOCKING)
+ *   500 types:  ~33.4M IF_POSSIBLE / ~32.5M BLOCKING
+ *  1000 types:  ~37.6M IF_POSSIBLE / ~36.0M BLOCKING
+ *  2000 types:  ~38.0M IF_POSSIBLE / ~42.1M BLOCKING
+ * </pre>
  *
- * BatchQueue uses configurable PartitionSelector (default: typeHash).
- * Same-type items land on the same partition, so dispatch() grouping is
- * effectively a no-op. The production-realistic case (200reg/50hot/100
- * producers) shows BatchQueue at ~27M vs DataCarrier at ~18M, a 1.5x
- * advantage.
+ * <p><b>BatchQueue vs DataCarrier (IF_POSSIBLE):</b>
+ * <pre>
+ *              500 types   1000 types   2000 types
+ * 1:4           -4%         +6%          +24%
+ * 1:2          +14%        +32%          +38%
+ * adaptive     +37%        +34%          +68%
+ * 1:1          +53%        +63%          +99%
  * </pre>
+ *
+ * <p>All runs: 32 producers, fixed(8) threads, minIdleMs=1, maxIdleMs=50, 0% 
drop rate.
+ * 2000 metric types generated at runtime via bytecode (see {@link 
BenchmarkMetricTypes}).
+ * Adaptive policy: {@link PartitionPolicy#adaptive()} with threshold = 
threadCount * 25.
+ * Below threshold: 1:1 (one partition per type). Above: excess at 1:2 ratio.
  */
 @Slf4j
 @SuppressWarnings("all")
@@ -77,274 +83,345 @@ public class BatchQueueBenchmark {
 
     private static final int WARMUP_SECONDS = 2;
     private static final int MEASURE_SECONDS = 5;
-    private static final int PRODUCER_THREADS = 4;
+    private static final int PRODUCER_THREADS = 32;
+    private static final ThreadPolicy THREADS = ThreadPolicy.fixed(8);
 
     @AfterEach
     public void cleanup() {
         BatchQueueManager.reset();
     }
 
-    // ---- IF_POSSIBLE (non-blocking) benchmarks ----
+    // ---- Fixed partitions, typeHash selector ----
+    // fixed(16)  50K:  ~19.0M/~12.3M  ~17.9M/~17.1M  ~15.7M/~15.2M
+    // fixed(64) 500K:  ~17.0M/~18.0M  ~17.7M/~18.4M  ~17.7M/~18.0M
+    // fixed(128)500K:  ~24.1M/~23.2M  ~24.9M/~26.1M  ~25.3M/~24.8M
 
     @Test
-    public void benchmarkHandlerMapDispatch40Types() throws Exception {
-        runBenchmark("40-types-4t-ifpossible", 40, ThreadPolicy.fixed(4),
-            PartitionPolicy.threadMultiply(8), BufferStrategy.IF_POSSIBLE);
+    public void benchmark500Types() throws Exception {
+        runBenchmark("500-types", 500, 16, 50_000, BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkHandlerMapDispatch200Types() throws Exception {
-        runBenchmark("200-types-4t-ifpossible", 200, ThreadPolicy.fixed(4),
-            PartitionPolicy.threadMultiply(8), BufferStrategy.IF_POSSIBLE);
+    public void benchmark1000Types() throws Exception {
+        runBenchmark("1000-types", 1000, 16, 50_000, 
BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkDirectConsumer() throws Exception {
-        runDirectBenchmark("direct-4t-ifpossible", ThreadPolicy.fixed(4),
-            PartitionPolicy.threadMultiply(8), BufferStrategy.IF_POSSIBLE);
+    public void benchmark2000Types() throws Exception {
+        runBenchmark("2000-types", 2000, 16, 50_000, 
BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkHandlerMapDispatch40TypesSingleThread() throws 
Exception {
-        runBenchmark("40-types-1t-ifpossible", 40, ThreadPolicy.fixed(1),
-            PartitionPolicy.fixed(1), BufferStrategy.IF_POSSIBLE);
+    public void benchmark500TypesBlocking() throws Exception {
+        runBenchmark("500-types-blocking", 500, 16, 50_000, 
BufferStrategy.BLOCKING);
     }
 
     @Test
-    public void benchmarkHandlerMapDispatch200TypesSingleThread() throws 
Exception {
-        runBenchmark("200-types-1t-ifpossible", 200, ThreadPolicy.fixed(1),
-            PartitionPolicy.fixed(1), BufferStrategy.IF_POSSIBLE);
+    public void benchmark1000TypesBlocking() throws Exception {
+        runBenchmark("1000-types-blocking", 1000, 16, 50_000, 
BufferStrategy.BLOCKING);
     }
 
-    // ---- BLOCKING benchmarks ----
+    @Test
+    public void benchmark2000TypesBlocking() throws Exception {
+        runBenchmark("2000-types-blocking", 2000, 16, 50_000, 
BufferStrategy.BLOCKING);
+    }
 
     @Test
-    public void benchmarkHandlerMapDispatch40TypesBlocking() throws Exception {
-        runBenchmark("40-types-4t-blocking", 40, ThreadPolicy.fixed(4),
-            PartitionPolicy.threadMultiply(8), BufferStrategy.BLOCKING);
+    public void benchmark500Types_64p() throws Exception {
+        runBenchmark("500-types-64p", 500, 64, 500_000, 
BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkHandlerMapDispatch200TypesBlocking() throws Exception 
{
-        runBenchmark("200-types-4t-blocking", 200, ThreadPolicy.fixed(4),
-            PartitionPolicy.threadMultiply(8), BufferStrategy.BLOCKING);
+    public void benchmark1000Types_64p() throws Exception {
+        runBenchmark("1000-types-64p", 1000, 64, 500_000, 
BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkDirectConsumerBlocking() throws Exception {
-        runDirectBenchmark("direct-4t-blocking", ThreadPolicy.fixed(4),
-            PartitionPolicy.threadMultiply(8), BufferStrategy.BLOCKING);
+    public void benchmark2000Types_64p() throws Exception {
+        runBenchmark("2000-types-64p", 2000, 64, 500_000, 
BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkHandlerMapDispatch40TypesSingleThreadBlocking() 
throws Exception {
-        runBenchmark("40-types-1t-blocking", 40, ThreadPolicy.fixed(1),
-            PartitionPolicy.fixed(1), BufferStrategy.BLOCKING);
+    public void benchmark500TypesBlocking_64p() throws Exception {
+        runBenchmark("500-types-blocking-64p", 500, 64, 500_000, 
BufferStrategy.BLOCKING);
     }
 
     @Test
-    public void benchmarkHandlerMapDispatch200TypesSingleThreadBlocking() 
throws Exception {
-        runBenchmark("200-types-1t-blocking", 200, ThreadPolicy.fixed(1),
-            PartitionPolicy.fixed(1), BufferStrategy.BLOCKING);
+    public void benchmark1000TypesBlocking_64p() throws Exception {
+        runBenchmark("1000-types-blocking-64p", 1000, 64, 500_000, 
BufferStrategy.BLOCKING);
     }
 
-    // ---- Production-realistic: 200 types, 50 hot, 100 gRPC producers ----
+    @Test
+    public void benchmark2000TypesBlocking_64p() throws Exception {
+        runBenchmark("2000-types-blocking-64p", 2000, 64, 500_000, 
BufferStrategy.BLOCKING);
+    }
 
     @Test
-    public void benchmarkHotTypes200x50x100producers() throws Exception {
-        runHotTypesBenchmark("200reg-50hot-100p-ifpossible", 200, 50, 100,
-            ThreadPolicy.fixed(4), PartitionPolicy.threadMultiply(8),
-            BufferStrategy.IF_POSSIBLE);
+    public void benchmark500Types_128p() throws Exception {
+        runBenchmark("500-types-128p", 500, 128, 500_000, 
BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkHotTypes200x50x100producersBlocking() throws 
Exception {
-        runHotTypesBenchmark("200reg-50hot-100p-blocking", 200, 50, 100,
-            ThreadPolicy.fixed(4), PartitionPolicy.threadMultiply(8),
-            BufferStrategy.BLOCKING);
+    public void benchmark1000Types_128p() throws Exception {
+        runBenchmark("1000-types-128p", 1000, 128, 500_000, 
BufferStrategy.IF_POSSIBLE);
     }
 
-    private void runHotTypesBenchmark(final String label, final int totalTypes,
-                                      final int hotTypes, final int 
producerCount,
-                                      final ThreadPolicy threads,
-                                      final PartitionPolicy partitions,
-                                      final BufferStrategy strategy) throws 
Exception {
-        final AtomicLong consumed = new AtomicLong(0);
+    @Test
+    public void benchmark2000Types_128p() throws Exception {
+        runBenchmark("2000-types-128p", 2000, 128, 500_000, 
BufferStrategy.IF_POSSIBLE);
+    }
 
-        final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = 
BatchQueueManager.create(
-            "bench-" + label,
-            BatchQueueConfig.<BenchmarkMetricTypes.TypedMetric>builder()
-                .threads(threads)
-                .partitions(partitions)
-                .bufferSize(50_000)
-                .strategy(strategy)
-                .minIdleMs(1)
-                .maxIdleMs(50)
-                .build());
+    @Test
+    public void benchmark500TypesBlocking_128p() throws Exception {
+        runBenchmark("500-types-blocking-128p", 500, 128, 500_000, 
BufferStrategy.BLOCKING);
+    }
 
-        // Register handlers for ALL 200 types
-        for (int t = 0; t < totalTypes; t++) {
-            queue.addHandler(BenchmarkMetricTypes.CLASSES[t],
-                (HandlerConsumer<BenchmarkMetricTypes.TypedMetric>) data ->
-                    consumed.addAndGet(data.size()));
-        }
+    @Test
+    public void benchmark1000TypesBlocking_128p() throws Exception {
+        runBenchmark("1000-types-blocking-128p", 1000, 128, 500_000, 
BufferStrategy.BLOCKING);
+    }
 
-        // Warmup — only hot types get traffic
-        final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 
1000L;
-        runProducersHot(queue, hotTypes, producerCount, warmupEnd);
-        Thread.sleep(200);
-        consumed.set(0);
+    @Test
+    public void benchmark2000TypesBlocking_128p() throws Exception {
+        runBenchmark("2000-types-blocking-128p", 2000, 128, 500_000, 
BufferStrategy.BLOCKING);
+    }
 
-        // Measure
-        final long measureStart = System.currentTimeMillis();
-        final long measureEnd = measureStart + MEASURE_SECONDS * 1000L;
-        final long produced = runProducersHot(queue, hotTypes, producerCount, 
measureEnd);
-        final long measureDuration = System.currentTimeMillis() - measureStart;
+    // ---- Type-aware partitions, typeId selector, 50K buffer each ----
+    // 1:4       types/4:       ~32.1M/~29.9M  ~40.0M/~40.5M  ~47.1M/~50.7M
+    // 1:2       types/2:       ~38.2M/~35.9M  ~49.6M/~35.9M  ~52.6M/~58.6M
+    // adaptive  350/600/1100:  ~45.7M/~46.3M  ~50.5M/~54.1M  ~64.0M/~60.3M
+    // 1:1       types:         ~51.3M/~54.4M  ~61.2M/~62.5M  ~75.7M/~67.4M
 
-        Thread.sleep(500);
-        final long totalConsumed = consumed.get();
+    private static final PartitionSelector<BenchmarkMetricTypes.TypedMetric> 
TYPE_ID_SELECTOR =
+        (data, count) -> data.typeId % count;
 
-        log.info("\n=== BatchQueue Benchmark: {} ===\n"
-                + "  Total types: {} (handlers registered)\n"
-                + "  Hot types:   {} (receiving traffic)\n"
-                + "  Threads:     {}\n"
-                + "  Partitions:  {}\n"
-                + "  Producers:   {} (simulating gRPC connections)\n"
-                + "  Duration:    {} ms\n"
-                + "  Produced:    {}\n"
-                + "  Consumed:    {}\n"
-                + "  Produce rate:  {} items/sec\n"
-                + "  Consume rate:  {} items/sec\n"
-                + "  Drop rate:     {}%\n",
-            label, totalTypes, hotTypes, threads, partitions, producerCount,
-            measureDuration,
-            String.format("%,d", produced), String.format("%,d", 
totalConsumed),
-            String.format("%,.0f", produced * 1000.0 / measureDuration),
-            String.format("%,.0f", totalConsumed * 1000.0 / measureDuration),
-            String.format("%.2f", produced > 0
-                ? (produced - totalConsumed) * 100.0 / produced : 0));
+    @Test
+    public void benchmark500Types_quarter() throws Exception {
+        runBenchmark("500-types-quarter", 500, 125, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
     }
 
-    private long runProducersHot(final 
BatchQueue<BenchmarkMetricTypes.TypedMetric> queue,
-                                 final int hotTypes, final int producerCount,
-                                 final long endTimeMs) throws 
InterruptedException {
-        final AtomicLong totalProduced = new AtomicLong(0);
-        final CountDownLatch done = new CountDownLatch(producerCount);
+    @Test
+    public void benchmark1000Types_quarter() throws Exception {
+        runBenchmark("1000-types-quarter", 1000, 250, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
+    }
 
-        for (int p = 0; p < producerCount; p++) {
-            final int producerIndex = p;
-            final Thread thread = new Thread(() -> {
-                long count = 0;
-                int typeIndex = producerIndex;
-                while (System.currentTimeMillis() < endTimeMs) {
-                    for (int batch = 0; batch < 100; batch++) {
-                        final int type = typeIndex % hotTypes;
-                        if 
(queue.produce(BenchmarkMetricTypes.FACTORIES[type].create(count))) {
-                            count++;
-                        }
-                        typeIndex++;
-                    }
-                }
-                totalProduced.addAndGet(count);
-                done.countDown();
-            });
-            thread.setName("gRPC-" + producerIndex);
-            thread.setDaemon(true);
-            thread.start();
-        }
+    @Test
+    public void benchmark2000Types_quarter() throws Exception {
+        runBenchmark("2000-types-quarter", 2000, 500, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
+    }
 
-        done.await(MEASURE_SECONDS + 10, TimeUnit.SECONDS);
-        return totalProduced.get();
+    @Test
+    public void benchmark500TypesBlocking_quarter() throws Exception {
+        runBenchmark("500-types-blocking-quarter", 500, 125, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
     }
 
-    private void runBenchmark(final String label, final int typeCount,
-                              final ThreadPolicy threads,
-                              final PartitionPolicy partitions,
-                              final BufferStrategy strategy) throws Exception {
-        final AtomicLong consumed = new AtomicLong(0);
+    @Test
+    public void benchmark1000TypesBlocking_quarter() throws Exception {
+        runBenchmark("1000-types-blocking-quarter", 1000, 250, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
+    }
 
-        final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = 
BatchQueueManager.create(
-            "bench-" + label,
-            BatchQueueConfig.<BenchmarkMetricTypes.TypedMetric>builder()
-                .threads(threads)
-                .partitions(partitions)
-                .bufferSize(50_000)
-                .strategy(strategy)
-                .minIdleMs(1)
-                .maxIdleMs(50)
-                .build());
+    @Test
+    public void benchmark2000TypesBlocking_quarter() throws Exception {
+        runBenchmark("2000-types-blocking-quarter", 2000, 500, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
+    }
 
-        for (int t = 0; t < typeCount; t++) {
-            queue.addHandler(BenchmarkMetricTypes.CLASSES[t],
-                (HandlerConsumer<BenchmarkMetricTypes.TypedMetric>) data ->
-                    consumed.addAndGet(data.size()));
-        }
+    @Test
+    public void benchmark500Types_half() throws Exception {
+        runBenchmark("500-types-half", 500, 250, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
+    }
 
-        // Warmup
-        final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 
1000L;
-        runProducers(queue, typeCount, warmupEnd);
-        Thread.sleep(200);
-        final long warmupConsumed = consumed.get();
-        consumed.set(0);
+    @Test
+    public void benchmark1000Types_half() throws Exception {
+        runBenchmark("1000-types-half", 1000, 500, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
+    }
 
-        // Measure
-        final long measureStart = System.currentTimeMillis();
-        final long measureEnd = measureStart + MEASURE_SECONDS * 1000L;
-        final long produced = runProducers(queue, typeCount, measureEnd);
-        final long measureDuration = System.currentTimeMillis() - measureStart;
+    @Test
+    public void benchmark2000Types_half() throws Exception {
+        runBenchmark("2000-types-half", 2000, 1000, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
+    }
 
-        Thread.sleep(500);
-        final long totalConsumed = consumed.get();
+    @Test
+    public void benchmark500TypesBlocking_half() throws Exception {
+        runBenchmark("500-types-blocking-half", 500, 250, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
+    }
+
+    @Test
+    public void benchmark1000TypesBlocking_half() throws Exception {
+        runBenchmark("1000-types-blocking-half", 1000, 500, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
+    }
+
+    @Test
+    public void benchmark2000TypesBlocking_half() throws Exception {
+        runBenchmark("2000-types-blocking-half", 2000, 1000, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
+    }
+
+    @Test
+    public void benchmark500Types_adaptive() throws Exception {
+        runAdaptiveBenchmark("500-types-adaptive", 500, 
BufferStrategy.IF_POSSIBLE);
+    }
+
+    @Test
+    public void benchmark1000Types_adaptive() throws Exception {
+        runAdaptiveBenchmark("1000-types-adaptive", 1000, 
BufferStrategy.IF_POSSIBLE);
+    }
+
+    @Test
+    public void benchmark2000Types_adaptive() throws Exception {
+        runAdaptiveBenchmark("2000-types-adaptive", 2000, 
BufferStrategy.IF_POSSIBLE);
+    }
+
+    @Test
+    public void benchmark500TypesBlocking_adaptive() throws Exception {
+        runAdaptiveBenchmark("500-types-blocking-adaptive", 500, 
BufferStrategy.BLOCKING);
+    }
+
+    @Test
+    public void benchmark1000TypesBlocking_adaptive() throws Exception {
+        runAdaptiveBenchmark("1000-types-blocking-adaptive", 1000, 
BufferStrategy.BLOCKING);
+    }
+
+    @Test
+    public void benchmark2000TypesBlocking_adaptive() throws Exception {
+        runAdaptiveBenchmark("2000-types-blocking-adaptive", 2000, 
BufferStrategy.BLOCKING);
+    }
+
+    @Test
+    public void benchmark500Types_1to1() throws Exception {
+        runBenchmark("500-types-1to1", 500, 500, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
+    }
+
+    @Test
+    public void benchmark1000Types_1to1() throws Exception {
+        runBenchmark("1000-types-1to1", 1000, 1000, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
+    }
+
+    @Test
+    public void benchmark2000Types_1to1() throws Exception {
+        runBenchmark("2000-types-1to1", 2000, 2000, 50_000,
+            BufferStrategy.IF_POSSIBLE, TYPE_ID_SELECTOR, true);
+    }
+
+    @Test
+    public void benchmark500TypesBlocking_1to1() throws Exception {
+        runBenchmark("500-types-blocking-1to1", 500, 500, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
+    }
+
+    @Test
+    public void benchmark1000TypesBlocking_1to1() throws Exception {
+        runBenchmark("1000-types-blocking-1to1", 1000, 1000, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
+    }
 
-        printResults(label, typeCount, threads, partitions,
-            produced, totalConsumed, measureDuration, warmupConsumed);
+    @Test
+    public void benchmark2000TypesBlocking_1to1() throws Exception {
+        runBenchmark("2000-types-blocking-1to1", 2000, 2000, 50_000,
+            BufferStrategy.BLOCKING, TYPE_ID_SELECTOR, true);
     }
 
-    private void runDirectBenchmark(final String label,
-                                    final ThreadPolicy threads,
-                                    final PartitionPolicy partitions,
-                                    final BufferStrategy strategy) throws 
Exception {
+    private void runAdaptiveBenchmark(final String label, final int typeCount,
+                                      final BufferStrategy strategy) throws 
Exception {
+        // adaptive(): threshold = threadCount * 25 = 200
+        //   500 types → 350p (200 + 300/2)
+        //  1000 types → 600p (200 + 800/2)
+        //  2000 types → 1100p (200 + 1800/2)
+        final int partitionCount = PartitionPolicy.adaptive()
+            .resolve(THREADS.resolve(), typeCount);
+        runBenchmark(label, typeCount, partitionCount, 50_000,
+            strategy, TYPE_ID_SELECTOR, true);
+    }
+
+    private void runBenchmark(final String label, final int typeCount,
+                              final int partitionCount, final int bufferSize,
+                              final BufferStrategy strategy) throws Exception {
+        runBenchmark(label, typeCount, partitionCount, bufferSize, strategy, 
null, true);
+    }
+
+    private void runBenchmark(final String label, final int typeCount,
+                              final int partitionCount, final int bufferSize,
+                              final BufferStrategy strategy,
+                              final 
PartitionSelector<BenchmarkMetricTypes.TypedMetric> selector,
+                              final boolean ignored) throws Exception {
         final AtomicLong consumed = new AtomicLong(0);
+        final PartitionPolicy partitions = 
PartitionPolicy.fixed(partitionCount);
 
-        final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = 
BatchQueueManager.create(
-            "bench-" + label,
+        final 
BatchQueueConfig.BatchQueueConfigBuilder<BenchmarkMetricTypes.TypedMetric> 
configBuilder =
             BatchQueueConfig.<BenchmarkMetricTypes.TypedMetric>builder()
-                .threads(threads)
+                .threads(THREADS)
                 .partitions(partitions)
-                .bufferSize(50_000)
+                .bufferSize(bufferSize)
                 .strategy(strategy)
-                .consumer(data -> consumed.addAndGet(data.size()))
                 .minIdleMs(1)
-                .maxIdleMs(50)
-                .build());
+                .maxIdleMs(50);
+        if (selector != null) {
+            configBuilder.partitionSelector(selector);
+        }
+
+        final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = 
BatchQueueManager.create(
+            "bench-" + label, configBuilder.build());
+
+        for (int t = 0; t < typeCount; t++) {
+            queue.addHandler(BenchmarkMetricTypes.CLASSES[t],
+                (HandlerConsumer<BenchmarkMetricTypes.TypedMetric>) data ->
+                    consumed.addAndGet(data.size()));
+        }
 
         // Warmup
         final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 
1000L;
-        runProducers(queue, 1, warmupEnd);
+        runProducers(queue, typeCount, PRODUCER_THREADS, warmupEnd);
         Thread.sleep(200);
-        final long warmupConsumed = consumed.get();
         consumed.set(0);
 
         // Measure
         final long measureStart = System.currentTimeMillis();
         final long measureEnd = measureStart + MEASURE_SECONDS * 1000L;
-        final long produced = runProducers(queue, 1, measureEnd);
+        final long produced = runProducers(queue, typeCount, PRODUCER_THREADS, 
measureEnd);
         final long measureDuration = System.currentTimeMillis() - measureStart;
 
         Thread.sleep(500);
         final long totalConsumed = consumed.get();
 
-        printResults(label, 1, threads, partitions,
-            produced, totalConsumed, measureDuration, warmupConsumed);
+        log.info("\n=== BatchQueue Benchmark: {} ===\n"
+                + "  Types:       {}\n"
+                + "  Threads:     {}\n"
+                + "  Partitions:  {}\n"
+                + "  BufferSize:  {}\n"
+                + "  Strategy:    {}\n"
+                + "  Producers:   {}\n"
+                + "  Duration:    {} ms\n"
+                + "  Produced:    {}\n"
+                + "  Consumed:    {}\n"
+                + "  Consume rate:  {} items/sec\n"
+                + "  Drop rate:     {}%\n",
+            label, typeCount, THREADS, partitions, bufferSize, strategy, 
PRODUCER_THREADS,
+            measureDuration,
+            String.format("%,d", produced), String.format("%,d", 
totalConsumed),
+            String.format("%,.0f", totalConsumed * 1000.0 / measureDuration),
+            String.format("%.2f", produced > 0
+                ? (produced - totalConsumed) * 100.0 / produced : 0));
     }
 
     private long runProducers(final 
BatchQueue<BenchmarkMetricTypes.TypedMetric> queue,
-                              final int typeCount,
+                              final int typeCount, final int producerCount,
                               final long endTimeMs) throws 
InterruptedException {
         final AtomicLong totalProduced = new AtomicLong(0);
-        final CountDownLatch done = new CountDownLatch(PRODUCER_THREADS);
+        final CountDownLatch done = new CountDownLatch(producerCount);
 
-        for (int p = 0; p < PRODUCER_THREADS; p++) {
+        for (int p = 0; p < producerCount; p++) {
             final int producerIndex = p;
             final Thread thread = new Thread(() -> {
                 long count = 0;
@@ -362,39 +439,11 @@ public class BatchQueueBenchmark {
                 done.countDown();
             });
             thread.setName("Producer-" + producerIndex);
+            thread.setDaemon(true);
             thread.start();
         }
 
         done.await(MEASURE_SECONDS + 10, TimeUnit.SECONDS);
         return totalProduced.get();
     }
-
-    private void printResults(final String label, final int typeCount,
-                              final ThreadPolicy threads,
-                              final PartitionPolicy partitions,
-                              final long produced, final long consumed,
-                              final long durationMs, final long 
warmupConsumed) {
-        final double producedPerSec = produced * 1000.0 / durationMs;
-        final double consumedPerSec = consumed * 1000.0 / durationMs;
-        final double dropRate = produced > 0 ? (produced - consumed) * 100.0 / 
produced : 0;
-
-        log.info("\n=== BatchQueue Benchmark: {} ===\n"
-                + "  Types:       {}\n"
-                + "  Threads:     {}\n"
-                + "  Partitions:  {}\n"
-                + "  Producers:   {}\n"
-                + "  Duration:    {} ms\n"
-                + "  Warmup consumed: {}\n"
-                + "  Produced:    {}\n"
-                + "  Consumed:    {}\n"
-                + "  Produce rate:  {} items/sec\n"
-                + "  Consume rate:  {} items/sec\n"
-                + "  Drop rate:     {}%\n",
-            label, typeCount, threads, partitions, PRODUCER_THREADS,
-            durationMs, String.format("%,d", warmupConsumed),
-            String.format("%,d", produced), String.format("%,d", consumed),
-            String.format("%,.0f", producedPerSec),
-            String.format("%,.0f", consumedPerSec),
-            String.format("%.2f", dropRate));
-    }
 }
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java
index 0717dae4fe..f74add79f9 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java
@@ -104,7 +104,7 @@ public class BatchQueueConfigTest {
             .build();
         assertEquals(10_000, config.getBufferSize());
         assertEquals(BufferStrategy.BLOCKING, config.getStrategy());
-        assertEquals(5, config.getMinIdleMs());
-        assertEquals(200, config.getMaxIdleMs());
+        assertEquals(1, config.getMinIdleMs());
+        assertEquals(50, config.getMaxIdleMs());
     }
 }
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java
index 4f95544e40..a63bbdf400 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java
@@ -127,41 +127,6 @@ public class BatchQueueTest {
         assertEquals(50, receivedB.size());
     }
 
-    @Test
-    public void testHandlerMapWithManyTypes() {
-        // Simulate the 40-type scenario
-        final int typeCount = 40;
-        final int itemsPerType = 100;
-        final AtomicInteger totalReceived = new AtomicInteger(0);
-
-        final BatchQueue<Object> queue = 
BatchQueueManager.create("many-types-test",
-            BatchQueueConfig.<Object>builder()
-                .threads(ThreadPolicy.fixed(2))
-                .partitions(PartitionPolicy.threadMultiply(2))
-                .bufferSize(10_000)
-                .build());
-
-        // Register one handler per "type" using MetricA as representative
-        // In real use, each type would be a different class.
-        // Here we use a direct consumer to simplify.
-        // We'll test the actual handler-map routing with fewer types above.
-        // This test validates throughput with many items.
-        final List<Object> received = new CopyOnWriteArrayList<>();
-        queue.addHandler(MetricA.class, data -> {
-            totalReceived.addAndGet(data.size());
-            received.addAll(data);
-        });
-
-        for (int i = 0; i < typeCount * itemsPerType; i++) {
-            queue.produce(new MetricA("item-" + i));
-        }
-
-        Awaitility.await().atMost(10, TimeUnit.SECONDS)
-            .until(() -> totalReceived.get() == typeCount * itemsPerType);
-
-        assertEquals(typeCount * itemsPerType, totalReceived.get());
-    }
-
     // --- Partition assignment ---
 
     @Test
@@ -233,6 +198,43 @@ public class BatchQueueTest {
         }
     }
 
+    @Test
+    public void testAdaptivePartitionGrowsWithHandlers() {
+        final BatchQueue<Object> queue = 
BatchQueueManager.create("adaptive-test",
+            BatchQueueConfig.<Object>builder()
+                .threads(ThreadPolicy.fixed(8))
+                .partitions(PartitionPolicy.adaptive())
+                .bufferSize(100)
+                .build());
+
+        // Initial: 8 partitions (threadCount)
+        assertEquals(8, queue.getPartitionCount());
+
+        // Register 500 handlers — threshold = 8*25 = 200, so 200 + 300/2 = 350
+        for (int i = 0; i < 500; i++) {
+            queue.addHandler(BenchmarkMetricTypes.CLASSES[i], data -> { });
+        }
+
+        assertEquals(350, queue.getPartitionCount());
+    }
+
+    @Test
+    public void testAdaptiveBelowThresholdIs1to1() {
+        final BatchQueue<Object> queue = 
BatchQueueManager.create("adaptive-below",
+            BatchQueueConfig.<Object>builder()
+                .threads(ThreadPolicy.fixed(8))
+                .partitions(PartitionPolicy.adaptive())
+                .bufferSize(100)
+                .build());
+
+        // Register 100 handlers — below threshold (200), so 1:1
+        for (int i = 0; i < 100; i++) {
+            queue.addHandler(BenchmarkMetricTypes.CLASSES[i], data -> { });
+        }
+
+        assertEquals(100, queue.getPartitionCount());
+    }
+
     // --- Shared scheduler ---
 
     @Test
@@ -298,17 +300,13 @@ public class BatchQueueTest {
                 .build());
 
         // Produce enough to fill the buffer while consumer is blocked
-        // First batch will be drained and block the consumer.
-        // Wait for it to be picked up, then fill the buffer.
         Awaitility.await().atMost(2, TimeUnit.SECONDS)
             .pollInterval(10, TimeUnit.MILLISECONDS)
             .until(() -> {
-                // Keep producing — once consumer is blocked, buffer fills up
                 queue.produce("x");
                 return !queue.produce("overflow");
             });
 
-        // At least one produce should have returned false (dropped)
         blockLatch.countDown();
     }
 
@@ -339,7 +337,6 @@ public class BatchQueueTest {
                 .threads(ThreadPolicy.fixed(1))
                 .consumer(data -> {
                     if (consumeCalls.getAndIncrement() == 0) {
-                        // Block on first consume to let items accumulate
                         try {
                             blockLatch.await(2, TimeUnit.SECONDS);
                         } catch (final InterruptedException e) {
@@ -352,7 +349,6 @@ public class BatchQueueTest {
                 .strategy(BufferStrategy.IF_POSSIBLE)
                 .build());
 
-        // Produce items
         int produced = 0;
         for (int i = 0; i < 500; i++) {
             if (queue.produce("item-" + i)) {
@@ -360,12 +356,10 @@ public class BatchQueueTest {
             }
         }
 
-        // Unblock consumer and immediately shutdown — should drain remaining
         blockLatch.countDown();
-        Thread.sleep(50); // Let the drain loop pick up some
+        Thread.sleep(50);
         queue.shutdown();
 
-        // After shutdown, all produced items should be consumed
         assertEquals(produced, received.size());
     }
 
@@ -381,7 +375,6 @@ public class BatchQueueTest {
                 .consumer(new HandlerConsumer<String>() {
                     @Override
                     public void consume(final List<String> data) {
-                        // no-op
                     }
 
                     @Override
@@ -392,7 +385,6 @@ public class BatchQueueTest {
                 .bufferSize(100)
                 .build());
 
-        // Queue starts with no data, so idle should be called quickly
         Awaitility.await().atMost(3, TimeUnit.SECONDS)
             .until(() -> idleCalls.get() > 0);
 
@@ -461,9 +453,6 @@ public class BatchQueueTest {
 
     @Test
     public void testAdaptiveBackoffIncreasesDelay() throws Exception {
-        // When queue is idle, consecutive idle cycles increase.
-        // We verify by checking that with no data, the queue
-        // doesn't spin rapidly — idle callbacks should slow down.
         final List<Long> idleTimestamps = new CopyOnWriteArrayList<>();
 
         BatchQueueManager.create("backoff-test",
@@ -484,11 +473,9 @@ public class BatchQueueTest {
                 .bufferSize(100)
                 .build());
 
-        // Wait for several idle cycles
         Awaitility.await().atMost(10, TimeUnit.SECONDS)
             .until(() -> idleTimestamps.size() >= 8);
 
-        // Verify that intervals increase (later gaps should be larger than early gaps)
         final long earlyGap = idleTimestamps.get(1) - idleTimestamps.get(0);
         final long laterGap = idleTimestamps.get(idleTimestamps.size() - 1)
             - idleTimestamps.get(idleTimestamps.size() - 2);
@@ -521,21 +508,17 @@ public class BatchQueueTest {
                 .bufferSize(100)
                 .build());
 
-        // Let backoff build up
         Awaitility.await().atMost(5, TimeUnit.SECONDS)
             .until(() -> idleTimestamps.size() >= 5);
 
-        // Now send data — this should reset the backoff
         final int beforeIdles = idleTimestamps.size();
         for (int i = 0; i < 10; i++) {
             queue.produce("reset-" + i);
         }
 
-        // Wait for the data to be consumed
         Awaitility.await().atMost(5, TimeUnit.SECONDS)
             .until(() -> consumeCount.get() >= 10);
 
-        // After data is consumed, next idle should come quickly (backoff reset)
         final long postDataTime = System.currentTimeMillis();
         Awaitility.await().atMost(3, TimeUnit.SECONDS)
             .until(() -> {
@@ -548,26 +531,4 @@ public class BatchQueueTest {
             });
     }
 
-    // --- Round robin produce ---
-
-    @Test
-    public void testRoundRobinDistributesAcrossPartitions() {
-        final List<String> received = new CopyOnWriteArrayList<>();
-        final BatchQueue<String> queue = BatchQueueManager.create("roundrobin-test",
-            BatchQueueConfig.<String>builder()
-                .threads(ThreadPolicy.fixed(2))
-                .partitions(PartitionPolicy.fixed(4))
-                .consumer(data -> received.addAll(data))
-                .bufferSize(1000)
-                .build());
-
-        for (int i = 0; i < 400; i++) {
-            queue.produce("item-" + i);
-        }
-
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
-            .until(() -> received.size() == 400);
-
-        assertEquals(400, received.size());
-    }
 }
diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java
index b055c36f0a..910c5b0ccf 100644
--- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java
+++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java
@@ -18,9 +18,22 @@
 
 package org.apache.skywalking.oap.server.library.batchqueue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+
 /**
- * 200 distinct metric subclasses for benchmark handler-map dispatch testing.
- * Each class represents a distinct metric type that gets routed to its own handler.
+ * 2000 distinct metric subclasses generated at class-load time for benchmark
+ * handler-map dispatch testing. Each class represents a distinct metric type
+ * that gets routed to its own handler via {@code data.getClass().hashCode()}.
+ *
+ * <p>Classes are generated as bytecode at runtime using
+ * {@link MethodHandles.Lookup#defineClass(byte[])} to avoid a 6000+ line
+ * source file. Each generated class {@code Dyn0..Dyn1999} extends
+ * {@link TypedMetric} with a constructor that calls {@code super(typeId, v)}.
  */
 @SuppressWarnings("all")
 class BenchmarkMetricTypes {
@@ -40,610 +53,151 @@ class BenchmarkMetricTypes {
         TypedMetric create(long value);
     }
 
-    static final int MAX_TYPES = 200;
+    static final int MAX_TYPES = 2000;
     static final Class<? extends TypedMetric>[] CLASSES = new Class[MAX_TYPES];
     static final MetricFactory[] FACTORIES = new MetricFactory[MAX_TYPES];
 
+    private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();
+    private static final String SUPER_INTERNAL =
+        "org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes$TypedMetric";
+
     static {
-        CLASSES[0] = M0.class; FACTORIES[0] = v -> new M0(v);
-        CLASSES[1] = M1.class; FACTORIES[1] = v -> new M1(v);
-        CLASSES[2] = M2.class; FACTORIES[2] = v -> new M2(v);
-        CLASSES[3] = M3.class; FACTORIES[3] = v -> new M3(v);
-        CLASSES[4] = M4.class; FACTORIES[4] = v -> new M4(v);
-        CLASSES[5] = M5.class; FACTORIES[5] = v -> new M5(v);
-        CLASSES[6] = M6.class; FACTORIES[6] = v -> new M6(v);
-        CLASSES[7] = M7.class; FACTORIES[7] = v -> new M7(v);
-        CLASSES[8] = M8.class; FACTORIES[8] = v -> new M8(v);
-        CLASSES[9] = M9.class; FACTORIES[9] = v -> new M9(v);
-        CLASSES[10] = M10.class; FACTORIES[10] = v -> new M10(v);
-        CLASSES[11] = M11.class; FACTORIES[11] = v -> new M11(v);
-        CLASSES[12] = M12.class; FACTORIES[12] = v -> new M12(v);
-        CLASSES[13] = M13.class; FACTORIES[13] = v -> new M13(v);
-        CLASSES[14] = M14.class; FACTORIES[14] = v -> new M14(v);
-        CLASSES[15] = M15.class; FACTORIES[15] = v -> new M15(v);
-        CLASSES[16] = M16.class; FACTORIES[16] = v -> new M16(v);
-        CLASSES[17] = M17.class; FACTORIES[17] = v -> new M17(v);
-        CLASSES[18] = M18.class; FACTORIES[18] = v -> new M18(v);
-        CLASSES[19] = M19.class; FACTORIES[19] = v -> new M19(v);
-        CLASSES[20] = M20.class; FACTORIES[20] = v -> new M20(v);
-        CLASSES[21] = M21.class; FACTORIES[21] = v -> new M21(v);
-        CLASSES[22] = M22.class; FACTORIES[22] = v -> new M22(v);
-        CLASSES[23] = M23.class; FACTORIES[23] = v -> new M23(v);
-        CLASSES[24] = M24.class; FACTORIES[24] = v -> new M24(v);
-        CLASSES[25] = M25.class; FACTORIES[25] = v -> new M25(v);
-        CLASSES[26] = M26.class; FACTORIES[26] = v -> new M26(v);
-        CLASSES[27] = M27.class; FACTORIES[27] = v -> new M27(v);
-        CLASSES[28] = M28.class; FACTORIES[28] = v -> new M28(v);
-        CLASSES[29] = M29.class; FACTORIES[29] = v -> new M29(v);
-        CLASSES[30] = M30.class; FACTORIES[30] = v -> new M30(v);
-        CLASSES[31] = M31.class; FACTORIES[31] = v -> new M31(v);
-        CLASSES[32] = M32.class; FACTORIES[32] = v -> new M32(v);
-        CLASSES[33] = M33.class; FACTORIES[33] = v -> new M33(v);
-        CLASSES[34] = M34.class; FACTORIES[34] = v -> new M34(v);
-        CLASSES[35] = M35.class; FACTORIES[35] = v -> new M35(v);
-        CLASSES[36] = M36.class; FACTORIES[36] = v -> new M36(v);
-        CLASSES[37] = M37.class; FACTORIES[37] = v -> new M37(v);
-        CLASSES[38] = M38.class; FACTORIES[38] = v -> new M38(v);
-        CLASSES[39] = M39.class; FACTORIES[39] = v -> new M39(v);
-        CLASSES[40] = M40.class; FACTORIES[40] = v -> new M40(v);
-        CLASSES[41] = M41.class; FACTORIES[41] = v -> new M41(v);
-        CLASSES[42] = M42.class; FACTORIES[42] = v -> new M42(v);
-        CLASSES[43] = M43.class; FACTORIES[43] = v -> new M43(v);
-        CLASSES[44] = M44.class; FACTORIES[44] = v -> new M44(v);
-        CLASSES[45] = M45.class; FACTORIES[45] = v -> new M45(v);
-        CLASSES[46] = M46.class; FACTORIES[46] = v -> new M46(v);
-        CLASSES[47] = M47.class; FACTORIES[47] = v -> new M47(v);
-        CLASSES[48] = M48.class; FACTORIES[48] = v -> new M48(v);
-        CLASSES[49] = M49.class; FACTORIES[49] = v -> new M49(v);
-        CLASSES[50] = M50.class; FACTORIES[50] = v -> new M50(v);
-        CLASSES[51] = M51.class; FACTORIES[51] = v -> new M51(v);
-        CLASSES[52] = M52.class; FACTORIES[52] = v -> new M52(v);
-        CLASSES[53] = M53.class; FACTORIES[53] = v -> new M53(v);
-        CLASSES[54] = M54.class; FACTORIES[54] = v -> new M54(v);
-        CLASSES[55] = M55.class; FACTORIES[55] = v -> new M55(v);
-        CLASSES[56] = M56.class; FACTORIES[56] = v -> new M56(v);
-        CLASSES[57] = M57.class; FACTORIES[57] = v -> new M57(v);
-        CLASSES[58] = M58.class; FACTORIES[58] = v -> new M58(v);
-        CLASSES[59] = M59.class; FACTORIES[59] = v -> new M59(v);
-        CLASSES[60] = M60.class; FACTORIES[60] = v -> new M60(v);
-        CLASSES[61] = M61.class; FACTORIES[61] = v -> new M61(v);
-        CLASSES[62] = M62.class; FACTORIES[62] = v -> new M62(v);
-        CLASSES[63] = M63.class; FACTORIES[63] = v -> new M63(v);
-        CLASSES[64] = M64.class; FACTORIES[64] = v -> new M64(v);
-        CLASSES[65] = M65.class; FACTORIES[65] = v -> new M65(v);
-        CLASSES[66] = M66.class; FACTORIES[66] = v -> new M66(v);
-        CLASSES[67] = M67.class; FACTORIES[67] = v -> new M67(v);
-        CLASSES[68] = M68.class; FACTORIES[68] = v -> new M68(v);
-        CLASSES[69] = M69.class; FACTORIES[69] = v -> new M69(v);
-        CLASSES[70] = M70.class; FACTORIES[70] = v -> new M70(v);
-        CLASSES[71] = M71.class; FACTORIES[71] = v -> new M71(v);
-        CLASSES[72] = M72.class; FACTORIES[72] = v -> new M72(v);
-        CLASSES[73] = M73.class; FACTORIES[73] = v -> new M73(v);
-        CLASSES[74] = M74.class; FACTORIES[74] = v -> new M74(v);
-        CLASSES[75] = M75.class; FACTORIES[75] = v -> new M75(v);
-        CLASSES[76] = M76.class; FACTORIES[76] = v -> new M76(v);
-        CLASSES[77] = M77.class; FACTORIES[77] = v -> new M77(v);
-        CLASSES[78] = M78.class; FACTORIES[78] = v -> new M78(v);
-        CLASSES[79] = M79.class; FACTORIES[79] = v -> new M79(v);
-        CLASSES[80] = M80.class; FACTORIES[80] = v -> new M80(v);
-        CLASSES[81] = M81.class; FACTORIES[81] = v -> new M81(v);
-        CLASSES[82] = M82.class; FACTORIES[82] = v -> new M82(v);
-        CLASSES[83] = M83.class; FACTORIES[83] = v -> new M83(v);
-        CLASSES[84] = M84.class; FACTORIES[84] = v -> new M84(v);
-        CLASSES[85] = M85.class; FACTORIES[85] = v -> new M85(v);
-        CLASSES[86] = M86.class; FACTORIES[86] = v -> new M86(v);
-        CLASSES[87] = M87.class; FACTORIES[87] = v -> new M87(v);
-        CLASSES[88] = M88.class; FACTORIES[88] = v -> new M88(v);
-        CLASSES[89] = M89.class; FACTORIES[89] = v -> new M89(v);
-        CLASSES[90] = M90.class; FACTORIES[90] = v -> new M90(v);
-        CLASSES[91] = M91.class; FACTORIES[91] = v -> new M91(v);
-        CLASSES[92] = M92.class; FACTORIES[92] = v -> new M92(v);
-        CLASSES[93] = M93.class; FACTORIES[93] = v -> new M93(v);
-        CLASSES[94] = M94.class; FACTORIES[94] = v -> new M94(v);
-        CLASSES[95] = M95.class; FACTORIES[95] = v -> new M95(v);
-        CLASSES[96] = M96.class; FACTORIES[96] = v -> new M96(v);
-        CLASSES[97] = M97.class; FACTORIES[97] = v -> new M97(v);
-        CLASSES[98] = M98.class; FACTORIES[98] = v -> new M98(v);
-        CLASSES[99] = M99.class; FACTORIES[99] = v -> new M99(v);
-        CLASSES[100] = M100.class; FACTORIES[100] = v -> new M100(v);
-        CLASSES[101] = M101.class; FACTORIES[101] = v -> new M101(v);
-        CLASSES[102] = M102.class; FACTORIES[102] = v -> new M102(v);
-        CLASSES[103] = M103.class; FACTORIES[103] = v -> new M103(v);
-        CLASSES[104] = M104.class; FACTORIES[104] = v -> new M104(v);
-        CLASSES[105] = M105.class; FACTORIES[105] = v -> new M105(v);
-        CLASSES[106] = M106.class; FACTORIES[106] = v -> new M106(v);
-        CLASSES[107] = M107.class; FACTORIES[107] = v -> new M107(v);
-        CLASSES[108] = M108.class; FACTORIES[108] = v -> new M108(v);
-        CLASSES[109] = M109.class; FACTORIES[109] = v -> new M109(v);
-        CLASSES[110] = M110.class; FACTORIES[110] = v -> new M110(v);
-        CLASSES[111] = M111.class; FACTORIES[111] = v -> new M111(v);
-        CLASSES[112] = M112.class; FACTORIES[112] = v -> new M112(v);
-        CLASSES[113] = M113.class; FACTORIES[113] = v -> new M113(v);
-        CLASSES[114] = M114.class; FACTORIES[114] = v -> new M114(v);
-        CLASSES[115] = M115.class; FACTORIES[115] = v -> new M115(v);
-        CLASSES[116] = M116.class; FACTORIES[116] = v -> new M116(v);
-        CLASSES[117] = M117.class; FACTORIES[117] = v -> new M117(v);
-        CLASSES[118] = M118.class; FACTORIES[118] = v -> new M118(v);
-        CLASSES[119] = M119.class; FACTORIES[119] = v -> new M119(v);
-        CLASSES[120] = M120.class; FACTORIES[120] = v -> new M120(v);
-        CLASSES[121] = M121.class; FACTORIES[121] = v -> new M121(v);
-        CLASSES[122] = M122.class; FACTORIES[122] = v -> new M122(v);
-        CLASSES[123] = M123.class; FACTORIES[123] = v -> new M123(v);
-        CLASSES[124] = M124.class; FACTORIES[124] = v -> new M124(v);
-        CLASSES[125] = M125.class; FACTORIES[125] = v -> new M125(v);
-        CLASSES[126] = M126.class; FACTORIES[126] = v -> new M126(v);
-        CLASSES[127] = M127.class; FACTORIES[127] = v -> new M127(v);
-        CLASSES[128] = M128.class; FACTORIES[128] = v -> new M128(v);
-        CLASSES[129] = M129.class; FACTORIES[129] = v -> new M129(v);
-        CLASSES[130] = M130.class; FACTORIES[130] = v -> new M130(v);
-        CLASSES[131] = M131.class; FACTORIES[131] = v -> new M131(v);
-        CLASSES[132] = M132.class; FACTORIES[132] = v -> new M132(v);
-        CLASSES[133] = M133.class; FACTORIES[133] = v -> new M133(v);
-        CLASSES[134] = M134.class; FACTORIES[134] = v -> new M134(v);
-        CLASSES[135] = M135.class; FACTORIES[135] = v -> new M135(v);
-        CLASSES[136] = M136.class; FACTORIES[136] = v -> new M136(v);
-        CLASSES[137] = M137.class; FACTORIES[137] = v -> new M137(v);
-        CLASSES[138] = M138.class; FACTORIES[138] = v -> new M138(v);
-        CLASSES[139] = M139.class; FACTORIES[139] = v -> new M139(v);
-        CLASSES[140] = M140.class; FACTORIES[140] = v -> new M140(v);
-        CLASSES[141] = M141.class; FACTORIES[141] = v -> new M141(v);
-        CLASSES[142] = M142.class; FACTORIES[142] = v -> new M142(v);
-        CLASSES[143] = M143.class; FACTORIES[143] = v -> new M143(v);
-        CLASSES[144] = M144.class; FACTORIES[144] = v -> new M144(v);
-        CLASSES[145] = M145.class; FACTORIES[145] = v -> new M145(v);
-        CLASSES[146] = M146.class; FACTORIES[146] = v -> new M146(v);
-        CLASSES[147] = M147.class; FACTORIES[147] = v -> new M147(v);
-        CLASSES[148] = M148.class; FACTORIES[148] = v -> new M148(v);
-        CLASSES[149] = M149.class; FACTORIES[149] = v -> new M149(v);
-        CLASSES[150] = M150.class; FACTORIES[150] = v -> new M150(v);
-        CLASSES[151] = M151.class; FACTORIES[151] = v -> new M151(v);
-        CLASSES[152] = M152.class; FACTORIES[152] = v -> new M152(v);
-        CLASSES[153] = M153.class; FACTORIES[153] = v -> new M153(v);
-        CLASSES[154] = M154.class; FACTORIES[154] = v -> new M154(v);
-        CLASSES[155] = M155.class; FACTORIES[155] = v -> new M155(v);
-        CLASSES[156] = M156.class; FACTORIES[156] = v -> new M156(v);
-        CLASSES[157] = M157.class; FACTORIES[157] = v -> new M157(v);
-        CLASSES[158] = M158.class; FACTORIES[158] = v -> new M158(v);
-        CLASSES[159] = M159.class; FACTORIES[159] = v -> new M159(v);
-        CLASSES[160] = M160.class; FACTORIES[160] = v -> new M160(v);
-        CLASSES[161] = M161.class; FACTORIES[161] = v -> new M161(v);
-        CLASSES[162] = M162.class; FACTORIES[162] = v -> new M162(v);
-        CLASSES[163] = M163.class; FACTORIES[163] = v -> new M163(v);
-        CLASSES[164] = M164.class; FACTORIES[164] = v -> new M164(v);
-        CLASSES[165] = M165.class; FACTORIES[165] = v -> new M165(v);
-        CLASSES[166] = M166.class; FACTORIES[166] = v -> new M166(v);
-        CLASSES[167] = M167.class; FACTORIES[167] = v -> new M167(v);
-        CLASSES[168] = M168.class; FACTORIES[168] = v -> new M168(v);
-        CLASSES[169] = M169.class; FACTORIES[169] = v -> new M169(v);
-        CLASSES[170] = M170.class; FACTORIES[170] = v -> new M170(v);
-        CLASSES[171] = M171.class; FACTORIES[171] = v -> new M171(v);
-        CLASSES[172] = M172.class; FACTORIES[172] = v -> new M172(v);
-        CLASSES[173] = M173.class; FACTORIES[173] = v -> new M173(v);
-        CLASSES[174] = M174.class; FACTORIES[174] = v -> new M174(v);
-        CLASSES[175] = M175.class; FACTORIES[175] = v -> new M175(v);
-        CLASSES[176] = M176.class; FACTORIES[176] = v -> new M176(v);
-        CLASSES[177] = M177.class; FACTORIES[177] = v -> new M177(v);
-        CLASSES[178] = M178.class; FACTORIES[178] = v -> new M178(v);
-        CLASSES[179] = M179.class; FACTORIES[179] = v -> new M179(v);
-        CLASSES[180] = M180.class; FACTORIES[180] = v -> new M180(v);
-        CLASSES[181] = M181.class; FACTORIES[181] = v -> new M181(v);
-        CLASSES[182] = M182.class; FACTORIES[182] = v -> new M182(v);
-        CLASSES[183] = M183.class; FACTORIES[183] = v -> new M183(v);
-        CLASSES[184] = M184.class; FACTORIES[184] = v -> new M184(v);
-        CLASSES[185] = M185.class; FACTORIES[185] = v -> new M185(v);
-        CLASSES[186] = M186.class; FACTORIES[186] = v -> new M186(v);
-        CLASSES[187] = M187.class; FACTORIES[187] = v -> new M187(v);
-        CLASSES[188] = M188.class; FACTORIES[188] = v -> new M188(v);
-        CLASSES[189] = M189.class; FACTORIES[189] = v -> new M189(v);
-        CLASSES[190] = M190.class; FACTORIES[190] = v -> new M190(v);
-        CLASSES[191] = M191.class; FACTORIES[191] = v -> new M191(v);
-        CLASSES[192] = M192.class; FACTORIES[192] = v -> new M192(v);
-        CLASSES[193] = M193.class; FACTORIES[193] = v -> new M193(v);
-        CLASSES[194] = M194.class; FACTORIES[194] = v -> new M194(v);
-        CLASSES[195] = M195.class; FACTORIES[195] = v -> new M195(v);
-        CLASSES[196] = M196.class; FACTORIES[196] = v -> new M196(v);
-        CLASSES[197] = M197.class; FACTORIES[197] = v -> new M197(v);
-        CLASSES[198] = M198.class; FACTORIES[198] = v -> new M198(v);
-        CLASSES[199] = M199.class; FACTORIES[199] = v -> new M199(v);
+        try {
+            for (int i = 0; i < MAX_TYPES; i++) {
+                final String name = 
"org/apache/skywalking/oap/server/library/batchqueue/Dyn" + i;
+                final byte[] bytes = buildClassBytes(name, SUPER_INTERNAL, i);
+                final Class<? extends TypedMetric> cls =
+                    (Class<? extends TypedMetric>) LOOKUP.defineClass(bytes);
+                CLASSES[i] = cls;
+                final MethodHandle mh = LOOKUP.findConstructor(
+                    cls, MethodType.methodType(void.class, long.class));
+                FACTORIES[i] = v -> {
+                    try {
+                        return (TypedMetric) mh.invoke(v);
+                    } catch (final Throwable e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+            }
+        } catch (final Exception e) {
+            throw new ExceptionInInitializerError(e);
+        }
     }
 
-    static class M0 extends TypedMetric { M0(final long v) { super(0, v); } }
-
-    static class M1 extends TypedMetric { M1(final long v) { super(1, v); } }
-
-    static class M2 extends TypedMetric { M2(final long v) { super(2, v); } }
-
-    static class M3 extends TypedMetric { M3(final long v) { super(3, v); } }
-
-    static class M4 extends TypedMetric { M4(final long v) { super(4, v); } }
-
-    static class M5 extends TypedMetric { M5(final long v) { super(5, v); } }
-
-    static class M6 extends TypedMetric { M6(final long v) { super(6, v); } }
-
-    static class M7 extends TypedMetric { M7(final long v) { super(7, v); } }
-
-    static class M8 extends TypedMetric { M8(final long v) { super(8, v); } }
-
-    static class M9 extends TypedMetric { M9(final long v) { super(9, v); } }
-
-    static class M10 extends TypedMetric { M10(final long v) { super(10, v); } 
}
-
-    static class M11 extends TypedMetric { M11(final long v) { super(11, v); } 
}
-
-    static class M12 extends TypedMetric { M12(final long v) { super(12, v); } 
}
-
-    static class M13 extends TypedMetric { M13(final long v) { super(13, v); } 
}
-
-    static class M14 extends TypedMetric { M14(final long v) { super(14, v); } 
}
-
-    static class M15 extends TypedMetric { M15(final long v) { super(15, v); } 
}
-
-    static class M16 extends TypedMetric { M16(final long v) { super(16, v); } 
}
-
-    static class M17 extends TypedMetric { M17(final long v) { super(17, v); } 
}
-
-    static class M18 extends TypedMetric { M18(final long v) { super(18, v); } 
}
-
-    static class M19 extends TypedMetric { M19(final long v) { super(19, v); } 
}
-
-    static class M20 extends TypedMetric { M20(final long v) { super(20, v); } 
}
-
-    static class M21 extends TypedMetric { M21(final long v) { super(21, v); } 
}
-
-    static class M22 extends TypedMetric { M22(final long v) { super(22, v); } 
}
-
-    static class M23 extends TypedMetric { M23(final long v) { super(23, v); } 
}
-
-    static class M24 extends TypedMetric { M24(final long v) { super(24, v); } 
}
-
-    static class M25 extends TypedMetric { M25(final long v) { super(25, v); } 
}
-
-    static class M26 extends TypedMetric { M26(final long v) { super(26, v); } 
}
-
-    static class M27 extends TypedMetric { M27(final long v) { super(27, v); } 
}
-
-    static class M28 extends TypedMetric { M28(final long v) { super(28, v); } 
}
-
-    static class M29 extends TypedMetric { M29(final long v) { super(29, v); } 
}
-
-    static class M30 extends TypedMetric { M30(final long v) { super(30, v); } 
}
-
-    static class M31 extends TypedMetric { M31(final long v) { super(31, v); } 
}
-
-    static class M32 extends TypedMetric { M32(final long v) { super(32, v); } 
}
-
-    static class M33 extends TypedMetric { M33(final long v) { super(33, v); } 
}
-
-    static class M34 extends TypedMetric { M34(final long v) { super(34, v); } 
}
-
-    static class M35 extends TypedMetric { M35(final long v) { super(35, v); } 
}
-
-    static class M36 extends TypedMetric { M36(final long v) { super(36, v); } 
}
-
-    static class M37 extends TypedMetric { M37(final long v) { super(37, v); } 
}
-
-    static class M38 extends TypedMetric { M38(final long v) { super(38, v); } 
}
-
-    static class M39 extends TypedMetric { M39(final long v) { super(39, v); } 
}
-
-    static class M40 extends TypedMetric { M40(final long v) { super(40, v); } 
}
-
-    static class M41 extends TypedMetric { M41(final long v) { super(41, v); } 
}
-
-    static class M42 extends TypedMetric { M42(final long v) { super(42, v); } 
}
-
-    static class M43 extends TypedMetric { M43(final long v) { super(43, v); } 
}
-
-    static class M44 extends TypedMetric { M44(final long v) { super(44, v); } 
}
-
-    static class M45 extends TypedMetric { M45(final long v) { super(45, v); } 
}
-
-    static class M46 extends TypedMetric { M46(final long v) { super(46, v); } 
}
-
-    static class M47 extends TypedMetric { M47(final long v) { super(47, v); } 
}
-
-    static class M48 extends TypedMetric { M48(final long v) { super(48, v); } 
}
-
-    static class M49 extends TypedMetric { M49(final long v) { super(49, v); } 
}
-
-    static class M50 extends TypedMetric { M50(final long v) { super(50, v); } 
}
-
-    static class M51 extends TypedMetric { M51(final long v) { super(51, v); } 
}
-
-    static class M52 extends TypedMetric { M52(final long v) { super(52, v); } 
}
-
-    static class M53 extends TypedMetric { M53(final long v) { super(53, v); } 
}
-
-    static class M54 extends TypedMetric { M54(final long v) { super(54, v); } 
}
-
-    static class M55 extends TypedMetric { M55(final long v) { super(55, v); } 
}
-
-    static class M56 extends TypedMetric { M56(final long v) { super(56, v); } 
}
-
-    static class M57 extends TypedMetric { M57(final long v) { super(57, v); } 
}
-
-    static class M58 extends TypedMetric { M58(final long v) { super(58, v); } 
}
-
-    static class M59 extends TypedMetric { M59(final long v) { super(59, v); } 
}
-
-    static class M60 extends TypedMetric { M60(final long v) { super(60, v); } 
}
-
-    static class M61 extends TypedMetric { M61(final long v) { super(61, v); } 
}
-
-    static class M62 extends TypedMetric { M62(final long v) { super(62, v); } 
}
-
-    static class M63 extends TypedMetric { M63(final long v) { super(63, v); } 
}
-
-    static class M64 extends TypedMetric { M64(final long v) { super(64, v); } 
}
-
-    static class M65 extends TypedMetric { M65(final long v) { super(65, v); } 
}
-
-    static class M66 extends TypedMetric { M66(final long v) { super(66, v); } 
}
-
-    static class M67 extends TypedMetric { M67(final long v) { super(67, v); } 
}
-
-    static class M68 extends TypedMetric { M68(final long v) { super(68, v); } 
}
-
-    static class M69 extends TypedMetric { M69(final long v) { super(69, v); } 
}
-
-    static class M70 extends TypedMetric { M70(final long v) { super(70, v); } 
}
-
-    static class M71 extends TypedMetric { M71(final long v) { super(71, v); } 
}
-
-    static class M72 extends TypedMetric { M72(final long v) { super(72, v); } 
}
-
-    static class M73 extends TypedMetric { M73(final long v) { super(73, v); } 
}
-
-    static class M74 extends TypedMetric { M74(final long v) { super(74, v); } 
}
-
-    static class M75 extends TypedMetric { M75(final long v) { super(75, v); } 
}
-
-    static class M76 extends TypedMetric { M76(final long v) { super(76, v); } 
}
-
-    static class M77 extends TypedMetric { M77(final long v) { super(77, v); } 
}
-
-    static class M78 extends TypedMetric { M78(final long v) { super(78, v); } 
}
-
-    static class M79 extends TypedMetric { M79(final long v) { super(79, v); } 
}
-
-    static class M80 extends TypedMetric { M80(final long v) { super(80, v); } 
}
-
-    static class M81 extends TypedMetric { M81(final long v) { super(81, v); } 
}
-
-    static class M82 extends TypedMetric { M82(final long v) { super(82, v); } 
}
-
-    static class M83 extends TypedMetric { M83(final long v) { super(83, v); } 
}
-
-    static class M84 extends TypedMetric { M84(final long v) { super(84, v); } 
}
-
-    static class M85 extends TypedMetric { M85(final long v) { super(85, v); } 
}
-
-    static class M86 extends TypedMetric { M86(final long v) { super(86, v); } 
}
-
-    static class M87 extends TypedMetric { M87(final long v) { super(87, v); } 
}
-
-    static class M88 extends TypedMetric { M88(final long v) { super(88, v); } 
}
-
-    static class M89 extends TypedMetric { M89(final long v) { super(89, v); } 
}
-
-    static class M90 extends TypedMetric { M90(final long v) { super(90, v); } 
}
-
-    static class M91 extends TypedMetric { M91(final long v) { super(91, v); } 
}
-
-    static class M92 extends TypedMetric { M92(final long v) { super(92, v); } 
}
-
-    static class M93 extends TypedMetric { M93(final long v) { super(93, v); } 
}
-
-    static class M94 extends TypedMetric { M94(final long v) { super(94, v); } 
}
-
-    static class M95 extends TypedMetric { M95(final long v) { super(95, v); } 
}
-
-    static class M96 extends TypedMetric { M96(final long v) { super(96, v); } 
}
-
-    static class M97 extends TypedMetric { M97(final long v) { super(97, v); } 
}
-
-    static class M98 extends TypedMetric { M98(final long v) { super(98, v); } 
}
-
-    static class M99 extends TypedMetric { M99(final long v) { super(99, v); } 
}
-
-    static class M100 extends TypedMetric { M100(final long v) { super(100, 
v); } }
-
-    static class M101 extends TypedMetric { M101(final long v) { super(101, 
v); } }
-
-    static class M102 extends TypedMetric { M102(final long v) { super(102, 
v); } }
-
-    static class M103 extends TypedMetric { M103(final long v) { super(103, 
v); } }
-
-    static class M104 extends TypedMetric { M104(final long v) { super(104, 
v); } }
-
-    static class M105 extends TypedMetric { M105(final long v) { super(105, 
v); } }
-
-    static class M106 extends TypedMetric { M106(final long v) { super(106, 
v); } }
-
-    static class M107 extends TypedMetric { M107(final long v) { super(107, 
v); } }
-
-    static class M108 extends TypedMetric { M108(final long v) { super(108, 
v); } }
-
-    static class M109 extends TypedMetric { M109(final long v) { super(109, 
v); } }
-
-    static class M110 extends TypedMetric { M110(final long v) { super(110, 
v); } }
-
-    static class M111 extends TypedMetric { M111(final long v) { super(111, 
v); } }
-
-    static class M112 extends TypedMetric { M112(final long v) { super(112, 
v); } }
-
-    static class M113 extends TypedMetric { M113(final long v) { super(113, 
v); } }
-
-    static class M114 extends TypedMetric { M114(final long v) { super(114, 
v); } }
-
-    static class M115 extends TypedMetric { M115(final long v) { super(115, 
v); } }
-
-    static class M116 extends TypedMetric { M116(final long v) { super(116, 
v); } }
-
-    static class M117 extends TypedMetric { M117(final long v) { super(117, 
v); } }
-
-    static class M118 extends TypedMetric { M118(final long v) { super(118, 
v); } }
-
-    static class M119 extends TypedMetric { M119(final long v) { super(119, 
v); } }
-
-    static class M120 extends TypedMetric { M120(final long v) { super(120, 
v); } }
-
-    static class M121 extends TypedMetric { M121(final long v) { super(121, 
v); } }
-
-    static class M122 extends TypedMetric { M122(final long v) { super(122, 
v); } }
-
-    static class M123 extends TypedMetric { M123(final long v) { super(123, 
v); } }
-
-    static class M124 extends TypedMetric { M124(final long v) { super(124, 
v); } }
-
-    static class M125 extends TypedMetric { M125(final long v) { super(125, 
v); } }
-
-    static class M126 extends TypedMetric { M126(final long v) { super(126, 
v); } }
-
-    static class M127 extends TypedMetric { M127(final long v) { super(127, 
v); } }
-
-    static class M128 extends TypedMetric { M128(final long v) { super(128, 
v); } }
-
-    static class M129 extends TypedMetric { M129(final long v) { super(129, 
v); } }
-
-    static class M130 extends TypedMetric { M130(final long v) { super(130, 
v); } }
-
-    static class M131 extends TypedMetric { M131(final long v) { super(131, 
v); } }
-
-    static class M132 extends TypedMetric { M132(final long v) { super(132, 
v); } }
-
-    static class M133 extends TypedMetric { M133(final long v) { super(133, 
v); } }
-
-    static class M134 extends TypedMetric { M134(final long v) { super(134, 
v); } }
-
-    static class M135 extends TypedMetric { M135(final long v) { super(135, 
v); } }
-
-    static class M136 extends TypedMetric { M136(final long v) { super(136, 
v); } }
-
-    static class M137 extends TypedMetric { M137(final long v) { super(137, 
v); } }
-
-    static class M138 extends TypedMetric { M138(final long v) { super(138, 
v); } }
-
-    static class M139 extends TypedMetric { M139(final long v) { super(139, 
v); } }
-
-    static class M140 extends TypedMetric { M140(final long v) { super(140, 
v); } }
-
-    static class M141 extends TypedMetric { M141(final long v) { super(141, 
v); } }
-
-    static class M142 extends TypedMetric { M142(final long v) { super(142, 
v); } }
-
-    static class M143 extends TypedMetric { M143(final long v) { super(143, 
v); } }
-
-    static class M144 extends TypedMetric { M144(final long v) { super(144, 
v); } }
-
-    static class M145 extends TypedMetric { M145(final long v) { super(145, 
v); } }
-
-    static class M146 extends TypedMetric { M146(final long v) { super(146, 
v); } }
-
-    static class M147 extends TypedMetric { M147(final long v) { super(147, 
v); } }
-
-    static class M148 extends TypedMetric { M148(final long v) { super(148, 
v); } }
-
-    static class M149 extends TypedMetric { M149(final long v) { super(149, 
v); } }
-
-    static class M150 extends TypedMetric { M150(final long v) { super(150, 
v); } }
-
-    static class M151 extends TypedMetric { M151(final long v) { super(151, 
v); } }
-
-    static class M152 extends TypedMetric { M152(final long v) { super(152, 
v); } }
-
-    static class M153 extends TypedMetric { M153(final long v) { super(153, 
v); } }
-
-    static class M154 extends TypedMetric { M154(final long v) { super(154, 
v); } }
-
-    static class M155 extends TypedMetric { M155(final long v) { super(155, 
v); } }
-
-    static class M156 extends TypedMetric { M156(final long v) { super(156, 
v); } }
-
-    static class M157 extends TypedMetric { M157(final long v) { super(157, 
v); } }
-
-    static class M158 extends TypedMetric { M158(final long v) { super(158, 
v); } }
-
-    static class M159 extends TypedMetric { M159(final long v) { super(159, 
v); } }
-
-    static class M160 extends TypedMetric { M160(final long v) { super(160, 
v); } }
-
-    static class M161 extends TypedMetric { M161(final long v) { super(161, 
v); } }
-
-    static class M162 extends TypedMetric { M162(final long v) { super(162, 
v); } }
-
-    static class M163 extends TypedMetric { M163(final long v) { super(163, 
v); } }
-
-    static class M164 extends TypedMetric { M164(final long v) { super(164, 
v); } }
-
-    static class M165 extends TypedMetric { M165(final long v) { super(165, 
v); } }
-
-    static class M166 extends TypedMetric { M166(final long v) { super(166, 
v); } }
-
-    static class M167 extends TypedMetric { M167(final long v) { super(167, 
v); } }
-
-    static class M168 extends TypedMetric { M168(final long v) { super(168, 
v); } }
-
-    static class M169 extends TypedMetric { M169(final long v) { super(169, 
v); } }
-
-    static class M170 extends TypedMetric { M170(final long v) { super(170, 
v); } }
-
-    static class M171 extends TypedMetric { M171(final long v) { super(171, 
v); } }
-
-    static class M172 extends TypedMetric { M172(final long v) { super(172, 
v); } }
-
-    static class M173 extends TypedMetric { M173(final long v) { super(173, 
v); } }
-
-    static class M174 extends TypedMetric { M174(final long v) { super(174, 
v); } }
-
-    static class M175 extends TypedMetric { M175(final long v) { super(175, 
v); } }
-
-    static class M176 extends TypedMetric { M176(final long v) { super(176, 
v); } }
-
-    static class M177 extends TypedMetric { M177(final long v) { super(177, 
v); } }
-
-    static class M178 extends TypedMetric { M178(final long v) { super(178, 
v); } }
-
-    static class M179 extends TypedMetric { M179(final long v) { super(179, 
v); } }
-
-    static class M180 extends TypedMetric { M180(final long v) { super(180, 
v); } }
-
-    static class M181 extends TypedMetric { M181(final long v) { super(181, 
v); } }
-
-    static class M182 extends TypedMetric { M182(final long v) { super(182, 
v); } }
-
-    static class M183 extends TypedMetric { M183(final long v) { super(183, 
v); } }
-
-    static class M184 extends TypedMetric { M184(final long v) { super(184, 
v); } }
-
-    static class M185 extends TypedMetric { M185(final long v) { super(185, 
v); } }
-
-    static class M186 extends TypedMetric { M186(final long v) { super(186, 
v); } }
-
-    static class M187 extends TypedMetric { M187(final long v) { super(187, 
v); } }
-
-    static class M188 extends TypedMetric { M188(final long v) { super(188, 
v); } }
-
-    static class M189 extends TypedMetric { M189(final long v) { super(189, 
v); } }
-
-    static class M190 extends TypedMetric { M190(final long v) { super(190, 
v); } }
-
-    static class M191 extends TypedMetric { M191(final long v) { super(191, 
v); } }
-
-    static class M192 extends TypedMetric { M192(final long v) { super(192, 
v); } }
-
-    static class M193 extends TypedMetric { M193(final long v) { super(193, 
v); } }
-
-    static class M194 extends TypedMetric { M194(final long v) { super(194, 
v); } }
-
-    static class M195 extends TypedMetric { M195(final long v) { super(195, 
v); } }
-
-    static class M196 extends TypedMetric { M196(final long v) { super(196, 
v); } }
-
-    static class M197 extends TypedMetric { M197(final long v) { super(197, 
v); } }
-
-    static class M198 extends TypedMetric { M198(final long v) { super(198, 
v); } }
+    /**
+     * Build minimal class bytecode:
+     * <pre>
+     * class {name} extends TypedMetric {
+     *     {name}(long v) { super(typeId, v); }
+     * }
+     * </pre>
+     */
+    private static byte[] buildClassBytes(final String thisClass, final String 
superClass,
+                                          final int typeId) throws IOException 
{
+        final ByteArrayOutputStream buf = new ByteArrayOutputStream(256);
+        final DataOutputStream out = new DataOutputStream(buf);
+
+        // Magic + version (Java 11 = 55)
+        out.writeInt(0xCAFEBABE);
+        out.writeShort(0);
+        out.writeShort(55);
+
+        // Constant pool (10 entries, 1-indexed => count = 11)
+        out.writeShort(11);
+        // #1 Methodref -> #3.#7  (superClass.<init>:(IJ)V)
+        out.writeByte(10);
+        out.writeShort(3);
+        out.writeShort(7);
+        // #2 Class -> #8  (this class)
+        out.writeByte(7);
+        out.writeShort(8);
+        // #3 Class -> #9  (super class)
+        out.writeByte(7);
+        out.writeShort(9);
+        // #4 Utf8 "<init>"
+        out.writeByte(1);
+        out.writeUTF("<init>");
+        // #5 Utf8 "(J)V"
+        out.writeByte(1);
+        out.writeUTF("(J)V");
+        // #6 Utf8 "Code"
+        out.writeByte(1);
+        out.writeUTF("Code");
+        // #7 NameAndType -> #4:#10  (<init>:(IJ)V)
+        out.writeByte(12);
+        out.writeShort(4);
+        out.writeShort(10);
+        // #8 Utf8 this class name
+        out.writeByte(1);
+        out.writeUTF(thisClass);
+        // #9 Utf8 super class name
+        out.writeByte(1);
+        out.writeUTF(superClass);
+        // #10 Utf8 "(IJ)V"
+        out.writeByte(1);
+        out.writeUTF("(IJ)V");
+
+        // Access flags: ACC_SUPER (0x0020)
+        out.writeShort(0x0020);
+        // This class (#2), Super class (#3)
+        out.writeShort(2);
+        out.writeShort(3);
+        // Interfaces: 0
+        out.writeShort(0);
+        // Fields: 0
+        out.writeShort(0);
+
+        // Methods: 1 (constructor)
+        out.writeShort(1);
+        // Method access_flags: 0 (package-private)
+        out.writeShort(0);
+        // Method name: #4 (<init>)
+        out.writeShort(4);
+        // Method descriptor: #5 ((J)V)
+        out.writeShort(5);
+        // Method attributes: 1 (Code)
+        out.writeShort(1);
+
+        // Code attribute
+        out.writeShort(6); // attribute_name_index -> "Code"
+
+        // Build bytecode first to know length
+        final byte[] code = buildConstructorCode(typeId);
+        final int codeAttrLen = 2 + 2 + 4 + code.length + 2 + 2; // max_stack 
+ max_locals + code_length + code + exception_table_length + attributes_count
+        out.writeInt(codeAttrLen);
+        out.writeShort(4); // max_stack (this + int + long[2 slots])
+        out.writeShort(3); // max_locals (this + long[2 slots])
+        out.writeInt(code.length);
+        out.write(code);
+        out.writeShort(0); // exception_table_length
+        out.writeShort(0); // code attributes_count
+
+        // Class attributes: 0
+        out.writeShort(0);
+
+        out.flush();
+        return buf.toByteArray();
+    }
 
-    static class M199 extends TypedMetric { M199(final long v) { super(199, 
v); } }
+    /**
+     * Constructor bytecode: aload_0, sipush typeId, lload_1, invokespecial 
#1, return
+     */
+    private static byte[] buildConstructorCode(final int typeId) {
+        final ByteArrayOutputStream code = new ByteArrayOutputStream(16);
+        // aload_0
+        code.write(0x2A);
+        // sipush typeId (works for 0..32767)
+        code.write(0x11);
+        code.write((typeId >> 8) & 0xFF);
+        code.write(typeId & 0xFF);
+        // lload_1
+        code.write(0x1F);
+        // invokespecial #1
+        code.write(0xB7);
+        code.write(0x00);
+        code.write(0x01);
+        // return
+        code.write(0xB1);
+        return code.toByteArray();
+    }
 }
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
index 0f95402c56..9cdb5d883b 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
@@ -26,16 +26,16 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class PartitionPolicyTest {
 
     @Test
-    public void testFixedReturnsExactCount() {
-        assertEquals(1, PartitionPolicy.fixed(1).resolve(4));
-        assertEquals(8, PartitionPolicy.fixed(8).resolve(4));
-    }
+    public void testFixedResolve() {
+        // Returns exact count, ignores both threadCount and handlerCount
+        assertEquals(1, PartitionPolicy.fixed(1).resolve(4, 0));
+        assertEquals(8, PartitionPolicy.fixed(8).resolve(4, 0));
+        assertEquals(4, PartitionPolicy.fixed(4).resolve(8, 500));
 
-    @Test
-    public void testFixedIgnoresThreadCount() {
+        // Same result regardless of threadCount
         final PartitionPolicy policy = PartitionPolicy.fixed(5);
-        assertEquals(5, policy.resolve(1));
-        assertEquals(5, policy.resolve(100));
+        assertEquals(5, policy.resolve(1, 0));
+        assertEquals(5, policy.resolve(100, 0));
     }
 
     @Test
@@ -49,15 +49,14 @@ public class PartitionPolicyTest {
     }
 
     @Test
-    public void testThreadMultiply() {
-        assertEquals(8, PartitionPolicy.threadMultiply(2).resolve(4));
-        assertEquals(12, PartitionPolicy.threadMultiply(3).resolve(4));
-    }
+    public void testThreadMultiplyResolve() {
+        // multiplier * threadCount, ignores handlerCount
+        assertEquals(8, PartitionPolicy.threadMultiply(2).resolve(4, 0));
+        assertEquals(12, PartitionPolicy.threadMultiply(3).resolve(4, 0));
+        assertEquals(16, PartitionPolicy.threadMultiply(2).resolve(8, 500));
 
-    @Test
-    public void testThreadMultiplyMinOne() {
         // Even with 0 thread count, should return at least 1
-        assertEquals(1, PartitionPolicy.threadMultiply(1).resolve(0));
+        assertEquals(1, PartitionPolicy.threadMultiply(1).resolve(0, 0));
     }
 
     @Test
@@ -65,9 +64,56 @@ public class PartitionPolicyTest {
         assertThrows(IllegalArgumentException.class, () -> 
PartitionPolicy.threadMultiply(0));
     }
 
+    @Test
+    public void testAdaptiveZeroHandlersReturnsThreadCount() {
+        assertEquals(8, PartitionPolicy.adaptive().resolve(8, 0));
+        assertEquals(4, PartitionPolicy.adaptive().resolve(4, 0));
+        assertEquals(1, PartitionPolicy.adaptive().resolve(0, 0));
+    }
+
+    @Test
+    public void testAdaptiveBelowThreshold() {
+        // 8 threads * 25 = 200 threshold, handlerCount <= 200 -> 1:1
+        assertEquals(50, PartitionPolicy.adaptive().resolve(8, 50));
+        assertEquals(100, PartitionPolicy.adaptive().resolve(8, 100));
+        assertEquals(200, PartitionPolicy.adaptive().resolve(8, 200));
+    }
+
+    @Test
+    public void testAdaptiveAboveThreshold() {
+        // 8 threads * 25 = 200 threshold, excess at 1:2 ratio
+        assertEquals(350, PartitionPolicy.adaptive().resolve(8, 500));   // 
200 + 300/2
+        assertEquals(600, PartitionPolicy.adaptive().resolve(8, 1000));  // 
200 + 800/2
+        assertEquals(1100, PartitionPolicy.adaptive().resolve(8, 2000)); // 
200 + 1800/2
+    }
+
+    @Test
+    public void testAdaptiveCustomMultiplier() {
+        // 8 threads * 50 = 400 threshold
+        assertEquals(100, PartitionPolicy.adaptive(50).resolve(8, 100));  // 
1:1
+        assertEquals(400, PartitionPolicy.adaptive(50).resolve(8, 400));  // 
1:1 at threshold
+        assertEquals(500, PartitionPolicy.adaptive(50).resolve(8, 600));  // 
400 + 200/2
+    }
+
+    @Test
+    public void testAdaptiveWithDifferentThreadCounts() {
+        // 4 threads * 25 = 100 threshold
+        assertEquals(50, PartitionPolicy.adaptive().resolve(4, 50));     // 1:1
+        assertEquals(100, PartitionPolicy.adaptive().resolve(4, 100));   // 1:1
+        assertEquals(350, PartitionPolicy.adaptive().resolve(4, 600));   // 
100 + 500/2
+    }
+
+    @Test
+    public void testAdaptiveRejectsInvalidMultiplier() {
+        assertThrows(IllegalArgumentException.class,
+            () -> PartitionPolicy.adaptive(0));
+    }
+
     @Test
     public void testToString() {
         assertEquals("fixed(4)", PartitionPolicy.fixed(4).toString());
         assertEquals("threadMultiply(2)", 
PartitionPolicy.threadMultiply(2).toString());
+        assertEquals("adaptive(multiplier=25)", 
PartitionPolicy.adaptive().toString());
+        assertEquals("adaptive(multiplier=50)", 
PartitionPolicy.adaptive(50).toString());
     }
 }
diff --git 
a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierBenchmark.java
 
b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierBenchmark.java
index f6efad4171..ad80a94761 100644
--- 
a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierBenchmark.java
+++ 
b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierBenchmark.java
@@ -28,137 +28,84 @@ import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
 import org.junit.jupiter.api.Test;
 
 /**
- * Throughput benchmark for DataCarrier, comparable to BatchQueueBenchmark.
+ * Throughput benchmark for DataCarrier as a baseline for BatchQueue 
comparison.
  *
  * <p>Simulates the real production pattern: N DataCarrier instances (one per
  * metric type), each with 1 channel, all sharing a {@link BulkConsumePool}
- * with 4 {@link 
org.apache.skywalking.oap.server.library.datacarrier.consumer.MultipleChannelsConsumer}
 threads.
- * Producers round-robin across the N carriers by type.
+ * with 8 consumer threads. 32 producer threads simulate gRPC connections.
  *
  * <p>Run with: mvn test -pl 
oap-server/server-library/library-datacarrier-queue
  *           -Dtest=DataCarrierBenchmark -DfailIfNoTests=false
  *
  * <h3>Reference results (Apple M3 Max, 128 GB RAM, macOS 26.2, JDK 17)</h3>
  * <pre>
- * Benchmark                       Config                  IF_POSSIBLE       
BLOCKING
- * ------------------------------- ---------------------- ------------- 
-------------
- * direct-1ch-1consumer            1ch, 1 consumer         ~35,300,000   
~35,300,000
- * direct-8ch-4consumer            8ch, 4 consumers        ~31,100,000   
~21,900,000
- * 40-types-pool4t (prod-like)     40 carriers, pool(4)    ~42,000,000   
~40,600,000
- * 200-types-pool4t (prod-like)    200 carriers, pool(4)   ~25,100,000   
~23,900,000
- * 200reg/50hot, 100 producers     200 carriers, pool(4)   ~17,600,000   
~23,800,000
+ * Types  Producers  Pool threads    IF_POSSIBLE       BLOCKING
+ * ------ --------- -------------- ------------- -------------
+ *   500      32      pool(8)       ~33,400,000   ~32,500,000
+ *  1000      32      pool(8)       ~37,600,000   ~36,000,000
+ *  2000      32      pool(8)       ~38,000,000   ~42,100,000
  *
- * All runs: 4 producer threads (except hot-types: 100), bufferSize=50,000,
- * consumeCycle=1ms, 0% drop rate.
+ * All runs: 1 channel per carrier, bufferSize=50,000, consumeCycle=1ms, 0% 
drop rate.
  * </pre>
  *
- * <h3>Comparison with BatchQueue — production case (4 consumer threads)</h3>
+ * <p><b>BatchQueue comparison (type-aware partitions, typed objects):</b>
  * <pre>
- * Scenario                        DataCarrier     BatchQueue
- *                                 (N carriers     (1 queue +
- *                                 + BulkPool)     handler map)
- * ------------------------------- -----------     -----------
- * 40 types, 4 producers            ~42,000,000    ~31,200,000
- * 200 types, 4 producers           ~25,100,000    ~43,900,000
- * 200 reg / 50 hot, 100 producers  ~17,600,000    ~26,900,000  (IF_POSSIBLE)
- * 200 reg / 50 hot, 100 producers  ~23,800,000    ~26,100,000  (BLOCKING)
- *
- * BatchQueue uses configurable PartitionSelector (default: typeHash).
- * Same-type items always land on the same partition, so dispatch grouping
- * is effectively a no-op. In the 200-type case, BatchQueue reaches ~44M/s
- * (1.75x DataCarrier's 25M) because 200 types spread across 32 partitions
- * with minimal collision, while DataCarrier's consumer threads must iterate
- * all 200 groups per cycle. In the production-realistic hot-types scenario
- * (200reg/50hot/100 producers), BatchQueue leads ~27M vs ~18M (IF_POSSIBLE).
- * DataCarrier wins the 40-type case (~42M vs ~31M) where fewer types mean
- * less iteration overhead for its per-carrier polling model.
+ *              500 types   1000 types   2000 types
+ * 1:4           -4%         +6%          +24%
+ * 1:2          +14%        +32%          +38%
+ * adaptive     +37%        +34%          +68%
+ * 1:1          +53%        +63%          +99%
  * </pre>
+ *
+ * <p>BatchQueue adaptive() = threshold(threadCount * 25), 1:1 below, 1:2
+ * above. Consistently outperforms DataCarrier across all type counts.
+ * See BatchQueueBenchmark for full details.
  */
 @SuppressWarnings("all")
 public class DataCarrierBenchmark {
 
     private static final int WARMUP_SECONDS = 2;
     private static final int MEASURE_SECONDS = 5;
-    private static final int PRODUCER_THREADS = 4;
+    private static final int PRODUCER_THREADS = 32;
+    private static final int POOL_THREADS = 8;
     private static final int BUFFER_SIZE = 50_000;
 
-    // ---- Production-like: N types sharing BulkConsumePool ----
-
-    @Test
-    public void benchmark40TypesSharedPool4Threads() throws Exception {
-        runSharedPoolBenchmark("40-types-pool4t-ifpossible",
-            40, 4, 1, BufferStrategy.IF_POSSIBLE);
-    }
-
-    @Test
-    public void benchmark200TypesSharedPool4Threads() throws Exception {
-        runSharedPoolBenchmark("200-types-pool4t-ifpossible",
-            200, 4, 1, BufferStrategy.IF_POSSIBLE);
-    }
-
-    @Test
-    public void benchmark40TypesSharedPool4ThreadsBlocking() throws Exception {
-        runSharedPoolBenchmark("40-types-pool4t-blocking",
-            40, 4, 1, BufferStrategy.BLOCKING);
-    }
-
-    @Test
-    public void benchmark200TypesSharedPool4ThreadsBlocking() throws Exception 
{
-        runSharedPoolBenchmark("200-types-pool4t-blocking",
-            200, 4, 1, BufferStrategy.BLOCKING);
-    }
-
-    // ---- Production-realistic: 200 types, 50 hot, 100 gRPC producers ----
-
     @Test
-    public void benchmarkHotTypes200x50x100producers() throws Exception {
-        runHotTypesPoolBenchmark("200reg-50hot-100p-ifpossible",
-            200, 50, 100, 4, 1, BufferStrategy.IF_POSSIBLE);
+    public void benchmark500Types() throws Exception {
+        runSharedPoolBenchmark("500-types", 500, BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkHotTypes200x50x100producersBlocking() throws 
Exception {
-        runHotTypesPoolBenchmark("200reg-50hot-100p-blocking",
-            200, 50, 100, 4, 1, BufferStrategy.BLOCKING);
+    public void benchmark1000Types() throws Exception {
+        runSharedPoolBenchmark("1000-types", 1000, BufferStrategy.IF_POSSIBLE);
     }
 
-    // ---- Single carrier baselines ----
-
     @Test
-    public void benchmarkDirectConsumer1Thread() throws Exception {
-        runSingleBenchmark("direct-1ch-1consumer-ifpossible",
-            1, 1, 1, BufferStrategy.IF_POSSIBLE);
+    public void benchmark2000Types() throws Exception {
+        runSharedPoolBenchmark("2000-types", 2000, BufferStrategy.IF_POSSIBLE);
     }
 
     @Test
-    public void benchmarkDirectConsumer4Threads() throws Exception {
-        runSingleBenchmark("direct-8ch-4consumer-ifpossible",
-            8, 4, 1, BufferStrategy.IF_POSSIBLE);
+    public void benchmark500TypesBlocking() throws Exception {
+        runSharedPoolBenchmark("500-types-blocking", 500, 
BufferStrategy.BLOCKING);
     }
 
     @Test
-    public void benchmarkDirectConsumer1ThreadBlocking() throws Exception {
-        runSingleBenchmark("direct-1ch-1consumer-blocking",
-            1, 1, 1, BufferStrategy.BLOCKING);
+    public void benchmark1000TypesBlocking() throws Exception {
+        runSharedPoolBenchmark("1000-types-blocking", 1000, 
BufferStrategy.BLOCKING);
     }
 
     @Test
-    public void benchmarkDirectConsumer4ThreadsBlocking() throws Exception {
-        runSingleBenchmark("direct-8ch-4consumer-blocking",
-            8, 4, 1, BufferStrategy.BLOCKING);
+    public void benchmark2000TypesBlocking() throws Exception {
+        runSharedPoolBenchmark("2000-types-blocking", 2000, 
BufferStrategy.BLOCKING);
     }
 
-    /**
-     * Production-like benchmark: N DataCarrier instances sharing one 
BulkConsumePool.
-     * Each carrier has 1 channel. Producers round-robin across carriers by 
type.
-     */
     private void runSharedPoolBenchmark(final String label, final int 
typeCount,
-                                        final int poolThreads, final long 
consumeCycleMs,
                                         final BufferStrategy strategy) throws 
Exception {
         final AtomicLong consumed = new AtomicLong(0);
 
         final BulkConsumePool pool = new BulkConsumePool(
-            "bench-pool", poolThreads, consumeCycleMs, false);
+            "bench-pool", POOL_THREADS, 1, false);
 
         final DataCarrier<Long>[] carriers = new DataCarrier[typeCount];
         for (int i = 0; i < typeCount; i++) {
@@ -181,7 +128,6 @@ public class DataCarrierBenchmark {
         final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 
1000L;
         runProducers(carriers, warmupEnd);
         Thread.sleep(200);
-        final long warmupConsumed = consumed.get();
         consumed.set(0);
 
         // Measure
@@ -198,188 +144,16 @@ public class DataCarrierBenchmark {
         System.out.printf("%n=== DataCarrier Benchmark: %s ===%n"
                 + "  Types:       %d (1 DataCarrier per type, 1 channel 
each)%n"
                 + "  Pool threads:%d%n"
-                + "  ConsumeCycle:%d ms%n"
-                + "  Strategy:    %s%n"
-                + "  Producers:   %d%n"
-                + "  Duration:    %d ms%n"
-                + "  Warmup consumed: %,d%n"
-                + "  Produced:    %,d%n"
-                + "  Consumed:    %,d%n"
-                + "  Produce rate:  %,.0f items/sec%n"
-                + "  Consume rate:  %,.0f items/sec%n"
-                + "  Drop rate:     %.2f%%%n",
-            label, typeCount, poolThreads, consumeCycleMs, strategy,
-            PRODUCER_THREADS, measureDuration, warmupConsumed,
-            produced, totalConsumed,
-            produced * 1000.0 / measureDuration,
-            totalConsumed * 1000.0 / measureDuration,
-            produced > 0 ? (produced - totalConsumed) * 100.0 / produced : 0);
-    }
-
-    /**
-     * Production-realistic: 200 DataCarrier instances created, but only 50 
hot types
-     * get traffic from 100 producer threads (simulating gRPC connections).
-     * Consumer threads must iterate all 200 groups each cycle, including 150 
idle ones.
-     */
-    private void runHotTypesPoolBenchmark(final String label, final int 
totalTypes,
-                                          final int hotTypes, final int 
producerCount,
-                                          final int poolThreads, final long 
consumeCycleMs,
-                                          final BufferStrategy strategy) 
throws Exception {
-        final AtomicLong consumed = new AtomicLong(0);
-
-        final BulkConsumePool pool = new BulkConsumePool(
-            "bench-hot-pool", poolThreads, consumeCycleMs, false);
-
-        // Create ALL carriers (200), each registered with the shared pool
-        final DataCarrier<Long>[] carriers = new DataCarrier[totalTypes];
-        for (int i = 0; i < totalTypes; i++) {
-            carriers[i] = new DataCarrier<>(
-                "type-" + i, "bench", 1, BUFFER_SIZE, strategy);
-            carriers[i].consume(pool, new IConsumer<Long>() {
-                @Override
-                public void consume(final List<Long> data) {
-                    consumed.addAndGet(data.size());
-                }
-
-                @Override
-                public void onError(final List<Long> data, final Throwable t) {
-                    t.printStackTrace();
-                }
-            });
-        }
-
-        // Only hot carriers get traffic
-        final DataCarrier<Long>[] hotCarriers = new DataCarrier[hotTypes];
-        System.arraycopy(carriers, 0, hotCarriers, 0, hotTypes);
-
-        // Warmup
-        final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 
1000L;
-        runProducersN(hotCarriers, producerCount, warmupEnd);
-        Thread.sleep(200);
-        consumed.set(0);
-
-        // Measure
-        final long measureStart = System.currentTimeMillis();
-        final long measureEnd = measureStart + MEASURE_SECONDS * 1000L;
-        final long produced = runProducersN(hotCarriers, producerCount, 
measureEnd);
-        final long measureDuration = System.currentTimeMillis() - measureStart;
-
-        Thread.sleep(500);
-        final long totalConsumed = consumed.get();
-
-        pool.close(null);
-
-        System.out.printf("%n=== DataCarrier Benchmark: %s ===%n"
-                + "  Total types: %d (all registered with pool)%n"
-                + "  Hot types:   %d (receiving traffic)%n"
-                + "  Pool threads:%d%n"
-                + "  ConsumeCycle:%d ms%n"
-                + "  Strategy:    %s%n"
-                + "  Producers:   %d (simulating gRPC connections)%n"
-                + "  Duration:    %d ms%n"
-                + "  Produced:    %,d%n"
-                + "  Consumed:    %,d%n"
-                + "  Produce rate:  %,.0f items/sec%n"
-                + "  Consume rate:  %,.0f items/sec%n"
-                + "  Drop rate:     %.2f%%%n",
-            label, totalTypes, hotTypes, poolThreads, consumeCycleMs, strategy,
-            producerCount, measureDuration,
-            produced, totalConsumed,
-            produced * 1000.0 / measureDuration,
-            totalConsumed * 1000.0 / measureDuration,
-            produced > 0 ? (produced - totalConsumed) * 100.0 / produced : 0);
-    }
-
-    private long runProducersN(final DataCarrier<Long>[] carriers,
-                               final int producerCount,
-                               final long endTimeMs) throws 
InterruptedException {
-        final int carrierCount = carriers.length;
-        final AtomicLong totalProduced = new AtomicLong(0);
-        final CountDownLatch done = new CountDownLatch(producerCount);
-
-        for (int p = 0; p < producerCount; p++) {
-            final int producerIndex = p;
-            final Thread thread = new Thread(() -> {
-                long count = 0;
-                int typeIndex = producerIndex;
-                while (System.currentTimeMillis() < endTimeMs) {
-                    for (int batch = 0; batch < 100; batch++) {
-                        final int type = typeIndex % carrierCount;
-                        if (carriers[type].produce(count)) {
-                            count++;
-                        }
-                        typeIndex++;
-                    }
-                }
-                totalProduced.addAndGet(count);
-                done.countDown();
-            });
-            thread.setName("gRPC-" + producerIndex);
-            thread.setDaemon(true);
-            thread.start();
-        }
-
-        done.await(MEASURE_SECONDS + 10, TimeUnit.SECONDS);
-        return totalProduced.get();
-    }
-
-    private void runSingleBenchmark(final String label, final int channelSize,
-                                    final int consumerThreads, final long 
consumeCycleMs,
-                                    final BufferStrategy strategy) throws 
Exception {
-        final AtomicLong consumed = new AtomicLong(0);
-
-        final DataCarrier<Long> carrier = new DataCarrier<>(
-            "bench-" + label, "bench", channelSize, BUFFER_SIZE, strategy);
-
-        carrier.consume(new IConsumer<Long>() {
-            @Override
-            public void consume(final List<Long> data) {
-                consumed.addAndGet(data.size());
-            }
-
-            @Override
-            public void onError(final List<Long> data, final Throwable t) {
-                t.printStackTrace();
-            }
-        }, consumerThreads, consumeCycleMs);
-
-        final DataCarrier<Long>[] carriers = new DataCarrier[] {carrier};
-
-        // Warmup
-        final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 
1000L;
-        runProducers(carriers, warmupEnd);
-        Thread.sleep(200);
-        final long warmupConsumed = consumed.get();
-        consumed.set(0);
-
-        // Measure
-        final long measureStart = System.currentTimeMillis();
-        final long measureEnd = measureStart + MEASURE_SECONDS * 1000L;
-        final long produced = runProducers(carriers, measureEnd);
-        final long measureDuration = System.currentTimeMillis() - measureStart;
-
-        Thread.sleep(500);
-        final long totalConsumed = consumed.get();
-
-        carrier.shutdownConsumers();
-
-        System.out.printf("%n=== DataCarrier Benchmark: %s ===%n"
-                + "  Channels:    %d%n"
-                + "  Consumers:   %d%n"
-                + "  ConsumeCycle:%d ms%n"
                 + "  Strategy:    %s%n"
                 + "  Producers:   %d%n"
                 + "  Duration:    %d ms%n"
-                + "  Warmup consumed: %,d%n"
                 + "  Produced:    %,d%n"
                 + "  Consumed:    %,d%n"
-                + "  Produce rate:  %,.0f items/sec%n"
                 + "  Consume rate:  %,.0f items/sec%n"
                 + "  Drop rate:     %.2f%%%n",
-            label, channelSize, consumerThreads, consumeCycleMs, strategy,
-            PRODUCER_THREADS, measureDuration, warmupConsumed,
+            label, typeCount, POOL_THREADS, strategy,
+            PRODUCER_THREADS, measureDuration,
             produced, totalConsumed,
-            produced * 1000.0 / measureDuration,
             totalConsumed * 1000.0 / measureDuration,
             produced > 0 ? (produced - totalConsumed) * 100.0 / produced : 0);
     }
@@ -408,6 +182,7 @@ public class DataCarrierBenchmark {
                 done.countDown();
             });
             thread.setName("Producer-" + producerIndex);
+            thread.setDaemon(true);
             thread.start();
         }
 

Reply via email to