This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch library-batch-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit c86216c5d3765a5d3ad98908ef79c2a2b2e83a84
Author: Wu Sheng <[email protected]>
AuthorDate: Sat Feb 14 21:04:27 2026 +0800

    Replace DataCarrier with BatchQueue for L2 metrics persistence
    
    - Replace DataCarrier in MetricsPersistentMinWorker with shared
      BatchQueue (METRICS_L2_PERSISTENCE) using adaptive partitioning
    - Add ThreadPolicy.cpuCoresWithBase(base, multiplier) API for
      L2 thread scaling (4/6/8 threads on 8/16/24 cores)
    - Remove MetricsPersistentMinOALWorker and MetricsPersistentMinMALWorker
      subclasses — single unified worker for all metric types
    - Add BatchQueueStats for queue usage telemetry (total + top-N
      partition usage), applied to both L1 and L2 workers
    - Update Grafana dashboards and otel-rules for new telemetry model
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 docs/en/setup/backend/grafana-cluster.json         | 222 +--------------------
 docs/en/setup/backend/grafana-instance.json        | 212 +-------------------
 .../analysis/worker/MetricsAggregateWorker.java    |  46 ++++-
 .../worker/MetricsPersistentMinMALWorker.java      |  62 ------
 .../worker/MetricsPersistentMinOALWorker.java      |  52 -----
 .../worker/MetricsPersistentMinWorker.java         | 128 +++++++-----
 .../analysis/worker/MetricsStreamProcessor.java    |  26 +--
 .../library-batch-queue/MIGRATION.md               | 196 ------------------
 .../oap/server/library/batchqueue/BatchQueue.java  |  12 ++
 .../server/library/batchqueue/BatchQueueStats.java | 133 ++++++++++++
 .../server/library/batchqueue/ThreadPolicy.java    |  34 +++-
 .../server/library/batchqueue/BatchQueueTest.java  | 190 ++++++++++++++++++
 .../library/batchqueue/ThreadPolicyTest.java       |  32 +++
 .../src/main/resources/otel-rules/oap.yaml         |   2 +-
 .../so11y_oap/so11y-instance.json                  |  60 +-----
 15 files changed, 553 insertions(+), 854 deletions(-)

diff --git a/docs/en/setup/backend/grafana-cluster.json 
b/docs/en/setup/backend/grafana-cluster.json
index ea60f34863..3a61beb9eb 100644
--- a/docs/en/setup/backend/grafana-cluster.json
+++ b/docs/en/setup/backend/grafana-cluster.json
@@ -6693,17 +6693,17 @@
             "uid": "$datasource"
           },
           "editorMode": "code",
-          "expr": "topk(10,avg 
by(metricName)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"1\",kind=\"OAL\"}))",
+          "expr": "avg 
by(slot)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"1\"})",
           "format": "time_series",
           "hide": false,
           "interval": "",
           "intervalFactor": 1,
-          "legendFormat": "{{metricName}}",
+          "legendFormat": "{{slot}}",
           "range": true,
           "refId": "A"
         }
       ],
-      "title": "OAL L1 Aggregation Queue Percentage (%)",
+      "title": "L1 Aggregation Queue Percentage (%)",
       "type": "timeseries"
     },
     {
@@ -6793,17 +6793,17 @@
             "uid": "$datasource"
           },
           "editorMode": "code",
-          "expr": "topk(10,avg 
by(metricName)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"1\",kind=\"MAL\"}))",
+          "expr": "avg 
by(slot)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"2\"})",
           "format": "time_series",
           "hide": false,
           "interval": "",
           "intervalFactor": 1,
-          "legendFormat": "{{metricName}}",
+          "legendFormat": "{{slot}}",
           "range": true,
           "refId": "A"
         }
       ],
-      "title": "MAL L1 Aggregation Queue Percentage (%)",
+      "title": "L2 Aggregation Queue Percentage (%)",
       "type": "timeseries"
     },
     {
@@ -6871,206 +6871,6 @@
         "x": 8,
         "y": 122
       },
-      "id": 150,
-      "interval": "1m",
-      "options": {
-        "dataLinks": [],
-        "legend": {
-          "calcs": [],
-          "displayMode": "list",
-          "placement": "bottom",
-          "showLegend": true
-        },
-        "tooltip": {
-          "mode": "multi",
-          "sort": "none"
-        }
-      },
-      "pluginVersion": "11.3.1",
-      "targets": [
-        {
-          "datasource": {
-            "uid": "$datasource"
-          },
-          "editorMode": "code",
-          "expr": "topk(10,avg 
by(metricName)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"2\",kind=\"OAL\"}))",
-          "format": "time_series",
-          "hide": false,
-          "interval": "",
-          "intervalFactor": 1,
-          "legendFormat": "{{metricName}}",
-          "range": true,
-          "refId": "A"
-        }
-      ],
-      "title": "OAL L2 Aggregation Queue Percentage (%)",
-      "type": "timeseries"
-    },
-    {
-      "datasource": {
-        "uid": "$datasource"
-      },
-      "fieldConfig": {
-        "defaults": {
-          "color": {
-            "mode": "palette-classic"
-          },
-          "custom": {
-            "axisBorderShow": false,
-            "axisCenteredZero": false,
-            "axisColorMode": "text",
-            "axisLabel": "",
-            "axisPlacement": "auto",
-            "barAlignment": 0,
-            "barWidthFactor": 0.6,
-            "drawStyle": "line",
-            "fillOpacity": 10,
-            "gradientMode": "none",
-            "hideFrom": {
-              "legend": false,
-              "tooltip": false,
-              "viz": false
-            },
-            "insertNulls": false,
-            "lineInterpolation": "linear",
-            "lineWidth": 1,
-            "pointSize": 5,
-            "scaleDistribution": {
-              "type": "linear"
-            },
-            "showPoints": "never",
-            "spanNulls": false,
-            "stacking": {
-              "group": "A",
-              "mode": "none"
-            },
-            "thresholdsStyle": {
-              "mode": "off"
-            }
-          },
-          "mappings": [],
-          "thresholds": {
-            "mode": "absolute",
-            "steps": [
-              {
-                "color": "green",
-                "value": null
-              },
-              {
-                "color": "red",
-                "value": 80
-              }
-            ]
-          }
-        },
-        "overrides": []
-      },
-      "gridPos": {
-        "h": 6,
-        "w": 8,
-        "x": 16,
-        "y": 122
-      },
-      "id": 151,
-      "interval": "1m",
-      "options": {
-        "dataLinks": [],
-        "legend": {
-          "calcs": [],
-          "displayMode": "list",
-          "placement": "bottom",
-          "showLegend": true
-        },
-        "tooltip": {
-          "mode": "multi",
-          "sort": "none"
-        }
-      },
-      "pluginVersion": "11.3.1",
-      "targets": [
-        {
-          "datasource": {
-            "uid": "$datasource"
-          },
-          "editorMode": "code",
-          "expr": "topk(10,avg 
by(metricName)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"2\",kind=\"MAL\"}))",
-          "format": "time_series",
-          "hide": false,
-          "interval": "",
-          "intervalFactor": 1,
-          "legendFormat": "{{metricName}}",
-          "range": true,
-          "refId": "A"
-        }
-      ],
-      "title": "MAL L2 Aggregation Queue Percentage (%)",
-      "type": "timeseries"
-    },
-    {
-      "datasource": {
-        "uid": "$datasource"
-      },
-      "fieldConfig": {
-        "defaults": {
-          "color": {
-            "mode": "palette-classic"
-          },
-          "custom": {
-            "axisBorderShow": false,
-            "axisCenteredZero": false,
-            "axisColorMode": "text",
-            "axisLabel": "",
-            "axisPlacement": "auto",
-            "barAlignment": 0,
-            "barWidthFactor": 0.6,
-            "drawStyle": "line",
-            "fillOpacity": 10,
-            "gradientMode": "none",
-            "hideFrom": {
-              "legend": false,
-              "tooltip": false,
-              "viz": false
-            },
-            "insertNulls": false,
-            "lineInterpolation": "linear",
-            "lineWidth": 1,
-            "pointSize": 5,
-            "scaleDistribution": {
-              "type": "linear"
-            },
-            "showPoints": "never",
-            "spanNulls": false,
-            "stacking": {
-              "group": "A",
-              "mode": "none"
-            },
-            "thresholdsStyle": {
-              "mode": "off"
-            }
-          },
-          "mappings": [],
-          "thresholds": {
-            "mode": "absolute",
-            "steps": [
-              {
-                "color": "green",
-                "value": null
-              },
-              {
-                "color": "red",
-                "value": 80
-              }
-            ]
-          }
-        },
-        "overrides": []
-      },
-      "gridPos": {
-        "h": 6,
-        "w": 8,
-        "x": 0,
-        "y": 128
-      },
       "id": 149,
       "interval": "1m",
       "options": {
@@ -7168,8 +6968,8 @@
       "gridPos": {
         "h": 6,
         "w": 8,
-        "x": 8,
-        "y": 128
+        "x": 16,
+        "y": 122
       },
       "id": 152,
       "interval": "1m",
@@ -7269,7 +7069,7 @@
       "gridPos": {
         "h": 6,
         "w": 8,
-        "x": 16,
+        "x": 0,
         "y": 128
       },
       "id": 146,
@@ -7421,8 +7221,8 @@
       "gridPos": {
         "h": 6,
         "w": 8,
-        "x": 0,
-        "y": 134
+        "x": 8,
+        "y": 128
       },
       "id": 145,
       "interval": "1m",
diff --git a/docs/en/setup/backend/grafana-instance.json 
b/docs/en/setup/backend/grafana-instance.json
index 4c78f5c313..ef2070fa1d 100644
--- a/docs/en/setup/backend/grafana-instance.json
+++ b/docs/en/setup/backend/grafana-instance.json
@@ -7518,17 +7518,17 @@
             "uid": "$datasource"
           },
           "editorMode": "code",
-          "expr": 
"topk(10,metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"1\",kind=\"OAL\"})",
+          "expr": 
"metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"1\"}",
           "format": "time_series",
           "hide": false,
           "interval": "1m",
           "intervalFactor": 1,
-          "legendFormat": "{{metricName}}",
+          "legendFormat": "{{slot}}",
           "range": true,
           "refId": "A"
         }
       ],
-      "title": "OAL L1 Aggregation Queue Percentage (%)",
+      "title": "L1 Aggregation Queue Percentage (%)",
       "type": "timeseries"
     },
     {
@@ -7618,217 +7618,17 @@
             "uid": "$datasource"
           },
           "editorMode": "code",
-          "expr": 
"topk(10,metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"2\",kind=\"OAL\"})",
-          "format": "time_series",
-          "hide": false,
-          "interval": "1m",
-          "intervalFactor": 1,
-          "legendFormat": "{{metricName}}",
-          "range": true,
-          "refId": "A"
-        }
-      ],
-      "title": "OAL L2 Aggregation Queue Percentage (%)",
-      "type": "timeseries"
-    },
-    {
-      "datasource": {
-        "uid": "$datasource"
-      },
-      "fieldConfig": {
-        "defaults": {
-          "color": {
-            "mode": "palette-classic"
-          },
-          "custom": {
-            "axisBorderShow": false,
-            "axisCenteredZero": false,
-            "axisColorMode": "text",
-            "axisLabel": "",
-            "axisPlacement": "auto",
-            "barAlignment": 0,
-            "barWidthFactor": 0.6,
-            "drawStyle": "line",
-            "fillOpacity": 10,
-            "gradientMode": "none",
-            "hideFrom": {
-              "legend": false,
-              "tooltip": false,
-              "viz": false
-            },
-            "insertNulls": false,
-            "lineInterpolation": "linear",
-            "lineWidth": 1,
-            "pointSize": 5,
-            "scaleDistribution": {
-              "type": "linear"
-            },
-            "showPoints": "never",
-            "spanNulls": false,
-            "stacking": {
-              "group": "A",
-              "mode": "none"
-            },
-            "thresholdsStyle": {
-              "mode": "off"
-            }
-          },
-          "mappings": [],
-          "thresholds": {
-            "mode": "absolute",
-            "steps": [
-              {
-                "color": "green",
-                "value": null
-              },
-              {
-                "color": "red",
-                "value": 80
-              }
-            ]
-          }
-        },
-        "overrides": []
-      },
-      "gridPos": {
-        "h": 6,
-        "w": 8,
-        "x": 0,
-        "y": 148
-      },
-      "id": 148,
-      "interval": "1m",
-      "options": {
-        "dataLinks": [],
-        "legend": {
-          "calcs": [],
-          "displayMode": "list",
-          "placement": "bottom",
-          "showLegend": true
-        },
-        "tooltip": {
-          "mode": "multi",
-          "sort": "none"
-        }
-      },
-      "pluginVersion": "11.3.1",
-      "targets": [
-        {
-          "datasource": {
-            "uid": "$datasource"
-          },
-          "editorMode": "code",
-          "expr": 
"topk(10,metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"1\",kind=\"MAL\"})",
+          "expr": 
"metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"2\"}",
           "format": "time_series",
           "hide": false,
           "interval": "1m",
           "intervalFactor": 1,
-          "legendFormat": "{{metricName}}",
-          "range": true,
-          "refId": "A"
-        }
-      ],
-      "title": "MAL L1 Aggregation Queue Percentage (%)",
-      "type": "timeseries"
-    },
-    {
-      "datasource": {
-        "uid": "$datasource"
-      },
-      "fieldConfig": {
-        "defaults": {
-          "color": {
-            "mode": "palette-classic"
-          },
-          "custom": {
-            "axisBorderShow": false,
-            "axisCenteredZero": false,
-            "axisColorMode": "text",
-            "axisLabel": "",
-            "axisPlacement": "auto",
-            "barAlignment": 0,
-            "barWidthFactor": 0.6,
-            "drawStyle": "line",
-            "fillOpacity": 10,
-            "gradientMode": "none",
-            "hideFrom": {
-              "legend": false,
-              "tooltip": false,
-              "viz": false
-            },
-            "insertNulls": false,
-            "lineInterpolation": "linear",
-            "lineWidth": 1,
-            "pointSize": 5,
-            "scaleDistribution": {
-              "type": "linear"
-            },
-            "showPoints": "never",
-            "spanNulls": false,
-            "stacking": {
-              "group": "A",
-              "mode": "none"
-            },
-            "thresholdsStyle": {
-              "mode": "off"
-            }
-          },
-          "mappings": [],
-          "thresholds": {
-            "mode": "absolute",
-            "steps": [
-              {
-                "color": "green",
-                "value": null
-              },
-              {
-                "color": "red",
-                "value": 80
-              }
-            ]
-          }
-        },
-        "overrides": []
-      },
-      "gridPos": {
-        "h": 6,
-        "w": 8,
-        "x": 8,
-        "y": 148
-      },
-      "id": 151,
-      "interval": "1m",
-      "options": {
-        "dataLinks": [],
-        "legend": {
-          "calcs": [],
-          "displayMode": "list",
-          "placement": "bottom",
-          "showLegend": true
-        },
-        "tooltip": {
-          "mode": "multi",
-          "sort": "none"
-        }
-      },
-      "pluginVersion": "11.3.1",
-      "targets": [
-        {
-          "datasource": {
-            "uid": "$datasource"
-          },
-          "editorMode": "code",
-          "expr": 
"topk(10,metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"2\",kind=\"MAL\"})",
-          "format": "time_series",
-          "hide": false,
-          "interval": "1m",
-          "intervalFactor": 1,
-          "legendFormat": "{{metricName}}",
+          "legendFormat": "{{slot}}",
           "range": true,
           "refId": "A"
         }
       ],
-      "title": "MAL L2 Aggregation Queue Percentage (%)",
+      "title": "L2 Aggregation Queue Percentage (%)",
       "type": "timeseries"
     },
     {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index ff69e6f8ec..729834cf6b 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -18,7 +18,9 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -26,6 +28,7 @@ import 
org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
 import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig;
 import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueStats;
 import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy;
 import org.apache.skywalking.oap.server.library.batchqueue.HandlerConsumer;
 import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy;
@@ -33,6 +36,7 @@ import 
org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
@@ -59,13 +63,16 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
             .maxIdleMs(50)
             .build();
 
+    private static final int TOP_N = 10;
+    /** slot label -> gauge instance. Keys: "total", "top1" .. "top10". */
+    private static Map<String, GaugeMetrics> QUEUE_USAGE_GAUGE;
+
     private final BatchQueue<Metrics> l1Queue;
     private final long l1FlushPeriod;
     private final AbstractWorker<Metrics> nextWorker;
     private final MergableBufferedData<Metrics> mergeDataCache;
     private final CounterMetrics abandonCounter;
     private final CounterMetrics aggregationCounter;
-    // TODO: add queue usage telemetry after per-partition metrics are designed
     private long lastSendTime = 0;
 
     MetricsAggregateWorker(final ModuleDefineHolder moduleDefineHolder,
@@ -93,6 +100,25 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
             new MetricsTag.Values(modelName, "1", "minute")
         );
 
+        if (QUEUE_USAGE_GAUGE == null) {
+            final Map<String, GaugeMetrics> gauge = new LinkedHashMap<>();
+            gauge.put("total", metricsCreator.createGauge(
+                "metrics_aggregation_queue_used_percentage",
+                "The percentage of queue used in L1 aggregation.",
+                new MetricsTag.Keys("level", "slot"),
+                new MetricsTag.Values("1", "total")
+            ));
+            for (int i = 1; i <= TOP_N; i++) {
+                gauge.put("top" + i, metricsCreator.createGauge(
+                    "metrics_aggregation_queue_used_percentage",
+                    "The percentage of queue used in L1 aggregation.",
+                    new MetricsTag.Keys("level", "slot"),
+                    new MetricsTag.Values("1", "top" + i)
+                ));
+            }
+            QUEUE_USAGE_GAUGE = gauge;
+        }
+
         l1Queue.addHandler(metricsClass, new L1Handler());
     }
 
@@ -108,9 +134,27 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
             aggregationCounter.inc();
             mergeDataCache.accept(metrics);
         }
+        updateQueueUsageGauges();
         flush();
     }
 
+    private void updateQueueUsageGauges() {
+        final Map<String, GaugeMetrics> gauge = QUEUE_USAGE_GAUGE;
+        if (gauge == null) {
+            return;
+        }
+        final BatchQueueStats stats = l1Queue.stats();
+        gauge.get("total").setValue(stats.totalUsedPercentage());
+        final List<BatchQueueStats.PartitionUsage> topPartitions = 
stats.topN(TOP_N);
+        for (int i = 1; i <= TOP_N; i++) {
+            if (i <= topPartitions.size()) {
+                gauge.get("top" + i).setValue(topPartitions.get(i - 
1).getUsedPercentage());
+            } else {
+                gauge.get("top" + i).setValue(0);
+            }
+        }
+    }
+
     private void flush() {
         final long currentTime = System.currentTimeMillis();
         if (currentTime - lastSendTime > l1FlushPeriod) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java
deleted file mode 100644
index 6ced828694..0000000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.worker;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-
-@Slf4j
-public class MetricsPersistentMinMALWorker extends MetricsPersistentMinWorker {
-    private final static String POOL_NAME = "METRICS_L2_AGGREGATION_MAL";
-    private final BulkConsumePool pool;
-
-    MetricsPersistentMinMALWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, IMetricsDAO metricsDAO,
-                                  AbstractWorker<Metrics> nextAlarmWorker, 
AbstractWorker<ExportEvent> nextExportWorker,
-                                  MetricsTransWorker transWorker, boolean 
supportUpdate,
-                                  long storageSessionTimeout, int 
metricsDataTTL, MetricStreamKind kind) {
-        super(
-            moduleDefineHolder, model, metricsDAO, nextAlarmWorker, 
nextExportWorker, transWorker, supportUpdate,
-            storageSessionTimeout, metricsDataTTL, kind,
-            POOL_NAME,
-            calculatePoolSize(),
-            true,
-            1,
-            1000
-        );
-        this.pool = (BulkConsumePool) 
ConsumerPoolFactory.INSTANCE.get(POOL_NAME);
-    }
-
-    @Override
-    public void in(Metrics metrics) {
-        super.in(metrics);
-        pool.notifyConsumers();
-    }
-
-    private static int calculatePoolSize() {
-        int size = BulkConsumePool.Creator.recommendMaxSize() / 16;
-        return size == 0 ? 1 : size;
-    }
-}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java
deleted file mode 100644
index 534b50f9f5..0000000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.worker;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-
-@Slf4j
-public class MetricsPersistentMinOALWorker extends MetricsPersistentMinWorker {
-
-    MetricsPersistentMinOALWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, IMetricsDAO metricsDAO,
-                                  AbstractWorker<Metrics> nextAlarmWorker, 
AbstractWorker<ExportEvent> nextExportWorker,
-                                  MetricsTransWorker transWorker, boolean 
supportUpdate,
-                                  long storageSessionTimeout, int 
metricsDataTTL, MetricStreamKind kind) {
-        super(
-            moduleDefineHolder, model, metricsDAO, nextAlarmWorker, 
nextExportWorker, transWorker, supportUpdate,
-            storageSessionTimeout, metricsDataTTL, kind,
-            "METRICS_L2_AGGREGATION_OAL",
-            calculatePoolSize(),
-            false,
-            1,
-            2000
-        );
-    }
-
-    private static int calculatePoolSize() {
-        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
-        return size == 0 ? 1 : size;
-    }
-}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
index 2e2f667045..d5dc31d03d 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
@@ -18,22 +18,25 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.status.ServerStatusService;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
-import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueStats;
+import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy;
+import org.apache.skywalking.oap.server.library.batchqueue.HandlerConsumer;
+import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy;
+import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
@@ -41,16 +44,32 @@ import 
org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
 /**
- * MetricsPersistentMinWorker is an extension of {@link 
MetricsPersistentWorker} and focuses on the Minute Metrics data persistent.
+ * MetricsPersistentMinWorker is an extension of {@link 
MetricsPersistentWorker} and focuses on the
+ * persistence of Minute Metrics data.
+ *
+ * <p>All metric types (OAL and MAL) share a single {@link BatchQueue} with 
adaptive partitioning.
+ * The {@code typeHash()} partition selector ensures the same metric class lands 
on the same partition,
+ * so each handler's {@link 
org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache}
+ * is only accessed by one drain thread.
  */
 @Slf4j
-public abstract class MetricsPersistentMinWorker extends 
MetricsPersistentWorker {
-    private final DataCarrier<Metrics> dataCarrier;
+public class MetricsPersistentMinWorker extends MetricsPersistentWorker {
+    private static final String L2_QUEUE_NAME = "METRICS_L2_PERSISTENCE";
+    private static final BatchQueueConfig<Metrics> L2_QUEUE_CONFIG =
+        BatchQueueConfig.<Metrics>builder()
+            .threads(ThreadPolicy.cpuCoresWithBase(1, 0.25))
+            .partitions(PartitionPolicy.adaptive())
+            .bufferSize(2_000)
+            .strategy(BufferStrategy.IF_POSSIBLE)
+            .minIdleMs(1)
+            .maxIdleMs(50)
+            .build();
 
-    /**
-     * The percentage of queue used in aggregation
-     */
-    private final GaugeMetrics queuePercentageGauge;
+    private static final int TOP_N = 10;
+    /** slot label -> gauge instance. Keys: "total", "top1" .. "top10". */
+    private static Map<String, GaugeMetrics> QUEUE_USAGE_GAUGE;
+
+    private final BatchQueue<Metrics> l2Queue;
 
     /**
      * @since 9.4.0
@@ -59,41 +78,45 @@ public abstract class MetricsPersistentMinWorker extends 
MetricsPersistentWorker
 
     // Not going to expose this as a configuration, only for testing purpose
     private final boolean isTestingTTL = 
"true".equalsIgnoreCase(System.getenv("TESTING_TTL"));
-    private final int queueTotalSize;
 
     MetricsPersistentMinWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, IMetricsDAO metricsDAO,
                                AbstractWorker<Metrics> nextAlarmWorker, 
AbstractWorker<ExportEvent> nextExportWorker,
                                MetricsTransWorker transWorker, boolean 
supportUpdate,
                                long storageSessionTimeout, int metricsDataTTL, 
MetricStreamKind kind,
-                               String poolName, int poolSize, boolean 
isSignalDrivenMode,
-                               int queueChannelSize, int queueBufferSize) {
+                               Class<? extends Metrics> metricsClass) {
         super(
             moduleDefineHolder, model, metricsDAO, nextAlarmWorker, 
nextExportWorker, transWorker, supportUpdate,
             storageSessionTimeout, metricsDataTTL, kind
         );
 
-        BulkConsumePool.Creator creator = new 
BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
-        }
-        this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + 
model.getName(), poolName, queueChannelSize, queueBufferSize);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), 
new PersistentConsumer());
+        this.l2Queue = BatchQueueManager.create(L2_QUEUE_NAME, 
L2_QUEUE_CONFIG);
 
-        MetricsCreator metricsCreator = 
moduleDefineHolder.find(TelemetryModule.NAME)
-                                                          .provider()
-                                                          
.getService(MetricsCreator.class);
-        queuePercentageGauge = metricsCreator.createGauge(
-            "metrics_aggregation_queue_used_percentage", "The percentage of 
queue used in aggregation.",
-            new MetricsTag.Keys("metricName", "level", "kind"),
-            new MetricsTag.Values(model.getName(), "2", kind.name())
-        );
         serverStatusService = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServerStatusService.class);
         serverStatusService.registerWatcher(this);
-        queueTotalSize = 
Arrays.stream(dataCarrier.getChannels().getBufferChannels())
-                                    .mapToInt(QueueBuffer::getBufferSize)
-                                    .sum();
+
+        if (QUEUE_USAGE_GAUGE == null) {
+            final MetricsCreator metricsCreator = 
moduleDefineHolder.find(TelemetryModule.NAME)
+                                                                    .provider()
+                                                                    
.getService(MetricsCreator.class);
+            final Map<String, GaugeMetrics> gauge = new LinkedHashMap<>();
+            gauge.put("total", metricsCreator.createGauge(
+                "metrics_aggregation_queue_used_percentage",
+                "The percentage of queue used in L2 persistence.",
+                new MetricsTag.Keys("level", "slot"),
+                new MetricsTag.Values("2", "total")
+            ));
+            for (int i = 1; i <= TOP_N; i++) {
+                gauge.put("top" + i, metricsCreator.createGauge(
+                    "metrics_aggregation_queue_used_percentage",
+                    "The percentage of queue used in L2 persistence.",
+                    new MetricsTag.Keys("level", "slot"),
+                    new MetricsTag.Values("2", "top" + i)
+                ));
+            }
+            QUEUE_USAGE_GAUGE = gauge;
+        }
+
+        l2Queue.addHandler(metricsClass, new L2Handler());
     }
 
     /**
@@ -107,24 +130,31 @@ public abstract class MetricsPersistentMinWorker extends 
MetricsPersistentWorker
             return;
         }
         getAggregationCounter().inc();
-        dataCarrier.produce(metrics);
+        l2Queue.produce(metrics);
     }
 
-    /**
-     * Metrics queue processor, merge the received metrics if existing one 
with same ID(s) and time bucket.
-     *
-     * ID is declared through {@link Object#hashCode()} and {@link 
Object#equals(Object)} as usual.
-     */
-    private class PersistentConsumer implements IConsumer<Metrics> {
-        @Override
-        public void consume(List<Metrics> data) {
-            queuePercentageGauge.setValue(Math.round(100 * (double) 
data.size() / queueTotalSize));
-            MetricsPersistentMinWorker.this.onWork(data);
+    private void updateQueueUsageGauges() {
+        final Map<String, GaugeMetrics> gauge = QUEUE_USAGE_GAUGE;
+        if (gauge == null) {
+            return;
+        }
+        final BatchQueueStats stats = l2Queue.stats();
+        gauge.get("total").setValue(stats.totalUsedPercentage());
+        final List<BatchQueueStats.PartitionUsage> topPartitions = 
stats.topN(TOP_N);
+        for (int i = 1; i <= TOP_N; i++) {
+            if (i <= topPartitions.size()) {
+                gauge.get("top" + i).setValue(topPartitions.get(i - 
1).getUsedPercentage());
+            } else {
+                gauge.get("top" + i).setValue(0);
+            }
         }
+    }
 
+    private class L2Handler implements HandlerConsumer<Metrics> {
         @Override
-        public void onError(List<Metrics> data, Throwable t) {
-            log.error(t.getMessage(), t);
+        public void consume(List<Metrics> data) {
+            updateQueueUsageGauges();
+            onWork(data);
         }
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 38361ebd10..c405bb5b37 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -185,7 +185,7 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
             metricsClass, stream.getScopeId(), new Storage(stream.getName(), 
timeRelativeID, DownSampling.Minute)
         );
         MetricsPersistentWorker minutePersistentWorker = 
minutePersistentWorker(
-            moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate, 
kind);
+            moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate, 
kind, metricsClass);
 
         String remoteReceiverWorkerName = stream.getName() + "_rec";
         IWorkerInstanceSetter workerInstanceSetter = 
moduleDefineHolder.find(CoreModule.NAME)
@@ -204,27 +204,15 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
                                                            Model model,
                                                            MetricsTransWorker 
transWorker,
                                                            boolean 
supportUpdate,
-                                                           MetricStreamKind 
kind) {
+                                                           MetricStreamKind 
kind,
+                                                           Class<? extends 
Metrics> metricsClass) {
         AlarmNotifyWorker alarmNotifyWorker = new 
AlarmNotifyWorker(moduleDefineHolder);
         ExportMetricsWorker exportWorker = new 
ExportMetricsWorker(moduleDefineHolder);
 
-        MetricsPersistentWorker minutePersistentWorker;
-        switch (kind) {
-            case OAL:
-                minutePersistentWorker = new MetricsPersistentMinOALWorker(
-                    moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, 
exportWorker, transWorker,
-                    supportUpdate, storageSessionTimeout, metricsDataTTL, kind
-                );
-                break;
-            case MAL:
-                minutePersistentWorker = new MetricsPersistentMinMALWorker(
-                    moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, 
exportWorker, transWorker,
-                    supportUpdate, storageSessionTimeout, metricsDataTTL, kind
-                );
-                break;
-            default:
-                throw new IllegalArgumentException("Unsupported 
MetricStreamKind: " + kind);
-        }
+        MetricsPersistentMinWorker minutePersistentWorker = new 
MetricsPersistentMinWorker(
+            moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, 
exportWorker, transWorker,
+            supportUpdate, storageSessionTimeout, metricsDataTTL, kind, 
metricsClass
+        );
         persistentWorkers.add(minutePersistentWorker);
 
         return minutePersistentWorker;
diff --git a/oap-server/server-library/library-batch-queue/MIGRATION.md 
b/oap-server/server-library/library-batch-queue/MIGRATION.md
deleted file mode 100644
index 3cdc251ec3..0000000000
--- a/oap-server/server-library/library-batch-queue/MIGRATION.md
+++ /dev/null
@@ -1,196 +0,0 @@
-# DataCarrier to BatchQueue Migration
-
-This document tracks the step-by-step replacement of DataCarrier with 
BatchQueue
-across the SkyWalking OAP codebase. Each section covers one replacement target
-with its current state, proposed changes, and key parameters.
-
-## 1. L1 Metrics Aggregation (MetricsAggregateWorker)
-
-### Current Architecture
-
-Each metric type (~620 OAL + ~100 MAL) creates its own `MetricsAggregateWorker`
-with a dedicated `DataCarrier`. OAL and MAL workers use separate
-`BulkConsumePool` instances with different thread counts and configurations,
-despite doing the same work (merge + flush).
-
-```
-MetricsStreamProcessor.in(metrics)
-  └─ entryWorkers.get(metrics.getClass())    // one worker per metric type
-       └─ worker.in(metrics)
-            └─ dataCarrier.produce(metrics)   // one DataCarrier per worker
-                 └─ BulkConsumePool drains    // OAL pool or MAL pool
-                      └─ AggregatorConsumer.consume(batch)
-                           └─ mergeDataCache.accept(each)  // per-worker merge 
cache
-                           └─ flush() if l1FlushPeriod elapsed
-```
-
-**Files involved:**
-- `server-core/.../worker/MetricsAggregateWorker.java` — base class, owns 
DataCarrier + merge cache
-- `server-core/.../worker/MetricsAggregateOALWorker.java` — OAL pool config
-- `server-core/.../worker/MetricsAggregateMALWorker.java` — MAL pool config + 
signal-driven
-- `server-core/.../worker/MetricsStreamProcessor.java` — creates workers, 
routes `in()` calls
-
-**Current parameters (8-core machine):**
-
-| Parameter | OAL pool | MAL pool |
-|-----------|----------|----------|
-| Pool name | `METRICS_L1_AGGREGATION_OAL` | `METRICS_L1_AGGREGATION_MAL` |
-| Pool threads | `ceil(cores * 3)` = 24 | `cores / 4` = 2 |
-| Signal-driven | no (200ms poll cycle) | yes (`notifyConsumers()` per `in()`) 
|
-| Channels per type | 2 | 1 |
-| Buffer per channel | 10,000 | 1,000 |
-| Total buffer per type | 20,000 | 1,000 |
-| Strategy | IF_POSSIBLE | IF_POSSIBLE |
-| **Total threads** | **26** | |
-| **Total DataCarriers** | **~720** | |
-
-**Per-worker state:**
-- `MergableBufferedData<Metrics> mergeDataCache` — not thread-safe, merge cache
-- `long lastSendTime` — tracks l1FlushPeriod
-- `AbstractWorker<Metrics> nextWorker` — MetricsRemoteWorker
-- `CounterMetrics abandonCounter` — tagged by metricName, level=1, 
dimensionality=minute
-- `CounterMetrics aggregationCounter` — tagged by metricName, level=1, 
dimensionality=minute
-- `GaugeMetrics queuePercentageGauge` — tagged by metricName, level=1, 
kind=OAL/MAL
-
-### Proposed: Single BatchQueue for All Metric Types
-
-OAL and MAL use the same queue. There is no reason to separate them — the
-aggregation logic (merge + periodic flush) is identical. Splitting by kind only
-wastes threads and complicates configuration. `MetricStreamKind` becomes
-irrelevant at the L1 aggregation layer.
-
-One `BatchQueue<Metrics>` with adaptive partitioning handles all ~720 metric
-types. Each type registers a handler via `addHandler()`.
-
-**BatchQueue parameters:**
-
-| Parameter | Value | Rationale |
-|-----------|-------|-----------|
-| Queue name | `METRICS_L1_AGGREGATION` | Single queue for all metric types 
(OAL + MAL) |
-| ThreadPolicy | `cpuCores(1.0)` | 8 threads on 8-core. Benchmark: 8 threads + 
adaptive beats 26-thread DataCarrier by 34-68% |
-| PartitionPolicy | `adaptive()` (multiplier=25) | threshold=8*25=200. ~720 
types → 200+520/2 = 460 partitions |
-| BufferSize | 20,000 | Matches current OAL per-type capacity |
-| Strategy | `IF_POSSIBLE` | Same as current — drop if full |
-| PartitionSelector | `typeHash()` (default) | Same type → same partition → 
single-thread access to merge cache |
-| minIdleMs | 1 | Fast reaction (current MAL used signal-driven wakeup; 1ms 
backoff is equivalent) |
-| maxIdleMs | 50 | Backoff cap when idle |
-
-**Thread-safety guarantee:** `typeHash()` ensures same metric class → same
-partition → same drain thread. Each handler's `MergableBufferedData` is only
-accessed by one thread, preserving the current invariant without 
synchronization.
-
-**Memory comparison (8-core, ~720 types):**
-
-| | Current (OAL + MAL pools) | Proposed (single queue) |
-|---|---------------------------|-------------------------|
-| Queues | ~1340 channels | 460 partitions |
-| Buffer slots | ~12.5M | ~9.2M |
-| Threads | 26 | 8 |
-
-### Telemetry
-
-| Metric | Plan |
-|--------|------|
-| `metrics_aggregator_abandon` | Keep. Per-type counter in handler. 
Incremented when `produce()` returns false. |
-| `metrics_aggregation` | Keep. Per-type counter in handler. Incremented per 
item in `consume()`. |
-| `metrics_aggregation_queue_used_percentage` | TODO: Redesign for 
shared-partition model. Leave as placeholder. |
-
-### Replacement Steps
-
-#### Step 1: Create L1 aggregation handler
-
-Create a handler class that captures the per-type state currently spread across
-`MetricsAggregateWorker` / `MetricsAggregateOALWorker` / 
`MetricsAggregateMALWorker`.
-No OAL/MAL distinction — the handler is the same for all metric types.
-
-```
-MetricsL1Handler implements HandlerConsumer<Metrics>
-  - MergableBufferedData<Metrics> mergeDataCache
-  - long lastSendTime
-  - long l1FlushPeriod
-  - AbstractWorker<Metrics> nextWorker
-  - CounterMetrics aggregationCounter
-
-  consume(List<Metrics> batch):
-    for each metric: aggregationCounter.inc(); mergeDataCache.accept(metric)
-    flush()
-
-  onIdle():
-    flush()
-
-  flush():
-    if currentTime - lastSendTime > l1FlushPeriod:
-      mergeDataCache.read().forEach(nextWorker::in)
-      lastSendTime = currentTime
-```
-
-#### Step 2: Create the shared BatchQueue in MetricsStreamProcessor
-
-One queue, created once, shared by all metric types regardless of kind:
-
-```java
-BatchQueue<Metrics> l1Queue = BatchQueueManager.create(
-    "METRICS_L1_AGGREGATION",
-    BatchQueueConfig.<Metrics>builder()
-        .threads(ThreadPolicy.cpuCores(1.0))
-        .partitions(PartitionPolicy.adaptive())
-        .bufferSize(20_000)
-        .strategy(BufferStrategy.IF_POSSIBLE)
-        .minIdleMs(1)
-        .maxIdleMs(50)
-        .build());
-```
-
-#### Step 3: Register handlers per metric type
-
-In `MetricsStreamProcessor.create()`, replace the OAL/MAL worker switch with
-a single handler registration. The `kind` parameter is no longer needed:
-
-```java
-// Replace:
-//   switch (kind) {
-//       case OAL:  aggregateWorker = new MetricsAggregateOALWorker(...); 
break;
-//       case MAL:  aggregateWorker = new MetricsAggregateMALWorker(...); 
break;
-//   }
-//   entryWorkers.put(metricsClass, aggregateWorker);
-
-// With:
-MetricsL1Handler handler = new MetricsL1Handler(
-    remoteWorker, l1FlushPeriod, modelName, metricsCreator);
-l1Queue.addHandler(metricsClass, handler);
-```
-
-#### Step 4: Replace `in()` routing
-
-The `MetricsStreamProcessor.in()` method no longer looks up a per-type worker.
-It produces directly to the queue:
-
-```java
-// Replace:
-//   MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass());
-//   if (worker != null) { worker.in(metrics); }
-
-// With:
-if (!l1Queue.produce(metrics)) {
-    abandonCounters.get(metrics.getClass()).inc();
-}
-```
-
-`abandonCounters` is a `Map<Class<?>, CounterMetrics>` populated during
-handler registration (Step 3).
-
-#### Step 5: Verify
-
-- All existing unit tests for MetricsStreamProcessor pass
-- E2E tests with metrics aggregation pass
-- Telemetry counters (abandon, aggregation) still reported per metric type
-
----
-
-## 2. L2 Metrics Persistence (MetricsPersistentMinWorker)
-
-*To be planned after L1 replacement is complete.*
-
-## 3. Individual Consumer Replacements
-
-*To be planned: GRPCRemoteClient, TopNWorker, exporters, JDBCBatchDAO.*
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
index f6a117039e..1167fc57a5 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
@@ -513,4 +513,16 @@ public class BatchQueue<T> {
     boolean isDedicatedScheduler() {
         return dedicatedScheduler;
     }
+
+    /**
+     * Take a snapshot of queue usage; each partition's size is read in turn, not atomically across partitions.
+     */
+    public BatchQueueStats stats() {
+        final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
+        final int[] used = new int[currentPartitions.length];
+        for (int i = 0; i < currentPartitions.length; i++) {
+            used[i] = currentPartitions[i].size();
+        }
+        return new BatchQueueStats(currentPartitions.length, 
config.getBufferSize(), used);
+    }
 }
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java
new file mode 100644
index 0000000000..2a0b64dff3
--- /dev/null
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.batchqueue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import lombok.Getter;
+
+/**
+ * Immutable snapshot of a {@link BatchQueue}'s partition usage at a point in 
time.
+ *
+ * <p>Provides three levels of detail:
+ * <ul>
+ *   <li><b>Total usage</b> — aggregate across all partitions</li>
+ *   <li><b>Per-partition usage</b> — individual partition sizes</li>
+ *   <li><b>Top N</b> — the most-loaded partitions, sorted by usage 
descending</li>
+ * </ul>
+ */
+public class BatchQueueStats {
+    @Getter
+    private final int partitionCount;
+    @Getter
+    private final int bufferSize;
+    private final int[] partitionUsed;
+
+    BatchQueueStats(final int partitionCount, final int bufferSize, final 
int[] partitionUsed) {
+        this.partitionCount = partitionCount;
+        this.bufferSize = bufferSize;
+        this.partitionUsed = Arrays.copyOf(partitionUsed, 
partitionUsed.length);
+    }
+
+    /**
+     * Total capacity across all partitions: {@code partitionCount * 
bufferSize}.
+     */
+    public long totalCapacity() {
+        return (long) partitionCount * bufferSize;
+    }
+
+    /**
+     * Total number of items currently queued across all partitions.
+     */
+    public int totalUsed() {
+        int sum = 0;
+        for (final int used : partitionUsed) {
+            sum += used;
+        }
+        return sum;
+    }
+
+    /**
+     * Overall queue usage as a percentage (0.0–100.0).
+     */
+    public double totalUsedPercentage() {
+        final long capacity = totalCapacity();
+        if (capacity == 0) {
+            return 0.0;
+        }
+        return 100.0 * totalUsed() / capacity;
+    }
+
+    /**
+     * Number of items currently queued in the given partition.
+     */
+    public int partitionUsed(final int index) {
+        return partitionUsed[index];
+    }
+
+    /**
+     * Usage of the given partition as a percentage (0.0–100.0).
+     */
+    public double partitionUsedPercentage(final int index) {
+        if (bufferSize == 0) {
+            return 0.0;
+        }
+        return 100.0 * partitionUsed[index] / bufferSize;
+    }
+
+    /**
+     * Return the top {@code n} most-loaded partitions, sorted by usage 
descending.
+     * If {@code n >= partitionCount}, all partitions are returned.
+     */
+    public List<PartitionUsage> topN(final int n) {
+        final Integer[] indices = new Integer[partitionCount];
+        for (int i = 0; i < partitionCount; i++) {
+            indices[i] = i;
+        }
+        Arrays.sort(indices, (a, b) -> Integer.compare(partitionUsed[b], 
partitionUsed[a]));
+
+        final int limit = Math.min(n, partitionCount);
+        final List<PartitionUsage> result = new ArrayList<>(limit);
+        for (int i = 0; i < limit; i++) {
+            final int idx = indices[i];
+            result.add(new PartitionUsage(
+                idx, partitionUsed[idx],
+                bufferSize == 0 ? 0.0 : 100.0 * partitionUsed[idx] / bufferSize
+            ));
+        }
+        return result;
+    }
+
+    /**
+     * Usage snapshot for a single partition.
+     */
+    @Getter
+    public static class PartitionUsage {
+        private final int partitionIndex;
+        private final int used;
+        private final double usedPercentage;
+
+        PartitionUsage(final int partitionIndex, final int used, final double 
usedPercentage) {
+            this.partitionIndex = partitionIndex;
+            this.used = used;
+            this.usedPercentage = usedPercentage;
+        }
+    }
+}
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java
index 6b6585debe..43e5f77ef1 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java
@@ -22,19 +22,22 @@ package org.apache.skywalking.oap.server.library.batchqueue;
  * Determines the number of threads for a BatchQueue's dedicated scheduler
  * or for a shared scheduler.
  *
- * Two modes:
+ * Three modes:
  * - fixed(N): exactly N threads, regardless of hardware.
  * - cpuCores(multiplier): multiplier * Runtime.availableProcessors(), rounded.
+ * - cpuCoresWithBase(base, multiplier): base + multiplier * 
Runtime.availableProcessors(), rounded.
  *
  * Resolved value is always >= 1 — every pool must have at least one thread.
  * fixed() requires count >= 1 at construction. cpuCores() applies max(1, ...) 
at resolution.
  */
 public class ThreadPolicy {
     private final int fixedCount;
+    private final int base;
     private final double cpuMultiplier;
 
-    private ThreadPolicy(final int fixedCount, final double cpuMultiplier) {
+    private ThreadPolicy(final int fixedCount, final int base, final double 
cpuMultiplier) {
         this.fixedCount = fixedCount;
+        this.base = base;
         this.cpuMultiplier = cpuMultiplier;
     }
 
@@ -47,7 +50,7 @@ public class ThreadPolicy {
         if (count < 1) {
             throw new IllegalArgumentException("Thread count must be >= 1, 
got: " + count);
         }
-        return new ThreadPolicy(count, 0);
+        return new ThreadPolicy(count, 0, 0);
     }
 
     /**
@@ -60,7 +63,25 @@ public class ThreadPolicy {
         if (multiplier <= 0) {
             throw new IllegalArgumentException("CPU multiplier must be > 0, 
got: " + multiplier);
         }
-        return new ThreadPolicy(0, multiplier);
+        return new ThreadPolicy(0, 0, multiplier);
+    }
+
+    /**
+     * Threads = base + round(multiplier * available CPU cores), min 1.
+     * Base must be >= 0, multiplier must be > 0.
+     *
+     * Example: cpuCoresWithBase(2, 0.25) on 8-core = 2 + 2 = 4, on 16-core = 
2 + 4 = 6, on 24-core = 2 + 6 = 8.
+     *
+     * @throws IllegalArgumentException if base < 0 or multiplier <= 0
+     */
+    public static ThreadPolicy cpuCoresWithBase(final int base, final double 
multiplier) {
+        if (base < 0) {
+            throw new IllegalArgumentException("Base must be >= 0, got: " + 
base);
+        }
+        if (multiplier <= 0) {
+            throw new IllegalArgumentException("CPU multiplier must be > 0, 
got: " + multiplier);
+        }
+        return new ThreadPolicy(0, base, multiplier);
     }
 
     /**
@@ -70,7 +91,7 @@ public class ThreadPolicy {
         if (fixedCount > 0) {
             return fixedCount;
         }
-        return Math.max(1, (int) Math.round(cpuMultiplier * 
Runtime.getRuntime().availableProcessors()));
+        return Math.max(1, base + (int) Math.round(cpuMultiplier * 
Runtime.getRuntime().availableProcessors()));
     }
 
     @Override
@@ -78,6 +99,9 @@ public class ThreadPolicy {
         if (fixedCount > 0) {
             return "fixed(" + fixedCount + ")";
         }
+        if (base > 0) {
+            return "cpuCoresWithBase(" + base + ", " + cpuMultiplier + ")";
+        }
         return "cpuCores(" + cpuMultiplier + ")";
     }
 }
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java
index a63bbdf400..1f3795102f 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class BatchQueueTest {
@@ -484,6 +485,195 @@ public class BatchQueueTest {
             "Later gap (" + laterGap + "ms) should be larger than early gap (" 
+ earlyGap + "ms)");
     }
 
+    // --- Stats ---
+
+    /**
+     * Verifies that a stats snapshot reports what is buffered behind a blocked consumer:
+     * a single partition with a buffer of 100 holding 10 items, i.e. 10% usage both in
+     * total and for partition 0.
+     */
+    @Test
+    public void testStatsReflectsQueueUsage() {
+        final CountDownLatch blockLatch = new CountDownLatch(1);
+        final BatchQueue<String> queue = BatchQueueManager.create("stats-usage-test",
+            BatchQueueConfig.<String>builder()
+                .threads(ThreadPolicy.fixed(1))
+                .strategy(BufferStrategy.IF_POSSIBLE)
+                .consumer(data -> {
+                    try {
+                        // Park the consumer so produced items stay in the buffer.
+                        blockLatch.await();
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                })
+                .bufferSize(100)
+                .build());
+
+        // Release the latch in finally so the consumer parked on it is freed even when
+        // an assertion or the Awaitility wait fails.
+        try {
+            // Wait for the consumer to block on the first item: once the trigger has been
+            // drained into the blocked consumer the buffer reads as empty again.
+            queue.produce("trigger");
+            Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .pollInterval(10, TimeUnit.MILLISECONDS)
+                .until(() -> queue.stats().totalUsed() == 0);
+
+            // Produce 10 items - they'll sit in the partition because consumer is blocked
+            for (int i = 0; i < 10; i++) {
+                queue.produce("item-" + i);
+            }
+
+            final BatchQueueStats stats = queue.stats();
+            assertEquals(1, stats.getPartitionCount());
+            assertEquals(100, stats.getBufferSize());
+            assertEquals(100, stats.totalCapacity());
+            assertEquals(10, stats.totalUsed());
+            assertEquals(10.0, stats.totalUsedPercentage(), 0.01);
+            assertEquals(10, stats.partitionUsed(0));
+            assertEquals(10.0, stats.partitionUsedPercentage(0), 0.01);
+        } finally {
+            blockLatch.countDown();
+        }
+    }
+
+    /**
+     * Verifies per-partition accounting with 4 partitions of 50 slots each: 20 items routed
+     * by {@code value % 4} must show up as 5 items (10%) in every partition and 20 in total.
+     */
+    @Test
+    public void testStatsWithMultiplePartitions() {
+        final CountDownLatch blockLatch = new CountDownLatch(1);
+        final BatchQueue<String> queue = BatchQueueManager.create("stats-multi-partition-test",
+            BatchQueueConfig.<String>builder()
+                .threads(ThreadPolicy.fixed(2))
+                .partitions(PartitionPolicy.fixed(4))
+                .strategy(BufferStrategy.IF_POSSIBLE)
+                .partitionSelector((data, count) -> Integer.parseInt(data) % count)
+                .consumer(data -> {
+                    try {
+                        // Park the consumer so produced items stay in the partitions.
+                        blockLatch.await();
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                })
+                .bufferSize(50)
+                .build());
+
+        // Release the latch in finally so consumers parked on it are freed even when
+        // an assertion or the Awaitility wait fails.
+        try {
+            // Wait for drain threads to block
+            queue.produce("0");
+            queue.produce("1");
+            Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .pollInterval(10, TimeUnit.MILLISECONDS)
+                .until(() -> queue.stats().totalUsed() == 0);
+
+            // Produce items targeting specific partitions: value % 4
+            for (int i = 0; i < 20; i++) {
+                queue.produce(String.valueOf(i));
+            }
+
+            final BatchQueueStats stats = queue.stats();
+            assertEquals(4, stats.getPartitionCount());
+            assertEquals(50, stats.getBufferSize());
+            assertEquals(200, stats.totalCapacity());
+            assertEquals(20, stats.totalUsed());
+            // Each partition gets 5 items
+            // (0,4,8,12,16 / 1,5,9,13,17 / 2,6,10,14,18 / 3,7,11,15,19)
+            for (int p = 0; p < 4; p++) {
+                assertEquals(5, stats.partitionUsed(p));
+                assertEquals(10.0, stats.partitionUsedPercentage(p), 0.01);
+            }
+        } finally {
+            blockLatch.countDown();
+        }
+    }
+
+    /**
+     * Verifies that a stats object is a point-in-time snapshot: items produced after the
+     * snapshot was taken do not change its numbers, while a fresh snapshot does differ.
+     */
+    @Test
+    public void testStatsSnapshotIsImmutable() {
+        final CountDownLatch blockLatch = new CountDownLatch(1);
+        final BatchQueue<String> queue = BatchQueueManager.create("stats-immutable-test",
+            BatchQueueConfig.<String>builder()
+                .threads(ThreadPolicy.fixed(1))
+                .strategy(BufferStrategy.IF_POSSIBLE)
+                .consumer(data -> {
+                    try {
+                        // Park the consumer so produced items stay in the buffer.
+                        blockLatch.await();
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                })
+                .bufferSize(100)
+                .build());
+
+        // Release the latch in finally so the consumer parked on it is freed even when
+        // an assertion or the Awaitility wait fails.
+        try {
+            // Wait until the trigger item has left the buffer, i.e. the consumer is blocked.
+            queue.produce("trigger");
+            Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .pollInterval(10, TimeUnit.MILLISECONDS)
+                .until(() -> queue.stats().totalUsed() == 0);
+
+            for (int i = 0; i < 5; i++) {
+                queue.produce("item-" + i);
+            }
+
+            final BatchQueueStats snapshot = queue.stats();
+            assertEquals(5, snapshot.totalUsed());
+
+            // Produce more - the snapshot should not change
+            for (int i = 0; i < 5; i++) {
+                queue.produce("more-" + i);
+            }
+
+            assertEquals(5, snapshot.totalUsed());
+            assertNotEquals(5, queue.stats().totalUsed());
+        } finally {
+            blockLatch.countDown();
+        }
+    }
+
+    /**
+     * Verifies {@code topN}: with partitions loaded as p0=20, p1=5, p2=15, p3=10 the top two
+     * entries must be partition 0 then partition 2, and asking for more entries than there
+     * are partitions returns all four.
+     */
+    @Test
+    public void testStatsTopNReturnsHottestPartitions() {
+        final CountDownLatch blockLatch = new CountDownLatch(1);
+        final BatchQueue<String> queue = BatchQueueManager.create("stats-topn-test",
+            BatchQueueConfig.<String>builder()
+                .threads(ThreadPolicy.fixed(2))
+                .partitions(PartitionPolicy.fixed(4))
+                .strategy(BufferStrategy.IF_POSSIBLE)
+                // The numeric prefix before '-' selects the target partition.
+                .partitionSelector((data, count) -> Integer.parseInt(data.split("-")[0]) % count)
+                .consumer(data -> {
+                    try {
+                        // Park the consumer so produced items stay in the partitions.
+                        blockLatch.await();
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                })
+                .bufferSize(100)
+                .build());
+
+        // Release the latch in finally so consumers parked on it are freed even when
+        // an assertion or the Awaitility wait fails.
+        try {
+            // Wait for drain threads to block
+            queue.produce("0-trigger");
+            queue.produce("1-trigger");
+            Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .pollInterval(10, TimeUnit.MILLISECONDS)
+                .until(() -> queue.stats().totalUsed() == 0);
+
+            // Load partitions unevenly: p0=20, p1=5, p2=15, p3=10
+            for (int i = 0; i < 20; i++) {
+                queue.produce("0-" + i);
+            }
+            for (int i = 0; i < 5; i++) {
+                queue.produce("1-" + i);
+            }
+            for (int i = 0; i < 15; i++) {
+                queue.produce("2-" + i);
+            }
+            for (int i = 0; i < 10; i++) {
+                queue.produce("3-" + i);
+            }
+
+            final BatchQueueStats stats = queue.stats();
+            final java.util.List<BatchQueueStats.PartitionUsage> top2 = stats.topN(2);
+
+            assertEquals(2, top2.size());
+            // Highest: partition 0 (20 items)
+            assertEquals(0, top2.get(0).getPartitionIndex());
+            assertEquals(20, top2.get(0).getUsed());
+            assertEquals(20.0, top2.get(0).getUsedPercentage(), 0.01);
+            // Second: partition 2 (15 items)
+            assertEquals(2, top2.get(1).getPartitionIndex());
+            assertEquals(15, top2.get(1).getUsed());
+
+            // topN with n > partitionCount returns all
+            final java.util.List<BatchQueueStats.PartitionUsage> topAll = stats.topN(10);
+            assertEquals(4, topAll.size());
+        } finally {
+            blockLatch.countDown();
+        }
+    }
+
     @Test
     public void testBackoffResetsOnData() throws Exception {
         final AtomicInteger consumeCount = new AtomicInteger(0);
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java
index 4a77f7ad43..3cbdeb6c0a 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java
@@ -75,4 +75,36 @@ public class ThreadPolicyTest {
     public void testToStringCpuCores() {
         assertEquals("cpuCores(0.5)", ThreadPolicy.cpuCores(0.5).toString());
     }
+
+    /**
+     * The resolved thread count is the base plus the rounded CPU-scaled part.
+     */
+    @Test
+    public void testCpuCoresWithBaseAddsBaseToScaled() {
+        final int scaledPart = (int) Math.round(0.25 * Runtime.getRuntime().availableProcessors());
+        final ThreadPolicy policy = ThreadPolicy.cpuCoresWithBase(2, 0.25);
+        assertEquals(2 + scaledPart, policy.resolve());
+    }
+
+    /**
+     * A zero base with a tiny multiplier must still resolve to at least one thread.
+     */
+    @Test
+    public void testCpuCoresWithBaseResolvesAtLeastOne() {
+        final int resolved = ThreadPolicy.cpuCoresWithBase(0, 0.001).resolve();
+        assertTrue(resolved >= 1);
+    }
+
+    /**
+     * A negative base is rejected with an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testCpuCoresWithBaseRejectsNegativeBase() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> ThreadPolicy.cpuCoresWithBase(-1, 0.25)
+        );
+    }
+
+    /**
+     * A multiplier of zero is rejected with an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testCpuCoresWithBaseRejectsZeroMultiplier() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> ThreadPolicy.cpuCoresWithBase(2, 0)
+        );
+    }
+
+    /**
+     * A negative multiplier is rejected with an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testCpuCoresWithBaseRejectsNegativeMultiplier() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> ThreadPolicy.cpuCoresWithBase(2, -0.5)
+        );
+    }
+
+    /**
+     * The string form of a base-plus-multiplier policy names both configured values.
+     */
+    @Test
+    public void testToStringCpuCoresWithBase() {
+        final String rendered = ThreadPolicy.cpuCoresWithBase(2, 0.25).toString();
+        assertEquals("cpuCoresWithBase(2, 0.25)", rendered);
+    }
 }
diff --git a/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml 
b/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml
index 56b1cf9f71..223c6e0cf4 100644
--- a/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml
@@ -69,7 +69,7 @@ metricsRules:
       metrics_aggregation.tagEqual('dimensionality', 'minute').sum(['service', 
'host_name', 'level']).increase('PT1M')
       .tag({tags -> if (tags['level'] == '1') {tags.level = 'L1 aggregation'} 
}).tag({tags -> if (tags['level'] == '2') {tags.level = 'L2 aggregation'} })
   - name: instance_metrics_aggregation_queue_used_percentage
-    exp: metrics_aggregation_queue_used_percentage.sum(['service', 
'host_name', 'level', 'kind', 'metricName'])
+    exp: metrics_aggregation_queue_used_percentage.sum(['service', 
'host_name', 'level', 'slot'])
   - name: instance_persistence_execute_percentile
     exp: persistence_timer_bulk_execute_latency.sum(['le', 'service', 
'host_name']).increase('PT1M').histogram().histogram_percentile([50,75,90,95,99])
   - name: instance_persistence_prepare_percentile
diff --git 
a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
index 0e630c917a..cfba96f7c0 100644
--- 
a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
+++ 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
@@ -470,7 +470,7 @@
                   "i": "22",
                   "type": "Widget",
                   "expressions": [
-                    
"sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='1',kind='OAL'},10,des,avg)"
+                    
"sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='1'},10,des,avg)"
                   ],
                   "graph": {
                     "type": "Line",
@@ -481,7 +481,7 @@
                     "showYAxis": true
                   },
                   "widget": {
-                    "title": "OAL L1 Aggregation Queue Percentage (%)"
+                    "title": "L1 Aggregation Queue Percentage (%)"
                   }
                 },
                 {
@@ -492,51 +492,7 @@
                   "i": "23",
                   "type": "Widget",
                   "expressions": [
-                    
"sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='1',kind='MAL'},10,des,avg)"
-                  ],
-                  "widget": {
-                    "title": "MAL L1 Aggregation Queue Percentage (%)"
-                  },
-                  "graph": {
-                    "type": "Line",
-                    "step": false,
-                    "smooth": false,
-                    "showSymbol": true,
-                    "showXAxis": true,
-                    "showYAxis": true
-                  }
-                },
-                {
-                  "x": 12,
-                  "y": 65,
-                  "w": 12,
-                  "h": 13,
-                  "i": "24",
-                  "type": "Widget",
-                  "graph": {
-                    "type": "Line",
-                    "step": false,
-                    "smooth": false,
-                    "showSymbol": true,
-                    "showXAxis": true,
-                    "showYAxis": true
-                  },
-                  "expressions": [
-                    
"sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='2',kind='OAL'},10,des,avg)"
-                  ],
-                  "widget": {
-                    "title": "OAL L2 Aggregation Queue Percentage (%)"
-                  }
-                },
-                {
-                  "x": 0,
-                  "y": 78,
-                  "w": 12,
-                  "h": 13,
-                  "i": "25",
-                  "type": "Widget",
-                  "expressions": [
-                    
"sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='2',kind='MAL'},10,des,avg)"
+                    
"sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='2'},10,des,avg)"
                   ],
                   "graph": {
                     "type": "Line",
@@ -547,15 +503,15 @@
                     "showYAxis": true
                   },
                   "widget": {
-                    "title": "MAL L2 Aggregation Queue Percentage (%)"
+                    "title": "L2 Aggregation Queue Percentage (%)"
                   }
                 },
                 {
                   "x": 12,
-                  "y": 78,
+                  "y": 65,
                   "w": 12,
                   "h": 13,
-                  "i": "26",
+                  "i": "24",
                   "type": "Widget",
                   "graph": {
                     "type": "Line",
@@ -574,10 +530,10 @@
                 },
                 {
                   "x": 0,
-                  "y": 91,
+                  "y": 78,
                   "w": 12,
                   "h": 13,
-                  "i": "27",
+                  "i": "25",
                   "type": "Widget",
                   "expressions": [
                     
"sort_values(meter_oap_instance_metrics_persistent_collection_cached_size{dimensionality='minute',kind='MAL'},10,des,avg)"

Reply via email to