This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch library-batch-queue in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit c86216c5d3765a5d3ad98908ef79c2a2b2e83a84 Author: Wu Sheng <[email protected]> AuthorDate: Sat Feb 14 21:04:27 2026 +0800 Replace DataCarrier with BatchQueue for L2 metrics persistence - Replace DataCarrier in MetricsPersistentMinWorker with shared BatchQueue (METRICS_L2_PERSISTENCE) using adaptive partitioning - Add ThreadPolicy.cpuCoresWithBase(base, multiplier) API for L2 thread scaling (4/6/8 threads on 8/16/24 cores) - Remove MetricsPersistentMinOALWorker and MetricsPersistentMinMALWorker subclasses — single unified worker for all metric types - Add BatchQueueStats for queue usage telemetry (total + top-N partition usage), applied to both L1 and L2 workers - Update Grafana dashboards and otel-rules for new telemetry model Co-Authored-By: Claude Opus 4.6 <[email protected]> --- docs/en/setup/backend/grafana-cluster.json | 222 +-------------------- docs/en/setup/backend/grafana-instance.json | 212 +------------------- .../analysis/worker/MetricsAggregateWorker.java | 46 ++++- .../worker/MetricsPersistentMinMALWorker.java | 62 ------ .../worker/MetricsPersistentMinOALWorker.java | 52 ----- .../worker/MetricsPersistentMinWorker.java | 128 +++++++----- .../analysis/worker/MetricsStreamProcessor.java | 26 +-- .../library-batch-queue/MIGRATION.md | 196 ------------------ .../oap/server/library/batchqueue/BatchQueue.java | 12 ++ .../server/library/batchqueue/BatchQueueStats.java | 133 ++++++++++++ .../server/library/batchqueue/ThreadPolicy.java | 34 +++- .../server/library/batchqueue/BatchQueueTest.java | 190 ++++++++++++++++++ .../library/batchqueue/ThreadPolicyTest.java | 32 +++ .../src/main/resources/otel-rules/oap.yaml | 2 +- .../so11y_oap/so11y-instance.json | 60 +----- 15 files changed, 553 insertions(+), 854 deletions(-) diff --git a/docs/en/setup/backend/grafana-cluster.json b/docs/en/setup/backend/grafana-cluster.json index ea60f34863..3a61beb9eb 100644 --- a/docs/en/setup/backend/grafana-cluster.json +++ b/docs/en/setup/backend/grafana-cluster.json @@ -6693,17 +6693,17 @@ "uid": "$datasource" }, "editorMode": "code", - "expr": "topk(10,avg by(metricName)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"1\",kind=\"OAL\"}))", + "expr": "avg by(slot)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"1\"})", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 1, - "legendFormat": "{{metricName}}", + "legendFormat": "{{slot}}", "range": true, "refId": "A" } ], - "title": "OAL L1 Aggregation Queue Percentage (%)", + "title": "L1 Aggregation Queue Percentage (%)", "type": "timeseries" }, { @@ -6793,17 +6793,17 @@ "uid": "$datasource" }, "editorMode": "code", - "expr": "topk(10,avg by(metricName)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"1\",kind=\"MAL\"}))", + "expr": "avg by(slot)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"2\"})", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 1, - "legendFormat": "{{metricName}}", + "legendFormat": "{{slot}}", "range": true, "refId": "A" } ], - "title": "MAL L1 Aggregation Queue Percentage (%)", + "title": "L2 Aggregation Queue Percentage (%)", "type": "timeseries" }, { @@ -6871,206 +6871,6 @@ "x": 8, "y": 122 }, - "id": 150, - "interval": "1m", - "options": { - "dataLinks": [], - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "multi", - "sort": "none" - } - }, - "pluginVersion": "11.3.1", - "targets": [ - { - "datasource": { - "uid": "$datasource" - }, - "editorMode": "code", - "expr": "topk(10,avg by(metricName)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"2\",kind=\"OAL\"}))", - "format": "time_series", - "hide": false, - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{metricName}}", - "range": true, - "refId": "A" - } - ], - "title": "OAL L2 Aggregation Queue Percentage (%)", - "type": "timeseries" - }, - { - "datasource": { - "uid": "$datasource" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 10, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "never", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 16, - "y": 122 - }, - "id": 151, - "interval": "1m", - "options": { - "dataLinks": [], - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "multi", - "sort": "none" - } - }, - "pluginVersion": "11.3.1", - "targets": [ - { - "datasource": { - "uid": "$datasource" - }, - "editorMode": "code", - "expr": "topk(10,avg by(metricName)(metrics_aggregation_queue_used_percentage{job=\"$job\",level=\"2\",kind=\"MAL\"}))", - "format": "time_series", - "hide": false, - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{metricName}}", - "range": true, - "refId": "A" - } - ], - "title": "MAL L2 Aggregation Queue Percentage (%)", - "type": "timeseries" - }, - { - "datasource": { - "uid": "$datasource" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 10, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "never", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 0, - "y": 128 - }, "id": 149, "interval": "1m", "options": { @@ -7168,8 +6968,8 @@ "gridPos": { "h": 6, "w": 8, - "x": 8, - "y": 128 + "x": 16, + "y": 122 }, "id": 152, "interval": "1m", @@ -7269,7 +7069,7 @@ "gridPos": { "h": 6, "w": 8, - "x": 16, + "x": 0, "y": 128 }, "id": 146, @@ -7421,8 +7221,8 @@ "gridPos": { "h": 6, "w": 8, - "x": 0, - "y": 134 + "x": 8, + "y": 128 }, "id": 145, "interval": "1m", diff --git a/docs/en/setup/backend/grafana-instance.json b/docs/en/setup/backend/grafana-instance.json index 4c78f5c313..ef2070fa1d 100644 --- a/docs/en/setup/backend/grafana-instance.json +++ b/docs/en/setup/backend/grafana-instance.json @@ -7518,17 +7518,17 @@ "uid": "$datasource" }, "editorMode": "code", - "expr": "topk(10,metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"1\",kind=\"OAL\"})", + "expr": "metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"1\"}", "format": "time_series", "hide": false, "interval": "1m", "intervalFactor": 1, - "legendFormat": "{{metricName}}", + "legendFormat": "{{slot}}", "range": true, "refId": "A" } ], - "title": "OAL L1 Aggregation Queue Percentage (%)", + "title": "L1 Aggregation Queue Percentage (%)", "type": "timeseries" }, { @@ -7618,217 +7618,17 @@ "uid": "$datasource" }, "editorMode": "code", - "expr": "topk(10,metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"2\",kind=\"OAL\"})", - "format": "time_series", - "hide": false, - "interval": "1m", - "intervalFactor": 1, - "legendFormat": "{{metricName}}", - "range": true, - "refId": "A" - } - ], - "title": "OAL L2 Aggregation Queue Percentage (%)", - "type": "timeseries" - }, - { - "datasource": { - "uid": "$datasource" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 10, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "never", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 0, - "y": 148 - }, - "id": 148, - "interval": "1m", - "options": { - "dataLinks": [], - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "multi", - "sort": "none" - } - }, - "pluginVersion": "11.3.1", - "targets": [ - { - "datasource": { - "uid": "$datasource" - }, - "editorMode": "code", - "expr": "topk(10,metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"1\",kind=\"MAL\"})", + "expr": "metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"2\"}", "format": "time_series", "hide": false, "interval": "1m", "intervalFactor": 1, - "legendFormat": "{{metricName}}", - "range": true, - "refId": "A" - } - ], - "title": "MAL L1 Aggregation Queue Percentage (%)", - "type": "timeseries" - }, - { - "datasource": { - "uid": "$datasource" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 10, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "never", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 8, - "y": 148 - }, - "id": 151, - "interval": "1m", - "options": { - "dataLinks": [], - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "multi", - "sort": "none" - } - }, - "pluginVersion": "11.3.1", - "targets": [ - { - "datasource": { - "uid": "$datasource" - }, - "editorMode": "code", - "expr": "topk(10,metrics_aggregation_queue_used_percentage{instance=\"$instance\",job=\"$job\",level=\"2\",kind=\"MAL\"})", - "format": "time_series", - "hide": false, - "interval": "1m", - "intervalFactor": 1, - "legendFormat": "{{metricName}}", + "legendFormat": "{{slot}}", "range": true, "refId": "A" } ], - "title": "MAL L2 Aggregation Queue Percentage (%)", + "title": "L2 Aggregation Queue Percentage (%)", "type": "timeseries" }, { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index ff69e6f8ec..729834cf6b 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -18,7 +18,9 @@ package org.apache.skywalking.oap.server.core.analysis.worker; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; @@ -26,6 +28,7 @@ import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig; import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueStats; import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy; import org.apache.skywalking.oap.server.library.batchqueue.HandlerConsumer; import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy; @@ -33,6 +36,7 @@ import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; @@ -59,13 +63,16 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { .maxIdleMs(50) .build(); + private static final int TOP_N = 10; + /** slot label -> gauge instance. Keys: "total", "top1" .. "top10". */ + private static Map<String, GaugeMetrics> QUEUE_USAGE_GAUGE; + private final BatchQueue<Metrics> l1Queue; private final long l1FlushPeriod; private final AbstractWorker<Metrics> nextWorker; private final MergableBufferedData<Metrics> mergeDataCache; private final CounterMetrics abandonCounter; private final CounterMetrics aggregationCounter; - // TODO: add queue usage telemetry after per-partition metrics are designed private long lastSendTime = 0; MetricsAggregateWorker(final ModuleDefineHolder moduleDefineHolder, @@ -93,6 +100,25 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { new MetricsTag.Values(modelName, "1", "minute") ); + if (QUEUE_USAGE_GAUGE == null) { + final Map<String, GaugeMetrics> gauge = new LinkedHashMap<>(); + gauge.put("total", metricsCreator.createGauge( + "metrics_aggregation_queue_used_percentage", + "The percentage of queue used in L1 aggregation.", + new MetricsTag.Keys("level", "slot"), + new MetricsTag.Values("1", "total") + )); + for (int i = 1; i <= TOP_N; i++) { + gauge.put("top" + i, metricsCreator.createGauge( + "metrics_aggregation_queue_used_percentage", + "The percentage of queue used in L1 aggregation.", + new MetricsTag.Keys("level", "slot"), + new MetricsTag.Values("1", "top" + i) + )); + } + QUEUE_USAGE_GAUGE = gauge; + } + l1Queue.addHandler(metricsClass, new L1Handler()); } @@ -108,9 +134,27 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { aggregationCounter.inc(); mergeDataCache.accept(metrics); } + updateQueueUsageGauges(); flush(); } + private void updateQueueUsageGauges() { + final Map<String, GaugeMetrics> gauge = QUEUE_USAGE_GAUGE; + if (gauge == null) { + return; + } + final BatchQueueStats stats = l1Queue.stats(); + gauge.get("total").setValue(stats.totalUsedPercentage()); + final List<BatchQueueStats.PartitionUsage> topPartitions = stats.topN(TOP_N); + for (int i = 1; i <= TOP_N; i++) { + if (i <= topPartitions.size()) { + gauge.get("top" + i).setValue(topPartitions.get(i - 1).getUsedPercentage()); + } else { + gauge.get("top" + i).setValue(0); + } + } + } + private void flush() { final long currentTime = System.currentTimeMillis(); if (currentTime - lastSendTime > l1FlushPeriod) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java deleted file mode 100644 index 6ced828694..0000000000 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.core.analysis.worker; - -import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; -import org.apache.skywalking.oap.server.core.exporter.ExportEvent; -import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; -import org.apache.skywalking.oap.server.core.storage.model.Model; -import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; -import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; - -@Slf4j -public class MetricsPersistentMinMALWorker extends MetricsPersistentMinWorker { - private final static String POOL_NAME = "METRICS_L2_AGGREGATION_MAL"; - private final BulkConsumePool pool; - - MetricsPersistentMinMALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, - AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker, - MetricsTransWorker transWorker, boolean supportUpdate, - long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) { - super( - moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate, - storageSessionTimeout, metricsDataTTL, kind, - POOL_NAME, - calculatePoolSize(), - true, - 1, - 1000 - ); - this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME); - } - - @Override - public void in(Metrics metrics) { - super.in(metrics); - pool.notifyConsumers(); - } - - private static int calculatePoolSize() { - int size = BulkConsumePool.Creator.recommendMaxSize() / 16; - return size == 0 ? 1 : size; - } -} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java deleted file mode 100644 index 534b50f9f5..0000000000 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.core.analysis.worker; - -import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; -import org.apache.skywalking.oap.server.core.exporter.ExportEvent; -import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; -import org.apache.skywalking.oap.server.core.storage.model.Model; -import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; - -@Slf4j -public class MetricsPersistentMinOALWorker extends MetricsPersistentMinWorker { - - MetricsPersistentMinOALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, - AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker, - MetricsTransWorker transWorker, boolean supportUpdate, - long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) { - super( - moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate, - storageSessionTimeout, metricsDataTTL, kind, - "METRICS_L2_AGGREGATION_OAL", - calculatePoolSize(), - false, - 1, - 2000 - ); - } - - private static int calculatePoolSize() { - int size = BulkConsumePool.Creator.recommendMaxSize() / 8; - return size == 0 ? 1 : size; - } -} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java index 2e2f667045..d5dc31d03d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java @@ -18,22 +18,25 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.status.ServerStatusService; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueStats; +import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy; +import org.apache.skywalking.oap.server.library.batchqueue.HandlerConsumer; +import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy; +import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics; @@ -41,16 +44,32 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; /** - * MetricsPersistentMinWorker is an extension of {@link MetricsPersistentWorker} and focuses on the Minute Metrics data persistent. + * MetricsPersistentMinWorker is an extension of {@link MetricsPersistentWorker} and focuses on the + * Minute Metrics data persistent. + * + * <p>All metric types (OAL and MAL) share a single {@link BatchQueue} with adaptive partitioning. + * The {@code typeHash()} partition selector ensures same metric class lands on the same partition, + * so each handler's {@link org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache} + * is only accessed by one drain thread. */ @Slf4j -public abstract class MetricsPersistentMinWorker extends MetricsPersistentWorker { - private final DataCarrier<Metrics> dataCarrier; +public class MetricsPersistentMinWorker extends MetricsPersistentWorker { + private static final String L2_QUEUE_NAME = "METRICS_L2_PERSISTENCE"; + private static final BatchQueueConfig<Metrics> L2_QUEUE_CONFIG = + BatchQueueConfig.<Metrics>builder() + .threads(ThreadPolicy.cpuCoresWithBase(1, 0.25)) + .partitions(PartitionPolicy.adaptive()) + .bufferSize(2_000) + .strategy(BufferStrategy.IF_POSSIBLE) + .minIdleMs(1) + .maxIdleMs(50) + .build(); - /** - * The percentage of queue used in aggregation - */ - private final GaugeMetrics queuePercentageGauge; + private static final int TOP_N = 10; + /** slot label -> gauge instance. Keys: "total", "top1" .. "top10". */ + private static Map<String, GaugeMetrics> QUEUE_USAGE_GAUGE; + + private final BatchQueue<Metrics> l2Queue; /** * @since 9.4.0 @@ -59,41 +78,45 @@ public abstract class MetricsPersistentMinWorker extends MetricsPersistentWorker // Not going to expose this as a configuration, only for testing purpose private final boolean isTestingTTL = "true".equalsIgnoreCase(System.getenv("TESTING_TTL")); - private final int queueTotalSize; MetricsPersistentMinWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker, MetricsTransWorker transWorker, boolean supportUpdate, long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind, - String poolName, int poolSize, boolean isSignalDrivenMode, - int queueChannelSize, int queueBufferSize) { + Class<? extends Metrics> metricsClass) { super( moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate, storageSessionTimeout, metricsDataTTL, kind ); - BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode); - try { - ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator); - } catch (Exception e) { - throw new UnexpectedException(e.getMessage(), e); - } - this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), poolName, queueChannelSize, queueBufferSize); - this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new PersistentConsumer()); + this.l2Queue = BatchQueueManager.create(L2_QUEUE_NAME, L2_QUEUE_CONFIG); - MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) - .provider() - .getService(MetricsCreator.class); - queuePercentageGauge = metricsCreator.createGauge( - "metrics_aggregation_queue_used_percentage", "The percentage of queue used in aggregation.", - new MetricsTag.Keys("metricName", "level", "kind"), - new MetricsTag.Values(model.getName(), "2", kind.name()) - ); serverStatusService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServerStatusService.class); serverStatusService.registerWatcher(this); - queueTotalSize = Arrays.stream(dataCarrier.getChannels().getBufferChannels()) - .mapToInt(QueueBuffer::getBufferSize) - .sum(); + + if (QUEUE_USAGE_GAUGE == null) { + final MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + final Map<String, GaugeMetrics> gauge = new LinkedHashMap<>(); + gauge.put("total", metricsCreator.createGauge( + "metrics_aggregation_queue_used_percentage", + "The percentage of queue used in L2 persistence.", + new MetricsTag.Keys("level", "slot"), + new MetricsTag.Values("2", "total") + )); + for (int i = 1; i <= TOP_N; i++) { + gauge.put("top" + i, metricsCreator.createGauge( + "metrics_aggregation_queue_used_percentage", + "The percentage of queue used in L2 persistence.", + new MetricsTag.Keys("level", "slot"), + new MetricsTag.Values("2", "top" + i) + )); + } + QUEUE_USAGE_GAUGE = gauge; + } + + l2Queue.addHandler(metricsClass, new L2Handler()); } /** @@ -107,24 +130,31 @@ public abstract class MetricsPersistentMinWorker extends MetricsPersistentWorker return; } getAggregationCounter().inc(); - dataCarrier.produce(metrics); + l2Queue.produce(metrics); } - /** - * Metrics queue processor, merge the received metrics if existing one with same ID(s) and time bucket. - * - * ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual. - */ - private class PersistentConsumer implements IConsumer<Metrics> { - @Override - public void consume(List<Metrics> data) { - queuePercentageGauge.setValue(Math.round(100 * (double) data.size() / queueTotalSize)); - MetricsPersistentMinWorker.this.onWork(data); + private void updateQueueUsageGauges() { + final Map<String, GaugeMetrics> gauge = QUEUE_USAGE_GAUGE; + if (gauge == null) { + return; + } + final BatchQueueStats stats = l2Queue.stats(); + gauge.get("total").setValue(stats.totalUsedPercentage()); + final List<BatchQueueStats.PartitionUsage> topPartitions = stats.topN(TOP_N); + for (int i = 1; i <= TOP_N; i++) { + if (i <= topPartitions.size()) { + gauge.get("top" + i).setValue(topPartitions.get(i - 1).getUsedPercentage()); + } else { + gauge.get("top" + i).setValue(0); + } } + } + private class L2Handler implements HandlerConsumer<Metrics> { @Override - public void onError(List<Metrics> data, Throwable t) { - log.error(t.getMessage(), t); + public void consume(List<Metrics> data) { + updateQueueUsageGauges(); + onWork(data); } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index 38361ebd10..c405bb5b37 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -185,7 +185,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Minute) ); MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker( - moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate, kind); + moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate, kind, metricsClass); String remoteReceiverWorkerName = stream.getName() + "_rec"; IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME) @@ -204,27 +204,15 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { Model model, MetricsTransWorker transWorker, boolean supportUpdate, - MetricStreamKind kind) { + MetricStreamKind kind, + Class<? extends Metrics> metricsClass) { AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder); ExportMetricsWorker exportWorker = new ExportMetricsWorker(moduleDefineHolder); - MetricsPersistentWorker minutePersistentWorker; - switch (kind) { - case OAL: - minutePersistentWorker = new MetricsPersistentMinOALWorker( - moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, - supportUpdate, storageSessionTimeout, metricsDataTTL, kind - ); - break; - case MAL: - minutePersistentWorker = new MetricsPersistentMinMALWorker( - moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, - supportUpdate, storageSessionTimeout, metricsDataTTL, kind - ); - break; - default: - throw new IllegalArgumentException("Unsupported MetricStreamKind: " + kind); - } + MetricsPersistentMinWorker minutePersistentWorker = new MetricsPersistentMinWorker( + moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, + supportUpdate, storageSessionTimeout, metricsDataTTL, kind, metricsClass + ); persistentWorkers.add(minutePersistentWorker); return minutePersistentWorker; diff --git a/oap-server/server-library/library-batch-queue/MIGRATION.md b/oap-server/server-library/library-batch-queue/MIGRATION.md deleted file mode 100644 index 3cdc251ec3..0000000000 --- a/oap-server/server-library/library-batch-queue/MIGRATION.md +++ /dev/null @@ -1,196 +0,0 @@ -# DataCarrier to BatchQueue Migration - -This document tracks the step-by-step replacement of DataCarrier with BatchQueue -across the SkyWalking OAP codebase. Each section covers one replacement target -with its current state, proposed changes, and key parameters. - -## 1. L1 Metrics Aggregation (MetricsAggregateWorker) - -### Current Architecture - -Each metric type (~620 OAL + ~100 MAL) creates its own `MetricsAggregateWorker` -with a dedicated `DataCarrier`. OAL and MAL workers use separate -`BulkConsumePool` instances with different thread counts and configurations, -despite doing the same work (merge + flush). - -``` -MetricsStreamProcessor.in(metrics) - └─ entryWorkers.get(metrics.getClass()) // one worker per metric type - └─ worker.in(metrics) - └─ dataCarrier.produce(metrics) // one DataCarrier per worker - └─ BulkConsumePool drains // OAL pool or MAL pool - └─ AggregatorConsumer.consume(batch) - └─ mergeDataCache.accept(each) // per-worker merge cache - └─ flush() if l1FlushPeriod elapsed -``` - -**Files involved:** -- `server-core/.../worker/MetricsAggregateWorker.java` — base class, owns DataCarrier + merge cache -- `server-core/.../worker/MetricsAggregateOALWorker.java` — OAL pool config -- `server-core/.../worker/MetricsAggregateMALWorker.java` — MAL pool config + signal-driven -- `server-core/.../worker/MetricsStreamProcessor.java` — creates workers, routes `in()` calls - -**Current parameters (8-core machine):** - -| Parameter | OAL pool | MAL pool | -|-----------|----------|----------| -| Pool name | `METRICS_L1_AGGREGATION_OAL` | `METRICS_L1_AGGREGATION_MAL` | -| Pool threads | `ceil(cores * 3)` = 24 | `cores / 4` = 2 | -| Signal-driven | no (200ms poll cycle) | yes (`notifyConsumers()` per `in()`) | -| Channels per type | 2 | 1 | -| Buffer per channel | 10,000 | 1,000 | -| Total buffer per type | 20,000 | 1,000 | -| Strategy | IF_POSSIBLE | IF_POSSIBLE | -| **Total threads** | **26** | | -| **Total DataCarriers** | **~720** | | - -**Per-worker state:** -- `MergableBufferedData<Metrics> mergeDataCache` — not thread-safe, merge cache -- `long lastSendTime` — tracks l1FlushPeriod -- `AbstractWorker<Metrics> nextWorker` — MetricsRemoteWorker -- `CounterMetrics abandonCounter` — tagged by metricName, level=1, dimensionality=minute -- `CounterMetrics aggregationCounter` — tagged by metricName, level=1, dimensionality=minute -- `GaugeMetrics queuePercentageGauge` — tagged by metricName, level=1, kind=OAL/MAL - -### Proposed: Single BatchQueue for All Metric Types - -OAL and MAL use the same queue. There is no reason to separate them — the -aggregation logic (merge + periodic flush) is identical. Splitting by kind only -wastes threads and complicates configuration. `MetricStreamKind` becomes -irrelevant at the L1 aggregation layer. - -One `BatchQueue<Metrics>` with adaptive partitioning handles all ~720 metric -types. Each type registers a handler via `addHandler()`. - -**BatchQueue parameters:** - -| Parameter | Value | Rationale | -|-----------|-------|-----------| -| Queue name | `METRICS_L1_AGGREGATION` | Single queue for all metric types (OAL + MAL) | -| ThreadPolicy | `cpuCores(1.0)` | 8 threads on 8-core. Benchmark: 8 threads + adaptive beats 26-thread DataCarrier by 34-68% | -| PartitionPolicy | `adaptive()` (multiplier=25) | threshold=8*25=200. ~720 types → 200+520/2 = 460 partitions | -| BufferSize | 20,000 | Matches current OAL per-type capacity | -| Strategy | `IF_POSSIBLE` | Same as current — drop if full | -| PartitionSelector | `typeHash()` (default) | Same type → same partition → single-thread access to merge cache | -| minIdleMs | 1 | Fast reaction (current MAL used signal-driven wakeup; 1ms backoff is equivalent) | -| maxIdleMs | 50 | Backoff cap when idle | - -**Thread-safety guarantee:** `typeHash()` ensures same metric class → same -partition → same drain thread. Each handler's `MergableBufferedData` is only -accessed by one thread, preserving the current invariant without synchronization. - -**Memory comparison (8-core, ~720 types):** - -| | Current (OAL + MAL pools) | Proposed (single queue) | -|---|---------------------------|-------------------------| -| Queues | ~1340 channels | 460 partitions | -| Buffer slots | ~12.5M | ~9.2M | -| Threads | 26 | 8 | - -### Telemetry - -| Metric | Plan | -|--------|------| -| `metrics_aggregator_abandon` | Keep. Per-type counter in handler. Incremented when `produce()` returns false. | -| `metrics_aggregation` | Keep. Per-type counter in handler. Incremented per item in `consume()`. | -| `metrics_aggregation_queue_used_percentage` | TODO: Redesign for shared-partition model. Leave as placeholder. | - -### Replacement Steps - -#### Step 1: Create L1 aggregation handler - -Create a handler class that captures the per-type state currently spread across -`MetricsAggregateWorker` / `MetricsAggregateOALWorker` / `MetricsAggregateMALWorker`. -No OAL/MAL distinction — the handler is the same for all metric types. - -``` -MetricsL1Handler implements HandlerConsumer<Metrics> - - MergableBufferedData<Metrics> mergeDataCache - - long lastSendTime - - long l1FlushPeriod - - AbstractWorker<Metrics> nextWorker - - CounterMetrics aggregationCounter - - consume(List<Metrics> batch): - for each metric: aggregationCounter.inc(); mergeDataCache.accept(metric) - flush() - - onIdle(): - flush() - - flush(): - if currentTime - lastSendTime > l1FlushPeriod: - mergeDataCache.read().forEach(nextWorker::in) - lastSendTime = currentTime -``` - -#### Step 2: Create the shared BatchQueue in MetricsStreamProcessor - -One queue, created once, shared by all metric types regardless of kind: - -```java -BatchQueue<Metrics> l1Queue = BatchQueueManager.create( - "METRICS_L1_AGGREGATION", - BatchQueueConfig.<Metrics>builder() - .threads(ThreadPolicy.cpuCores(1.0)) - .partitions(PartitionPolicy.adaptive()) - .bufferSize(20_000) - .strategy(BufferStrategy.IF_POSSIBLE) - .minIdleMs(1) - .maxIdleMs(50) - .build()); -``` - -#### Step 3: Register handlers per metric type - -In `MetricsStreamProcessor.create()`, replace the OAL/MAL worker switch with -a single handler registration. The `kind` parameter is no longer needed: - -```java -// Replace: -// switch (kind) { -// case OAL: aggregateWorker = new MetricsAggregateOALWorker(...); break; -// case MAL: aggregateWorker = new MetricsAggregateMALWorker(...); break; -// } -// entryWorkers.put(metricsClass, aggregateWorker); - -// With: -MetricsL1Handler handler = new MetricsL1Handler( - remoteWorker, l1FlushPeriod, modelName, metricsCreator); -l1Queue.addHandler(metricsClass, handler); -``` - -#### Step 4: Replace `in()` routing - -The `MetricsStreamProcessor.in()` method no longer looks up a per-type worker. -It produces directly to the queue: - -```java -// Replace: -// MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass()); -// if (worker != null) { worker.in(metrics); } - -// With: -if (!l1Queue.produce(metrics)) { - abandonCounters.get(metrics.getClass()).inc(); -} -``` - -`abandonCounters` is a `Map<Class<?>, CounterMetrics>` populated during -handler registration (Step 3). - -#### Step 5: Verify - -- All existing unit tests for MetricsStreamProcessor pass -- E2E tests with metrics aggregation pass -- Telemetry counters (abandon, aggregation) still reported per metric type - ---- - -## 2. L2 Metrics Persistence (MetricsPersistentMinWorker) - -*To be planned after L1 replacement is complete.* - -## 3. Individual Consumer Replacements - -*To be planned: GRPCRemoteClient, TopNWorker, exporters, JDBCBatchDAO.* diff --git a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java index f6a117039e..1167fc57a5 100644 --- a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java +++ b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java @@ -513,4 +513,16 @@ public class BatchQueue<T> { boolean isDedicatedScheduler() { return dedicatedScheduler; } + + /** + * Take a point-in-time snapshot of queue usage across all partitions. + */ + public BatchQueueStats stats() { + final ArrayBlockingQueue<T>[] currentPartitions = this.partitions; + final int[] used = new int[currentPartitions.length]; + for (int i = 0; i < currentPartitions.length; i++) { + used[i] = currentPartitions[i].size(); + } + return new BatchQueueStats(currentPartitions.length, config.getBufferSize(), used); + } } diff --git a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java new file mode 100644 index 0000000000..2a0b64dff3 --- /dev/null +++ b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.batchqueue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import lombok.Getter; + +/** + * Immutable snapshot of a {@link BatchQueue}'s partition usage at a point in time. + * + * <p>Provides three levels of detail: + * <ul> + * <li><b>Total usage</b> — aggregate across all partitions</li> + * <li><b>Per-partition usage</b> — individual partition sizes</li> + * <li><b>Top N</b> — the most-loaded partitions, sorted by usage descending</li> + * </ul> + */ +public class BatchQueueStats { + @Getter + private final int partitionCount; + @Getter + private final int bufferSize; + private final int[] partitionUsed; + + BatchQueueStats(final int partitionCount, final int bufferSize, final int[] partitionUsed) { + this.partitionCount = partitionCount; + this.bufferSize = bufferSize; + this.partitionUsed = Arrays.copyOf(partitionUsed, partitionUsed.length); + } + + /** + * Total capacity across all partitions: {@code partitionCount * bufferSize}. + */ + public long totalCapacity() { + return (long) partitionCount * bufferSize; + } + + /** + * Total number of items currently queued across all partitions. + */ + public int totalUsed() { + int sum = 0; + for (final int used : partitionUsed) { + sum += used; + } + return sum; + } + + /** + * Overall queue usage as a percentage (0.0–100.0). + */ + public double totalUsedPercentage() { + final long capacity = totalCapacity(); + if (capacity == 0) { + return 0.0; + } + return 100.0 * totalUsed() / capacity; + } + + /** + * Number of items currently queued in the given partition. + */ + public int partitionUsed(final int index) { + return partitionUsed[index]; + } + + /** + * Usage of the given partition as a percentage (0.0–100.0). + */ + public double partitionUsedPercentage(final int index) { + if (bufferSize == 0) { + return 0.0; + } + return 100.0 * partitionUsed[index] / bufferSize; + } + + /** + * Return the top {@code n} most-loaded partitions, sorted by usage descending. + * If {@code n >= partitionCount}, all partitions are returned. + */ + public List<PartitionUsage> topN(final int n) { + final Integer[] indices = new Integer[partitionCount]; + for (int i = 0; i < partitionCount; i++) { + indices[i] = i; + } + Arrays.sort(indices, (a, b) -> Integer.compare(partitionUsed[b], partitionUsed[a])); + + final int limit = Math.min(n, partitionCount); + final List<PartitionUsage> result = new ArrayList<>(limit); + for (int i = 0; i < limit; i++) { + final int idx = indices[i]; + result.add(new PartitionUsage( + idx, partitionUsed[idx], + bufferSize == 0 ? 0.0 : 100.0 * partitionUsed[idx] / bufferSize + )); + } + return result; + } + + /** + * Usage snapshot for a single partition. + */ + @Getter + public static class PartitionUsage { + private final int partitionIndex; + private final int used; + private final double usedPercentage; + + PartitionUsage(final int partitionIndex, final int used, final double usedPercentage) { + this.partitionIndex = partitionIndex; + this.used = used; + this.usedPercentage = usedPercentage; + } + } +} diff --git a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java index 6b6585debe..43e5f77ef1 100644 --- a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java +++ b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java @@ -22,19 +22,22 @@ package org.apache.skywalking.oap.server.library.batchqueue; * Determines the number of threads for a BatchQueue's dedicated scheduler * or for a shared scheduler. * - * Two modes: + * Three modes: * - fixed(N): exactly N threads, regardless of hardware. * - cpuCores(multiplier): multiplier * Runtime.availableProcessors(), rounded. + * - cpuCoresWithBase(base, multiplier): base + multiplier * Runtime.availableProcessors(), rounded. * * Resolved value is always >= 1 — every pool must have at least one thread. * fixed() requires count >= 1 at construction. cpuCores() applies max(1, ...) at resolution. */ public class ThreadPolicy { private final int fixedCount; + private final int base; private final double cpuMultiplier; - private ThreadPolicy(final int fixedCount, final double cpuMultiplier) { + private ThreadPolicy(final int fixedCount, final int base, final double cpuMultiplier) { this.fixedCount = fixedCount; + this.base = base; this.cpuMultiplier = cpuMultiplier; } @@ -47,7 +50,7 @@ public class ThreadPolicy { if (count < 1) { throw new IllegalArgumentException("Thread count must be >= 1, got: " + count); } - return new ThreadPolicy(count, 0); + return new ThreadPolicy(count, 0, 0); } /** @@ -60,7 +63,25 @@ public class ThreadPolicy { if (multiplier <= 0) { throw new IllegalArgumentException("CPU multiplier must be > 0, got: " + multiplier); } - return new ThreadPolicy(0, multiplier); + return new ThreadPolicy(0, 0, multiplier); + } + + /** + * Threads = base + round(multiplier * available CPU cores), min 1. + * Base must be >= 0, multiplier must be > 0. + * + * Example: cpuCoresWithBase(2, 0.25) on 8-core = 2 + 2 = 4, on 16-core = 2 + 4 = 6, on 24-core = 2 + 6 = 8. + * + * @throws IllegalArgumentException if base < 0 or multiplier <= 0 + */ + public static ThreadPolicy cpuCoresWithBase(final int base, final double multiplier) { + if (base < 0) { + throw new IllegalArgumentException("Base must be >= 0, got: " + base); + } + if (multiplier <= 0) { + throw new IllegalArgumentException("CPU multiplier must be > 0, got: " + multiplier); + } + return new ThreadPolicy(0, base, multiplier); } /** @@ -70,7 +91,7 @@ public class ThreadPolicy { if (fixedCount > 0) { return fixedCount; } - return Math.max(1, (int) Math.round(cpuMultiplier * Runtime.getRuntime().availableProcessors())); + return Math.max(1, base + (int) Math.round(cpuMultiplier * Runtime.getRuntime().availableProcessors())); } @Override @@ -78,6 +99,9 @@ public class ThreadPolicy { if (fixedCount > 0) { return "fixed(" + fixedCount + ")"; } + if (base > 0) { + return "cpuCoresWithBase(" + base + ", " + cpuMultiplier + ")"; + } return "cpuCores(" + cpuMultiplier + ")"; } } diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java index a63bbdf400..1f3795102f 100644 --- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java +++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; public class BatchQueueTest { @@ -484,6 +485,195 @@ public class BatchQueueTest { "Later gap (" + laterGap + "ms) should be larger than early gap (" + earlyGap + "ms)"); } + // --- Stats --- + + @Test + public void testStatsReflectsQueueUsage() { + final CountDownLatch blockLatch = new CountDownLatch(1); + final BatchQueue<String> queue = BatchQueueManager.create("stats-usage-test", + BatchQueueConfig.<String>builder() + .threads(ThreadPolicy.fixed(1)) + .strategy(BufferStrategy.IF_POSSIBLE) + .consumer(data -> { + try { + blockLatch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) + .bufferSize(100) + .build()); + + // Wait for the consumer to block on the first item + queue.produce("trigger"); + Awaitility.await().atMost(2, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> { + final BatchQueueStats stats = queue.stats(); + return stats.totalUsed() == 0; // first item already drained into blocked consumer + }); + + // Produce 10 items — they'll sit in the partition because consumer is blocked + for (int i = 0; i < 10; i++) { + queue.produce("item-" + i); + } + + final BatchQueueStats stats = queue.stats(); + assertEquals(1, stats.getPartitionCount()); + assertEquals(100, stats.getBufferSize()); + assertEquals(100, stats.totalCapacity()); + assertEquals(10, stats.totalUsed()); + assertEquals(10.0, stats.totalUsedPercentage(), 0.01); + assertEquals(10, stats.partitionUsed(0)); + assertEquals(10.0, stats.partitionUsedPercentage(0), 0.01); + + blockLatch.countDown(); + } + + @Test + public void testStatsWithMultiplePartitions() { + final CountDownLatch blockLatch = new CountDownLatch(1); + final BatchQueue<String> queue = BatchQueueManager.create("stats-multi-partition-test", + BatchQueueConfig.<String>builder() + .threads(ThreadPolicy.fixed(2)) + .partitions(PartitionPolicy.fixed(4)) + .strategy(BufferStrategy.IF_POSSIBLE) + .partitionSelector((data, count) -> Integer.parseInt(data) % count) + .consumer(data -> { + try { + blockLatch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) + .bufferSize(50) + .build()); + + // Wait for drain threads to block + queue.produce("0"); + queue.produce("1"); + Awaitility.await().atMost(2, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> queue.stats().totalUsed() == 0); + + // Produce items targeting specific partitions: value % 4 + for (int i = 0; i < 20; i++) { + queue.produce(String.valueOf(i)); + } + + final BatchQueueStats stats = queue.stats(); + assertEquals(4, stats.getPartitionCount()); + assertEquals(50, stats.getBufferSize()); + assertEquals(200, stats.totalCapacity()); + assertEquals(20, stats.totalUsed()); + // Each partition gets 5 items (0,4,8,12,16 / 1,5,9,13,17 / 2,6,10,14,18 / 3,7,11,15,19) + for (int p = 0; p < 4; p++) { + assertEquals(5, stats.partitionUsed(p)); + assertEquals(10.0, stats.partitionUsedPercentage(p), 0.01); + } + + blockLatch.countDown(); + } + + @Test + public void testStatsSnapshotIsImmutable() { + final CountDownLatch blockLatch = new CountDownLatch(1); + final BatchQueue<String> queue = BatchQueueManager.create("stats-immutable-test", + BatchQueueConfig.<String>builder() + .threads(ThreadPolicy.fixed(1)) + .strategy(BufferStrategy.IF_POSSIBLE) + .consumer(data -> { + try { + blockLatch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) + .bufferSize(100) + .build()); + + queue.produce("trigger"); + Awaitility.await().atMost(2, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> queue.stats().totalUsed() == 0); + + for (int i = 0; i < 5; i++) { + queue.produce("item-" + i); + } + + final BatchQueueStats snapshot = queue.stats(); + assertEquals(5, snapshot.totalUsed()); + + // Produce more — the snapshot should not change + for (int i = 0; i < 5; i++) { + queue.produce("more-" + i); + } + + assertEquals(5, snapshot.totalUsed()); + assertNotEquals(5, queue.stats().totalUsed()); + + blockLatch.countDown(); + } + + @Test + public void testStatsTopNReturnsHottestPartitions() { + final CountDownLatch blockLatch = new CountDownLatch(1); + final BatchQueue<String> queue = BatchQueueManager.create("stats-topn-test", + BatchQueueConfig.<String>builder() + .threads(ThreadPolicy.fixed(2)) + .partitions(PartitionPolicy.fixed(4)) + .strategy(BufferStrategy.IF_POSSIBLE) + .partitionSelector((data, count) -> Integer.parseInt(data.split("-")[0]) % count) + .consumer(data -> { + try { + blockLatch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) + .bufferSize(100) + .build()); + + // Wait for drain threads to block + queue.produce("0-trigger"); + queue.produce("1-trigger"); + Awaitility.await().atMost(2, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> queue.stats().totalUsed() == 0); + + // Load partitions unevenly: p0=20, p1=5, p2=15, p3=10 + for (int i = 0; i < 20; i++) { + queue.produce("0-" + i); + } + for (int i = 0; i < 5; i++) { + queue.produce("1-" + i); + } + for (int i = 0; i < 15; i++) { + queue.produce("2-" + i); + } + for (int i = 0; i < 10; i++) { + queue.produce("3-" + i); + } + + final BatchQueueStats stats = queue.stats(); + final java.util.List<BatchQueueStats.PartitionUsage> top2 = stats.topN(2); + + assertEquals(2, top2.size()); + // Highest: partition 0 (20 items) + assertEquals(0, top2.get(0).getPartitionIndex()); + assertEquals(20, top2.get(0).getUsed()); + assertEquals(20.0, top2.get(0).getUsedPercentage(), 0.01); + // Second: partition 2 (15 items) + assertEquals(2, top2.get(1).getPartitionIndex()); + assertEquals(15, top2.get(1).getUsed()); + + // topN with n > partitionCount returns all + final java.util.List<BatchQueueStats.PartitionUsage> topAll = stats.topN(10); + assertEquals(4, topAll.size()); + + blockLatch.countDown(); + } + @Test public void testBackoffResetsOnData() throws Exception { final AtomicInteger consumeCount = new AtomicInteger(0); diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java index 4a77f7ad43..3cbdeb6c0a 100644 --- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java +++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java @@ -75,4 +75,36 @@ public class ThreadPolicyTest { public void testToStringCpuCores() { assertEquals("cpuCores(0.5)", ThreadPolicy.cpuCores(0.5).toString()); } + + @Test + public void testCpuCoresWithBaseAddsBaseToScaled() { + final int cores = Runtime.getRuntime().availableProcessors(); + final int resolved = ThreadPolicy.cpuCoresWithBase(2, 0.25).resolve(); + assertEquals(2 + (int) Math.round(0.25 * cores), resolved); + } + + @Test + public void testCpuCoresWithBaseResolvesAtLeastOne() { + assertTrue(ThreadPolicy.cpuCoresWithBase(0, 0.001).resolve() >= 1); + } + + @Test + public void testCpuCoresWithBaseRejectsNegativeBase() { + assertThrows(IllegalArgumentException.class, () -> ThreadPolicy.cpuCoresWithBase(-1, 0.25)); + } + + @Test + public void testCpuCoresWithBaseRejectsZeroMultiplier() { + assertThrows(IllegalArgumentException.class, () -> ThreadPolicy.cpuCoresWithBase(2, 0)); + } + + @Test + public void testCpuCoresWithBaseRejectsNegativeMultiplier() { + assertThrows(IllegalArgumentException.class, () -> ThreadPolicy.cpuCoresWithBase(2, -0.5)); + } + + @Test + public void testToStringCpuCoresWithBase() { + assertEquals("cpuCoresWithBase(2, 0.25)", ThreadPolicy.cpuCoresWithBase(2, 0.25).toString()); + } } diff --git a/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml b/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml index 56b1cf9f71..223c6e0cf4 100644 --- a/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml +++ b/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml @@ -69,7 +69,7 @@ metricsRules: metrics_aggregation.tagEqual('dimensionality', 'minute').sum(['service', 'host_name', 'level']).increase('PT1M') .tag({tags -> if (tags['level'] == '1') {tags.level = 'L1 aggregation'} }).tag({tags -> if (tags['level'] == '2') {tags.level = 'L2 aggregation'} }) - name: instance_metrics_aggregation_queue_used_percentage - exp: metrics_aggregation_queue_used_percentage.sum(['service', 'host_name', 'level', 'kind', 'metricName']) + exp: metrics_aggregation_queue_used_percentage.sum(['service', 'host_name', 'level', 'slot']) - name: instance_persistence_execute_percentile exp: persistence_timer_bulk_execute_latency.sum(['le', 'service', 'host_name']).increase('PT1M').histogram().histogram_percentile([50,75,90,95,99]) - name: instance_persistence_prepare_percentile diff --git a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json index 0e630c917a..cfba96f7c0 100644 --- a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json +++ b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json @@ -470,7 +470,7 @@ "i": "22", "type": "Widget", "expressions": [ - "sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='1',kind='OAL'},10,des,avg)" + "sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='1'},10,des,avg)" ], "graph": { "type": "Line", @@ -481,7 +481,7 @@ "showYAxis": true }, "widget": { - "title": "OAL L1 Aggregation Queue Percentage (%)" + "title": "L1 Aggregation Queue Percentage (%)" } }, { @@ -492,51 +492,7 @@ "i": "23", "type": "Widget", "expressions": [ - "sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='1',kind='MAL'},10,des,avg)" - ], - "widget": { - "title": "MAL L1 Aggregation Queue Percentage (%)" - }, - "graph": { - "type": "Line", - "step": false, - "smooth": false, - "showSymbol": true, - "showXAxis": true, - "showYAxis": true - } - }, - { - "x": 12, - "y": 65, - "w": 12, - "h": 13, - "i": "24", - "type": "Widget", - "graph": { - "type": "Line", - "step": false, - "smooth": false, - "showSymbol": true, - "showXAxis": true, - "showYAxis": true - }, - "expressions": [ - "sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='2',kind='OAL'},10,des,avg)" - ], - "widget": { - "title": "OAL L2 Aggregation Queue Percentage (%)" - } - }, - { - "x": 0, - "y": 78, - "w": 12, - "h": 13, - "i": "25", - "type": "Widget", - "expressions": [ - "sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='2',kind='MAL'},10,des,avg)" + "sort_values(meter_oap_instance_metrics_aggregation_queue_used_percentage{level='2'},10,des,avg)" ], "graph": { "type": "Line", @@ -547,15 +503,15 @@ "showYAxis": true }, "widget": { - "title": "MAL L2 Aggregation Queue Percentage (%)" + "title": "L2 Aggregation Queue Percentage (%)" } }, { "x": 12, - "y": 78, + "y": 65, "w": 12, "h": 13, - "i": "26", + "i": "24", "type": "Widget", "graph": { "type": "Line", @@ -574,10 +530,10 @@ }, { "x": 0, - "y": 91, + "y": 78, "w": 12, "h": 13, - "i": "27", + "i": "25", "type": "Widget", "expressions": [ "sort_values(meter_oap_instance_metrics_persistent_collection_cached_size{dimensionality='minute',kind='MAL'},10,des,avg)"
