This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch library-batch-queue in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 9cb4134b3aa79553dd0d429e25af6bca918cb972 Author: Wu Sheng <[email protected]> AuthorDate: Sat Feb 14 10:25:03 2026 +0800 Replace DataCarrier with BatchQueue for L1 metrics aggregation Unify OAL and MAL metric types into a single shared BatchQueue with adaptive partitioning, replacing the per-type DataCarrier + dual BulkConsumePool architecture. This reduces thread count from 26 to 8 (on 8-core) while improving throughput via better work distribution. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- oap-server/server-core/pom.xml | 5 + .../analysis/worker/MetricsAggregateMALWorker.java | 65 ------- .../analysis/worker/MetricsAggregateOALWorker.java | 48 ----- .../analysis/worker/MetricsAggregateWorker.java | 136 ++++++-------- .../analysis/worker/MetricsStreamProcessor.java | 15 +- .../library-batch-queue/MIGRATION.md | 196 +++++++++++++++++++++ 6 files changed, 257 insertions(+), 208 deletions(-) diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml index f6d233587a..94889deaea 100644 --- a/oap-server/server-core/pom.xml +++ b/oap-server/server-core/pom.xml @@ -79,6 +79,11 @@ <artifactId>library-datacarrier-queue</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>library-batch-queue</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.skywalking</groupId> <artifactId>apm-network</artifactId> diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java deleted file mode 100644 index 6ce0bb447d..0000000000 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.core.analysis.worker; - -import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; -import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; -import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; - -/** - * MetricsAggregateMALWorker provides an in-memory metrics merging capability for MAL - */ -@Slf4j -public class MetricsAggregateMALWorker extends MetricsAggregateWorker { - private final static String POOL_NAME = "METRICS_L1_AGGREGATION_MAL"; - private final BulkConsumePool pool; - - MetricsAggregateMALWorker(ModuleDefineHolder moduleDefineHolder, - AbstractWorker<Metrics> nextWorker, - String modelName, - long l1FlushPeriod, - MetricStreamKind kind) { - super( - moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind, - POOL_NAME, - calculatePoolSize(), - true, - 1, - 1_000 - ); - this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME); - } - - /** - * MetricsAggregateWorker#in 
operation does include enqueue only - */ - @Override - public final void in(Metrics metrics) { - super.in(metrics); - pool.notifyConsumers(); - } - - private static int calculatePoolSize() { - int size = BulkConsumePool.Creator.recommendMaxSize() / 8; - return size == 0 ? 1 : size; - } -} \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java deleted file mode 100644 index 833b24419a..0000000000 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.skywalking.oap.server.core.analysis.worker; - -import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; -import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; - -/** - * MetricsAggregateOALWorker provides an in-memory metrics merging capability for OAL - */ -@Slf4j -public class MetricsAggregateOALWorker extends MetricsAggregateWorker { - private final static String POOL_NAME = "METRICS_L1_AGGREGATION_OAL"; - - MetricsAggregateOALWorker(ModuleDefineHolder moduleDefineHolder, - AbstractWorker<Metrics> nextWorker, - String modelName, - long l1FlushPeriod, - MetricStreamKind kind) { - super( - moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind, - POOL_NAME, - (int) Math.ceil(BulkConsumePool.Creator.recommendMaxSize() * 1.5), - false, - 2, - 10_000 - ); - } -} \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index b0a8bffa34..ff69e6f8ec 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -18,23 +18,21 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import java.util.Arrays; import java.util.List; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import 
org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy; -import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager; +import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy; +import org.apache.skywalking.oap.server.library.batchqueue.HandlerConsumer; +import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy; +import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; -import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; @@ -43,46 +41,47 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; * it merges the data just after the receiver analysis. The metrics belonging to the same entity, metrics type and time * bucket, the L1 aggregation will merge them into one metrics object to reduce the unnecessary memory and network * payload. + * + * <p>All metric types (OAL and MAL) share a single {@link BatchQueue} with adaptive partitioning. 
+ * The {@code typeHash()} partition selector ensures same metric class lands on the same partition, + * so each handler's {@link MergableBufferedData} is only accessed by one drain thread. */ @Slf4j -public abstract class MetricsAggregateWorker extends AbstractWorker<Metrics> { - public final long l1FlushPeriod; - private AbstractWorker<Metrics> nextWorker; - private final DataCarrier<Metrics> dataCarrier; +public class MetricsAggregateWorker extends AbstractWorker<Metrics> { + private static final String L1_QUEUE_NAME = "METRICS_L1_AGGREGATION"; + private static final BatchQueueConfig<Metrics> L1_QUEUE_CONFIG = + BatchQueueConfig.<Metrics>builder() + .threads(ThreadPolicy.cpuCores(1.0)) + .partitions(PartitionPolicy.adaptive()) + .bufferSize(20_000) + .strategy(BufferStrategy.IF_POSSIBLE) + .minIdleMs(1) + .maxIdleMs(50) + .build(); + + private final BatchQueue<Metrics> l1Queue; + private final long l1FlushPeriod; + private final AbstractWorker<Metrics> nextWorker; private final MergableBufferedData<Metrics> mergeDataCache; - private CounterMetrics abandonCounter; - private CounterMetrics aggregationCounter; - private GaugeMetrics queuePercentageGauge; + private final CounterMetrics abandonCounter; + private final CounterMetrics aggregationCounter; + // TODO: add queue usage telemetry after per-partition metrics are designed private long lastSendTime = 0; - private final int queueTotalSize; - MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, - AbstractWorker<Metrics> nextWorker, - String modelName, - long l1FlushPeriod, - MetricStreamKind kind, - String poolName, - int poolSize, - boolean isSignalDrivenMode, - int queueChannelSize, - int queueBufferSize - ) { + MetricsAggregateWorker(final ModuleDefineHolder moduleDefineHolder, + final AbstractWorker<Metrics> nextWorker, + final String modelName, + final long l1FlushPeriod, + final Class<? 
extends Metrics> metricsClass) { super(moduleDefineHolder); this.nextWorker = nextWorker; - this.mergeDataCache = new MergableBufferedData(); - BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode); - this.dataCarrier = new DataCarrier<>( - "MetricsAggregateWorker." + modelName, poolName, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE); - try { - ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator); - } catch (Exception e) { - throw new UnexpectedException(e.getMessage(), e); - } - this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new AggregatorConsumer()); + this.mergeDataCache = new MergableBufferedData<>(); + this.l1FlushPeriod = l1FlushPeriod; + this.l1Queue = BatchQueueManager.create(L1_QUEUE_NAME, L1_QUEUE_CONFIG); - MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) - .provider() - .getService(MetricsCreator.class); + final MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); abandonCounter = metricsCreator.createCounter( "metrics_aggregator_abandon", "The abandon number of rows received in aggregation.", new MetricsTag.Keys("metricName", "level", "dimensionality"), @@ -93,69 +92,42 @@ public abstract class MetricsAggregateWorker extends AbstractWorker<Metrics> { new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "minute") ); - queuePercentageGauge = metricsCreator.createGauge( - "metrics_aggregation_queue_used_percentage", "The percentage of queue used in aggregation.", - new MetricsTag.Keys("metricName", "level", "kind"), - new MetricsTag.Values(modelName, "1", kind.name()) - ); - this.l1FlushPeriod = l1FlushPeriod; - queueTotalSize = Arrays.stream(dataCarrier.getChannels().getBufferChannels()) - .mapToInt(QueueBuffer::getBufferSize) - .sum(); + + l1Queue.addHandler(metricsClass, new L1Handler()); } - /** - * 
MetricsAggregateWorker#in operation does include enqueue only - */ @Override - public void in(Metrics metrics) { - if (!dataCarrier.produce(metrics)) { + public void in(final Metrics metrics) { + if (!l1Queue.produce(metrics)) { abandonCounter.inc(); } } - /** - * Dequeue consuming. According to {@link IConsumer#consume(List)}, this is a serial operation for every work - * instance. - * - * @param metricsList from the queue. - */ - private void onWork(List<Metrics> metricsList) { - metricsList.forEach(metrics -> { + private void onWork(final List<Metrics> metricsList) { + for (final Metrics metrics : metricsList) { aggregationCounter.inc(); mergeDataCache.accept(metrics); - }); - + } flush(); } private void flush() { - long currentTime = System.currentTimeMillis(); + final long currentTime = System.currentTimeMillis(); if (currentTime - lastSendTime > l1FlushPeriod) { - mergeDataCache.read().forEach( - data -> { - nextWorker.in(data); - } - ); + mergeDataCache.read().forEach(nextWorker::in); lastSendTime = currentTime; } } - protected class AggregatorConsumer implements IConsumer<Metrics> { - @Override - public void consume(List<Metrics> data) { - queuePercentageGauge.setValue(Math.round(100 * (double) data.size() / queueTotalSize)); - MetricsAggregateWorker.this.onWork(data); - } - + private class L1Handler implements HandlerConsumer<Metrics> { @Override - public void onError(List<Metrics> data, Throwable t) { - log.error(t.getMessage(), t); + public void consume(final List<Metrics> data) { + onWork(data); } @Override - public void nothingToConsume() { + public void onIdle() { flush(); } } -} \ No newline at end of file +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index fe70ebdd7f..38361ebd10 100644 --- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -194,19 +194,8 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, kind, metricsClass); MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName); - MetricsAggregateWorker aggregateWorker; - switch (kind) { - case OAL: - aggregateWorker = new MetricsAggregateOALWorker( - moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, kind); - break; - case MAL: - aggregateWorker = new MetricsAggregateMALWorker( - moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, kind); - break; - default: - throw new IllegalArgumentException("Unsupported MetricStreamKind: " + kind); - } + MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker( + moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, metricsClass); entryWorkers.put(metricsClass, aggregateWorker); } diff --git a/oap-server/server-library/library-batch-queue/MIGRATION.md b/oap-server/server-library/library-batch-queue/MIGRATION.md new file mode 100644 index 0000000000..3cdc251ec3 --- /dev/null +++ b/oap-server/server-library/library-batch-queue/MIGRATION.md @@ -0,0 +1,196 @@ +# DataCarrier to BatchQueue Migration + +This document tracks the step-by-step replacement of DataCarrier with BatchQueue +across the SkyWalking OAP codebase. Each section covers one replacement target +with its current state, proposed changes, and key parameters. + +## 1. L1 Metrics Aggregation (MetricsAggregateWorker) + +### Current Architecture + +Each metric type (~620 OAL + ~100 MAL) creates its own `MetricsAggregateWorker` +with a dedicated `DataCarrier`. 
OAL and MAL workers use separate +`BulkConsumePool` instances with different thread counts and configurations, +despite doing the same work (merge + flush). + +``` +MetricsStreamProcessor.in(metrics) + └─ entryWorkers.get(metrics.getClass()) // one worker per metric type + └─ worker.in(metrics) + └─ dataCarrier.produce(metrics) // one DataCarrier per worker + └─ BulkConsumePool drains // OAL pool or MAL pool + └─ AggregatorConsumer.consume(batch) + └─ mergeDataCache.accept(each) // per-worker merge cache + └─ flush() if l1FlushPeriod elapsed +``` + +**Files involved:** +- `server-core/.../worker/MetricsAggregateWorker.java` — base class, owns DataCarrier + merge cache +- `server-core/.../worker/MetricsAggregateOALWorker.java` — OAL pool config +- `server-core/.../worker/MetricsAggregateMALWorker.java` — MAL pool config + signal-driven +- `server-core/.../worker/MetricsStreamProcessor.java` — creates workers, routes `in()` calls + +**Current parameters (8-core machine):** + +| Parameter | OAL pool | MAL pool | +|-----------|----------|----------| +| Pool name | `METRICS_L1_AGGREGATION_OAL` | `METRICS_L1_AGGREGATION_MAL` | +| Pool threads | `ceil(cores * 3)` = 24 | `cores / 4` = 2 | +| Signal-driven | no (200ms poll cycle) | yes (`notifyConsumers()` per `in()`) | +| Channels per type | 2 | 1 | +| Buffer per channel | 10,000 | 1,000 | +| Total buffer per type | 20,000 | 1,000 | +| Strategy | IF_POSSIBLE | IF_POSSIBLE | +| **Total threads** | **26** | | +| **Total DataCarriers** | **~720** | | + +**Per-worker state:** +- `MergableBufferedData<Metrics> mergeDataCache` — not thread-safe, merge cache +- `long lastSendTime` — timestamp of the last flush; a flush runs when `currentTime - lastSendTime > l1FlushPeriod` +- `AbstractWorker<Metrics> nextWorker` — MetricsRemoteWorker +- `CounterMetrics abandonCounter` — tagged by metricName, level=1, dimensionality=minute +- `CounterMetrics aggregationCounter` — tagged by metricName, level=1, dimensionality=minute +- `GaugeMetrics queuePercentageGauge` — tagged by metricName, level=1, kind=OAL/MAL + 
+### Proposed: Single BatchQueue for All Metric Types + +OAL and MAL use the same queue. There is no reason to separate them — the +aggregation logic (merge + periodic flush) is identical. Splitting by kind only +wastes threads and complicates configuration. `MetricStreamKind` becomes +irrelevant at the L1 aggregation layer. + +One `BatchQueue<Metrics>` with adaptive partitioning handles all ~720 metric +types. Each type registers a handler via `addHandler()`. + +**BatchQueue parameters:** + +| Parameter | Value | Rationale | +|-----------|-------|-----------| +| Queue name | `METRICS_L1_AGGREGATION` | Single queue for all metric types (OAL + MAL) | +| ThreadPolicy | `cpuCores(1.0)` | 8 threads on 8-core. Benchmark: 8 threads + adaptive beats 26-thread DataCarrier by 34-68% | +| PartitionPolicy | `adaptive()` (multiplier=25) | threshold=8*25=200. ~720 types → 200+520/2 = 460 partitions | +| BufferSize | 20,000 | Matches current OAL per-type capacity | +| Strategy | `IF_POSSIBLE` | Same as current — drop if full | +| PartitionSelector | `typeHash()` (default) | Same type → same partition → single-thread access to merge cache | +| minIdleMs | 1 | Fast reaction (current MAL used signal-driven wakeup; 1ms backoff is equivalent) | +| maxIdleMs | 50 | Backoff cap when idle | + +**Thread-safety guarantee:** `typeHash()` ensures same metric class → same +partition → same drain thread. Each handler's `MergableBufferedData` is only +accessed by one thread, preserving the current invariant without synchronization. + +**Memory comparison (8-core, ~720 types):** + +| | Current (OAL + MAL pools) | Proposed (single queue) | +|---|---------------------------|-------------------------| +| Queues | ~1340 channels | 460 partitions | +| Buffer slots | ~12.5M | ~9.2M | +| Threads | 26 | 8 | + +### Telemetry + +| Metric | Plan | +|--------|------| +| `metrics_aggregator_abandon` | Keep. Per-type counter on the producer side (not in the handler, which never sees dropped items). Incremented at the `produce()` call site when `produce()` returns false. 
| +| `metrics_aggregation` | Keep. Per-type counter in handler. Incremented per item in `consume()`. | +| `metrics_aggregation_queue_used_percentage` | Removed for now: the per-DataCarrier gauge has no direct equivalent on a shared queue. TODO: redesign for the shared-partition model; only a TODO comment is left in the worker as a placeholder. | + +### Replacement Steps + +#### Step 1: Create L1 aggregation handler + +Create a handler class that captures the per-type state currently spread across +`MetricsAggregateWorker` / `MetricsAggregateOALWorker` / `MetricsAggregateMALWorker`. +No OAL/MAL distinction — the handler is the same for all metric types. + +``` +MetricsL1Handler implements HandlerConsumer<Metrics> + - MergableBufferedData<Metrics> mergeDataCache + - long lastSendTime + - long l1FlushPeriod + - AbstractWorker<Metrics> nextWorker + - CounterMetrics aggregationCounter + + consume(List<Metrics> batch): + for each metric: aggregationCounter.inc(); mergeDataCache.accept(metric) + flush() + + onIdle(): + flush() + + flush(): + if currentTime - lastSendTime > l1FlushPeriod: + mergeDataCache.read().forEach(nextWorker::in) + lastSendTime = currentTime +``` + +#### Step 2: Create the shared BatchQueue in MetricsStreamProcessor + +One queue, created once, shared by all metric types regardless of kind: + +```java +BatchQueue<Metrics> l1Queue = BatchQueueManager.create( + "METRICS_L1_AGGREGATION", + BatchQueueConfig.<Metrics>builder() + .threads(ThreadPolicy.cpuCores(1.0)) + .partitions(PartitionPolicy.adaptive()) + .bufferSize(20_000) + .strategy(BufferStrategy.IF_POSSIBLE) + .minIdleMs(1) + .maxIdleMs(50) + .build()); +``` + +#### Step 3: Register handlers per metric type + +In `MetricsStreamProcessor.create()`, replace the OAL/MAL worker switch with +a single handler registration. 
The `kind` parameter is no longer needed: + +```java +// Replace: +// switch (kind) { +// case OAL: aggregateWorker = new MetricsAggregateOALWorker(...); break; +// case MAL: aggregateWorker = new MetricsAggregateMALWorker(...); break; +// } +// entryWorkers.put(metricsClass, aggregateWorker); + +// With: +MetricsL1Handler handler = new MetricsL1Handler( + remoteWorker, l1FlushPeriod, modelName, metricsCreator); +l1Queue.addHandler(metricsClass, handler); +``` + +#### Step 4: Replace `in()` routing + +The `MetricsStreamProcessor.in()` method no longer looks up a per-type worker. +It produces directly to the queue: + +```java +// Replace: +// MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass()); +// if (worker != null) { worker.in(metrics); } + +// With: +if (!l1Queue.produce(metrics)) { + abandonCounters.get(metrics.getClass()).inc(); +} +``` + +`abandonCounters` is a `Map<Class<?>, CounterMetrics>` populated during +handler registration (Step 3). + +#### Step 5: Verify + +- All existing unit tests for MetricsStreamProcessor pass +- E2E tests with metrics aggregation pass +- Telemetry counters (abandon, aggregation) still reported per metric type + +--- + +## 2. L2 Metrics Persistence (MetricsPersistentMinWorker) + +*To be planned after L1 replacement is complete.* + +## 3. Individual Consumer Replacements + +*To be planned: GRPCRemoteClient, TopNWorker, exporters, JDBCBatchDAO.*
