This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch library-batch-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 9cb4134b3aa79553dd0d429e25af6bca918cb972
Author: Wu Sheng <[email protected]>
AuthorDate: Sat Feb 14 10:25:03 2026 +0800

    Replace DataCarrier with BatchQueue for L1 metrics aggregation
    
    Unify OAL and MAL metric types into a single shared BatchQueue with
    adaptive partitioning, replacing the per-type DataCarrier + dual
    BulkConsumePool architecture. This reduces thread count from 26 to
    8 (on 8-core) while improving throughput via better work distribution.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 oap-server/server-core/pom.xml                     |   5 +
 .../analysis/worker/MetricsAggregateMALWorker.java |  65 -------
 .../analysis/worker/MetricsAggregateOALWorker.java |  48 -----
 .../analysis/worker/MetricsAggregateWorker.java    | 136 ++++++--------
 .../analysis/worker/MetricsStreamProcessor.java    |  15 +-
 .../library-batch-queue/MIGRATION.md               | 196 +++++++++++++++++++++
 6 files changed, 257 insertions(+), 208 deletions(-)

diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index f6d233587a..94889deaea 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -79,6 +79,11 @@
             <artifactId>library-datacarrier-queue</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>library-batch-queue</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.skywalking</groupId>
             <artifactId>apm-network</artifactId>
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java
deleted file mode 100644
index 6ce0bb447d..0000000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.worker;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-
-/**
- * MetricsAggregateMALWorker provides an in-memory metrics merging capability 
for MAL
- */
-@Slf4j
-public class MetricsAggregateMALWorker extends MetricsAggregateWorker {
-    private final static String POOL_NAME = "METRICS_L1_AGGREGATION_MAL";
-    private final BulkConsumePool pool;
-
-    MetricsAggregateMALWorker(ModuleDefineHolder moduleDefineHolder,
-                              AbstractWorker<Metrics> nextWorker,
-                              String modelName,
-                              long l1FlushPeriod,
-                              MetricStreamKind kind) {
-        super(
-            moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind,
-            POOL_NAME,
-            calculatePoolSize(),
-            true,
-            1,
-            1_000
-        );
-        this.pool = (BulkConsumePool) 
ConsumerPoolFactory.INSTANCE.get(POOL_NAME);
-    }
-
-    /**
-     * MetricsAggregateWorker#in operation does include enqueue only
-     */
-    @Override
-    public final void in(Metrics metrics) {
-        super.in(metrics);
-        pool.notifyConsumers();
-    }
-
-    private static int calculatePoolSize() {
-        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
-        return size == 0 ? 1 : size;
-    }
-}
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java
deleted file mode 100644
index 833b24419a..0000000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.worker;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-
-/**
- * MetricsAggregateOALWorker provides an in-memory metrics merging capability 
for OAL
- */
-@Slf4j
-public class MetricsAggregateOALWorker extends MetricsAggregateWorker {
-    private final static String POOL_NAME = "METRICS_L1_AGGREGATION_OAL";
-
-    MetricsAggregateOALWorker(ModuleDefineHolder moduleDefineHolder,
-                              AbstractWorker<Metrics> nextWorker,
-                              String modelName,
-                              long l1FlushPeriod,
-                              MetricStreamKind kind) {
-        super(
-            moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind,
-            POOL_NAME,
-            (int) Math.ceil(BulkConsumePool.Creator.recommendMaxSize() * 1.5),
-            false,
-            2,
-            10_000
-        );
-    }
-}
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index b0a8bffa34..ff69e6f8ec 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -18,23 +18,21 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.Arrays;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import 
org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
-import 
org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
-import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager;
+import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy;
+import org.apache.skywalking.oap.server.library.batchqueue.HandlerConsumer;
+import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy;
+import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
-import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
@@ -43,46 +41,47 @@ import 
org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
  * it merges the data just after the receiver analysis. The metrics belonging 
to the same entity, metrics type and time
  * bucket, the L1 aggregation will merge them into one metrics object to 
reduce the unnecessary memory and network
  * payload.
+ *
+ * <p>All metric types (OAL and MAL) share a single {@link BatchQueue} with 
adaptive partitioning.
+ * The {@code typeHash()} partition selector ensures same metric class lands 
on the same partition,
+ * so each handler's {@link MergableBufferedData} is only accessed by one 
drain thread.
  */
 @Slf4j
-public abstract class MetricsAggregateWorker extends AbstractWorker<Metrics> {
-    public final long l1FlushPeriod;
-    private AbstractWorker<Metrics> nextWorker;
-    private final DataCarrier<Metrics> dataCarrier;
+public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
+    private static final String L1_QUEUE_NAME = "METRICS_L1_AGGREGATION";
+    private static final BatchQueueConfig<Metrics> L1_QUEUE_CONFIG =
+        BatchQueueConfig.<Metrics>builder()
+            .threads(ThreadPolicy.cpuCores(1.0))
+            .partitions(PartitionPolicy.adaptive())
+            .bufferSize(20_000)
+            .strategy(BufferStrategy.IF_POSSIBLE)
+            .minIdleMs(1)
+            .maxIdleMs(50)
+            .build();
+
+    private final BatchQueue<Metrics> l1Queue;
+    private final long l1FlushPeriod;
+    private final AbstractWorker<Metrics> nextWorker;
     private final MergableBufferedData<Metrics> mergeDataCache;
-    private CounterMetrics abandonCounter;
-    private CounterMetrics aggregationCounter;
-    private GaugeMetrics queuePercentageGauge;
+    private final CounterMetrics abandonCounter;
+    private final CounterMetrics aggregationCounter;
+    // TODO: add queue usage telemetry after per-partition metrics are designed
     private long lastSendTime = 0;
-    private final int queueTotalSize;
 
-    MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder,
-                           AbstractWorker<Metrics> nextWorker,
-                           String modelName,
-                           long l1FlushPeriod,
-                           MetricStreamKind kind,
-                           String poolName,
-                           int poolSize,
-                           boolean isSignalDrivenMode,
-                           int queueChannelSize,
-                           int queueBufferSize
-                           ) {
+    MetricsAggregateWorker(final ModuleDefineHolder moduleDefineHolder,
+                           final AbstractWorker<Metrics> nextWorker,
+                           final String modelName,
+                           final long l1FlushPeriod,
+                           final Class<? extends Metrics> metricsClass) {
         super(moduleDefineHolder);
         this.nextWorker = nextWorker;
-        this.mergeDataCache = new MergableBufferedData();
-        BulkConsumePool.Creator creator = new 
BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode);
-        this.dataCarrier = new DataCarrier<>(
-            "MetricsAggregateWorker." + modelName, poolName, queueChannelSize, 
queueBufferSize, BufferStrategy.IF_POSSIBLE);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
-        }
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), 
new AggregatorConsumer());
+        this.mergeDataCache = new MergableBufferedData<>();
+        this.l1FlushPeriod = l1FlushPeriod;
+        this.l1Queue = BatchQueueManager.create(L1_QUEUE_NAME, 
L1_QUEUE_CONFIG);
 
-        MetricsCreator metricsCreator = 
moduleDefineHolder.find(TelemetryModule.NAME)
-                                                          .provider()
-                                                          
.getService(MetricsCreator.class);
+        final MetricsCreator metricsCreator = 
moduleDefineHolder.find(TelemetryModule.NAME)
+                                                                .provider()
+                                                                
.getService(MetricsCreator.class);
         abandonCounter = metricsCreator.createCounter(
             "metrics_aggregator_abandon", "The abandon number of rows received 
in aggregation.",
             new MetricsTag.Keys("metricName", "level", "dimensionality"),
@@ -93,69 +92,42 @@ public abstract class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
             new MetricsTag.Keys("metricName", "level", "dimensionality"),
             new MetricsTag.Values(modelName, "1", "minute")
         );
-        queuePercentageGauge = metricsCreator.createGauge(
-            "metrics_aggregation_queue_used_percentage", "The percentage of 
queue used in aggregation.",
-            new MetricsTag.Keys("metricName", "level", "kind"),
-            new MetricsTag.Values(modelName, "1", kind.name())
-        );
-        this.l1FlushPeriod = l1FlushPeriod;
-        queueTotalSize = 
Arrays.stream(dataCarrier.getChannels().getBufferChannels())
-                               .mapToInt(QueueBuffer::getBufferSize)
-                               .sum();
+
+        l1Queue.addHandler(metricsClass, new L1Handler());
     }
 
-    /**
-     * MetricsAggregateWorker#in operation does include enqueue only
-     */
     @Override
-    public void in(Metrics metrics) {
-        if (!dataCarrier.produce(metrics)) {
+    public void in(final Metrics metrics) {
+        if (!l1Queue.produce(metrics)) {
             abandonCounter.inc();
         }
     }
 
-    /**
-     * Dequeue consuming. According to {@link IConsumer#consume(List)}, this 
is a serial operation for every work
-     * instance.
-     *
-     * @param metricsList from the queue.
-     */
-    private void onWork(List<Metrics> metricsList) {
-        metricsList.forEach(metrics -> {
+    private void onWork(final List<Metrics> metricsList) {
+        for (final Metrics metrics : metricsList) {
             aggregationCounter.inc();
             mergeDataCache.accept(metrics);
-        });
-
+        }
         flush();
     }
 
     private void flush() {
-        long currentTime = System.currentTimeMillis();
+        final long currentTime = System.currentTimeMillis();
         if (currentTime - lastSendTime > l1FlushPeriod) {
-            mergeDataCache.read().forEach(
-                data -> {
-                    nextWorker.in(data);
-                }
-            );
+            mergeDataCache.read().forEach(nextWorker::in);
             lastSendTime = currentTime;
         }
     }
 
-    protected class AggregatorConsumer implements IConsumer<Metrics> {
-        @Override
-        public void consume(List<Metrics> data) {
-            queuePercentageGauge.setValue(Math.round(100 * (double) 
data.size() / queueTotalSize));
-            MetricsAggregateWorker.this.onWork(data);
-        }
-
+    private class L1Handler implements HandlerConsumer<Metrics> {
         @Override
-        public void onError(List<Metrics> data, Throwable t) {
-            log.error(t.getMessage(), t);
+        public void consume(final List<Metrics> data) {
+            onWork(data);
         }
 
         @Override
-        public void nothingToConsume() {
+        public void onIdle() {
             flush();
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index fe70ebdd7f..38361ebd10 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -194,19 +194,8 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         workerInstanceSetter.put(remoteReceiverWorkerName, 
minutePersistentWorker, kind, metricsClass);
 
         MetricsRemoteWorker remoteWorker = new 
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
-        MetricsAggregateWorker aggregateWorker;
-        switch (kind) {
-            case OAL:
-                aggregateWorker = new MetricsAggregateOALWorker(
-                    moduleDefineHolder, remoteWorker, stream.getName(), 
l1FlushPeriod, kind);
-                break;
-            case MAL:
-                aggregateWorker = new MetricsAggregateMALWorker(
-                    moduleDefineHolder, remoteWorker, stream.getName(), 
l1FlushPeriod, kind);
-                break;
-            default:
-                throw new IllegalArgumentException("Unsupported 
MetricStreamKind: " + kind);
-        }
+        MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
+            moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, 
metricsClass);
         entryWorkers.put(metricsClass, aggregateWorker);
     }
 
diff --git a/oap-server/server-library/library-batch-queue/MIGRATION.md 
b/oap-server/server-library/library-batch-queue/MIGRATION.md
new file mode 100644
index 0000000000..3cdc251ec3
--- /dev/null
+++ b/oap-server/server-library/library-batch-queue/MIGRATION.md
@@ -0,0 +1,196 @@
+# DataCarrier to BatchQueue Migration
+
+This document tracks the step-by-step replacement of DataCarrier with BatchQueue
+across the SkyWalking OAP codebase. Each section covers one replacement target
+with its current state, proposed changes, and key parameters.
+
+## 1. L1 Metrics Aggregation (MetricsAggregateWorker)
+
+### Current Architecture
+
+Each metric type (~620 OAL + ~100 MAL) creates its own `MetricsAggregateWorker`
+with a dedicated `DataCarrier`. OAL and MAL workers use separate
+`BulkConsumePool` instances with different thread counts and configurations,
+despite doing the same work (merge + flush).
+
+```
+MetricsStreamProcessor.in(metrics)
+  └─ entryWorkers.get(metrics.getClass())    // one worker per metric type
+       └─ worker.in(metrics)
+            └─ dataCarrier.produce(metrics)   // one DataCarrier per worker
+                 └─ BulkConsumePool drains    // OAL pool or MAL pool
+                      └─ AggregatorConsumer.consume(batch)
+                           └─ mergeDataCache.accept(each)  // per-worker merge cache
+                           └─ flush() if l1FlushPeriod elapsed
+```
+
+**Files involved:**
+- `server-core/.../worker/MetricsAggregateWorker.java` — base class, owns DataCarrier + merge cache
+- `server-core/.../worker/MetricsAggregateOALWorker.java` — OAL pool config
+- `server-core/.../worker/MetricsAggregateMALWorker.java` — MAL pool config + signal-driven wakeup
+- `server-core/.../worker/MetricsStreamProcessor.java` — creates workers, routes `in()` calls
+
+**Current parameters (8-core machine):**
+
+| Parameter | OAL pool | MAL pool |
+|-----------|----------|----------|
+| Pool name | `METRICS_L1_AGGREGATION_OAL` | `METRICS_L1_AGGREGATION_MAL` |
+| Pool threads | `ceil(cores * 3)` = 24 | `cores / 4` = 2 |
+| Signal-driven | no (200ms poll cycle) | yes (`notifyConsumers()` per `in()`) |
+| Channels per type | 2 | 1 |
+| Buffer per channel | 10,000 | 1,000 |
+| Total buffer per type | 20,000 | 1,000 |
+| Strategy | IF_POSSIBLE | IF_POSSIBLE |
+| **Total threads** | **26** | |
+| **Total DataCarriers** | **~720** | |
+
+**Per-worker state:**
+- `MergableBufferedData<Metrics> mergeDataCache` — not thread-safe, merge cache
+- `long lastSendTime` — tracks l1FlushPeriod
+- `AbstractWorker<Metrics> nextWorker` — MetricsRemoteWorker
+- `CounterMetrics abandonCounter` — tagged by metricName, level=1, dimensionality=minute
+- `CounterMetrics aggregationCounter` — tagged by metricName, level=1, dimensionality=minute
+- `GaugeMetrics queuePercentageGauge` — tagged by metricName, level=1, kind=OAL/MAL
+
+### Proposed: Single BatchQueue for All Metric Types
+
+OAL and MAL use the same queue. There is no reason to separate them — the
+aggregation logic (merge + periodic flush) is identical. Splitting by kind only
+wastes threads and complicates configuration. `MetricStreamKind` becomes
+irrelevant at the L1 aggregation layer.
+
+One `BatchQueue<Metrics>` with adaptive partitioning handles all ~720 metric
+types. Each type registers a handler via `addHandler()`.
+
+**BatchQueue parameters:**
+
+| Parameter | Value | Rationale |
+|-----------|-------|-----------|
+| Queue name | `METRICS_L1_AGGREGATION` | Single queue for all metric types (OAL + MAL) |
+| ThreadPolicy | `cpuCores(1.0)` | 8 threads on 8-core. Benchmark: 8 threads + adaptive beats 26-thread DataCarrier by 34-68% |
+| PartitionPolicy | `adaptive()` (multiplier=25) | threshold=8*25=200. ~720 types → 200+520/2 = 460 partitions |
+| BufferSize | 20,000 | Per partition. Matches current OAL per-type capacity |
+| Strategy | `IF_POSSIBLE` | Same as current — drop if full |
+| PartitionSelector | `typeHash()` (default) | Same type → same partition → single-thread access to merge cache |
+| minIdleMs | 1 | Fast reaction (current MAL used signal-driven wakeup; 1ms backoff is equivalent) |
+| maxIdleMs | 50 | Backoff cap when idle |
+
+**Thread-safety guarantee:** `typeHash()` ensures same metric class → same
+partition → same drain thread. Each handler's `MergableBufferedData` is only
+accessed by one thread, preserving the current invariant without synchronization.
+
+**Memory comparison (8-core, ~720 types):**
+
+| | Current (OAL + MAL pools) | Proposed (single queue) |
+|---|---------------------------|-------------------------|
+| Queues | ~1340 channels | 460 partitions |
+| Buffer slots | ~12.5M | ~9.2M |
+| Threads | 26 | 8 |
+
+### Telemetry
+
+| Metric | Plan |
+|--------|------|
+| `metrics_aggregator_abandon` | Keep. Per-type counter, incremented on the producer side when `produce()` returns false. |
+| `metrics_aggregation` | Keep. Per-type counter in handler. Incremented per item in `consume()`. |
+| `metrics_aggregation_queue_used_percentage` | TODO: Redesign for shared-partition model. Leave as placeholder. |
+
+### Replacement Steps
+
+#### Step 1: Create L1 aggregation handler
+
+Create a handler class that captures the per-type state currently spread across
+`MetricsAggregateWorker` / `MetricsAggregateOALWorker` / `MetricsAggregateMALWorker`.
+No OAL/MAL distinction — the handler is the same for all metric types.
+
+```
+MetricsL1Handler implements HandlerConsumer<Metrics>
+  - MergableBufferedData<Metrics> mergeDataCache
+  - long lastSendTime
+  - long l1FlushPeriod
+  - AbstractWorker<Metrics> nextWorker
+  - CounterMetrics aggregationCounter
+
+  consume(List<Metrics> batch):
+    for each metric: aggregationCounter.inc(); mergeDataCache.accept(metric)
+    flush()
+
+  onIdle():
+    flush()
+
+  flush():
+    if currentTime - lastSendTime > l1FlushPeriod:
+      mergeDataCache.read().forEach(nextWorker::in)
+      lastSendTime = currentTime
+```
+
+#### Step 2: Create the shared BatchQueue in MetricsStreamProcessor
+
+One queue, created once, shared by all metric types regardless of kind:
+
+```java
+BatchQueue<Metrics> l1Queue = BatchQueueManager.create(
+    "METRICS_L1_AGGREGATION",
+    BatchQueueConfig.<Metrics>builder()
+        .threads(ThreadPolicy.cpuCores(1.0))
+        .partitions(PartitionPolicy.adaptive())
+        .bufferSize(20_000)
+        .strategy(BufferStrategy.IF_POSSIBLE)
+        .minIdleMs(1)
+        .maxIdleMs(50)
+        .build());
+```
+
+#### Step 3: Register handlers per metric type
+
+In `MetricsStreamProcessor.create()`, replace the OAL/MAL worker switch with
+a single handler registration. The `kind` parameter is no longer needed:
+
+```java
+// Replace:
+//   switch (kind) {
+//       case OAL:  aggregateWorker = new MetricsAggregateOALWorker(...); break;
+//       case MAL:  aggregateWorker = new MetricsAggregateMALWorker(...); break;
+//   }
+//   entryWorkers.put(metricsClass, aggregateWorker);
+
+// With:
+MetricsL1Handler handler = new MetricsL1Handler(
+    remoteWorker, l1FlushPeriod, modelName, metricsCreator);
+l1Queue.addHandler(metricsClass, handler);
+```
+
+#### Step 4: Replace `in()` routing
+
+The `MetricsStreamProcessor.in()` method no longer looks up a per-type worker.
+It produces directly to the queue:
+
+```java
+// Replace:
+//   MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass());
+//   if (worker != null) { worker.in(metrics); }
+
+// With:
+if (!l1Queue.produce(metrics)) {
+    abandonCounters.get(metrics.getClass()).inc();
+}
+```
+
+`abandonCounters` is a `Map<Class<?>, CounterMetrics>` populated during
+handler registration (Step 3).
+
+#### Step 5: Verify
+
+- All existing unit tests for MetricsStreamProcessor pass
+- E2E tests with metrics aggregation pass
+- Telemetry counters (abandon, aggregation) still reported per metric type
+
+---
+
+## 2. L2 Metrics Persistence (MetricsPersistentMinWorker)
+
+*To be planned after L1 replacement is complete.*
+
+## 3. Individual Consumer Replacements
+
+*To be planned: GRPCRemoteClient, TopNWorker, exporters, JDBCBatchDAO.*

Reply via email to