This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new a0cec0ca23 Add QueueUsageBenchmark to validate BatchQueueStats under 
backpressure (#13710)
a0cec0ca23 is described below

commit a0cec0ca237792497d2da0b65757d11f58c3f342
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Wed Feb 25 12:21:15 2026 +0800

    Add QueueUsageBenchmark to validate BatchQueueStats under backpressure 
(#13710)
---
 .../library/batchqueue/BatchQueueBenchmark.java    |  63 +++-
 .../library/batchqueue/QueueUsageBenchmark.java    | 316 +++++++++++++++++++++
 2 files changed, 376 insertions(+), 3 deletions(-)

diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java
index db3cbff076..ba65a94a30 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.library.batchqueue;
 
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -457,16 +458,61 @@ public class BatchQueueBenchmark {
         Thread.sleep(200);
         consumed.set(0);
 
-        // Measure
+        // Measure — sample queue usage periodically during the run
+        final int sampleCount = MEASURE_SECONDS;
+        final double[] usageSamples = new double[sampleCount];
+        final double[] topPartitionSamples = new double[sampleCount];
+
         final long measureStart = System.currentTimeMillis();
         final long measureEnd = measureStart + MEASURE_SECONDS * 1000L;
+
+        final Thread sampler = new Thread(() -> {
+            for (int s = 0; s < sampleCount; s++) {
+                try {
+                    Thread.sleep(1000);
+                } catch (final InterruptedException e) {
+                    break;
+                }
+                final BatchQueueStats stats = queue.stats();
+                usageSamples[s] = stats.totalUsedPercentage();
+                final List<BatchQueueStats.PartitionUsage> top = stats.topN(1);
+                topPartitionSamples[s] = top.isEmpty() ? 0 : 
top.get(0).getUsedPercentage();
+            }
+        });
+        sampler.setName("UsageSampler");
+        sampler.setDaemon(true);
+        sampler.start();
+
         final long produced = runProducers(queue, typeCount, PRODUCER_THREADS, 
measureEnd);
         final long measureDuration = System.currentTimeMillis() - measureStart;
 
+        sampler.join(2000);
+
         Thread.sleep(500);
         final long totalConsumed = consumed.get();
         final double consumeRate = totalConsumed * 1000.0 / measureDuration;
 
+        // Compute queue usage stats
+        double usageSum = 0;
+        double usageMax = 0;
+        double topSum = 0;
+        double topMax = 0;
+        int validSamples = 0;
+        for (int s = 0; s < sampleCount; s++) {
+            if (usageSamples[s] > 0 || s == 0) {
+                validSamples++;
+                usageSum += usageSamples[s];
+                usageMax = Math.max(usageMax, usageSamples[s]);
+                topSum += topPartitionSamples[s];
+                topMax = Math.max(topMax, topPartitionSamples[s]);
+            }
+        }
+        final double usageAvg = validSamples > 0 ? usageSum / validSamples : 0;
+        final double topAvg = validSamples > 0 ? topSum / validSamples : 0;
+
+        // Final snapshot after producers stop
+        final BatchQueueStats finalStats = queue.stats();
+
         log.info("\n=== BatchQueue Benchmark: {} ===\n"
                 + "  Types:       {}\n"
                 + "  Threads:     {}\n"
@@ -479,14 +525,25 @@ public class BatchQueueBenchmark {
                 + "  Produced:    {}\n"
                 + "  Consumed:    {}\n"
                 + "  Consume rate:  {} items/sec\n"
-                + "  Drop rate:     {}%\n",
+                + "  Drop rate:     {}%\n"
+                + "  --- Queue Usage (sampled every 1s during measurement) 
---\n"
+                + "  Total usage:   avg={}%, max={}%\n"
+                + "  Top partition: avg={}%, max={}%\n"
+                + "  Final snapshot: totalUsed={}/{} ({}%), top1 
partition={}%\n",
             label, typeCount, THREADS, partitions, bufferSize, strategy,
             rebalance, PRODUCER_THREADS,
             measureDuration,
             String.format("%,d", produced), String.format("%,d", 
totalConsumed),
             String.format("%,.0f", consumeRate),
             String.format("%.2f", produced > 0
-                ? (produced - totalConsumed) * 100.0 / produced : 0));
+                ? (produced - totalConsumed) * 100.0 / produced : 0),
+            String.format("%.1f", usageAvg), String.format("%.1f", usageMax),
+            String.format("%.1f", topAvg), String.format("%.1f", topMax),
+            String.format("%,d", finalStats.totalUsed()),
+            String.format("%,d", finalStats.totalCapacity()),
+            String.format("%.1f", finalStats.totalUsedPercentage()),
+            String.format("%.1f", finalStats.topN(1).isEmpty()
+                ? 0 : finalStats.topN(1).get(0).getUsedPercentage()));
 
         return consumeRate;
     }
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/QueueUsageBenchmark.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/QueueUsageBenchmark.java
new file mode 100644
index 0000000000..6a3d8de7b5
--- /dev/null
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/QueueUsageBenchmark.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.batchqueue;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Queue usage benchmark — validates {@link BatchQueueStats} metrics under 
realistic
+ * backpressure by adding simulated processing cost in consumers.
+ *
+ * <p>Unlike {@link BatchQueueBenchmark} which uses no-op consumers (queue 
stays near 0%),
+ * this benchmark adds per-item CPU work via busy-spin to simulate real 
handler cost
+ * (metrics merging, serialization). This creates genuine backpressure so the 
queue
+ * fills up and the usage percentage becomes meaningful.
+ *
+ * <p>Run with: mvn test -pl oap-server/server-library/library-batch-queue
+ *           -Dtest=QueueUsageBenchmark -DfailIfNoTests=false
+ */
+@Slf4j
+@SuppressWarnings("all")
+public class QueueUsageBenchmark {
+
+    private static final int WARMUP_SECONDS = 2;
+    private static final int MEASURE_SECONDS = 5;
+    private static final int PRODUCER_THREADS = 32;
+    private static final ThreadPolicy THREADS = ThreadPolicy.fixed(8);
+    private static final int TOP_N = 3;
+
+    private static final PartitionSelector<BenchmarkMetricTypes.TypedMetric> 
TYPE_ID_SELECTOR =
+        (data, count) -> data.typeId % count;
+
+    @AfterEach
+    public void cleanup() {
+        BatchQueueManager.reset();
+    }
+
+    /**
+     * Light consumer cost (~200ns per item). Drain threads mostly keep up,
+     * but usage should rise above 0% to validate the metric under mild load.
+     */
+    @Test
+    public void usageUnderLightLoad() throws Exception {
+        runUsageBenchmark("light-load", 500, BufferStrategy.IF_POSSIBLE, 200);
+    }
+
+    /**
+     * Medium consumer cost (~500ns per item). Creates moderate backpressure —
+     * queue usage should reach 5-30%.
+     */
+    @Test
+    public void usageUnderMediumLoad() throws Exception {
+        runUsageBenchmark("medium-load", 500, BufferStrategy.IF_POSSIBLE, 500);
+    }
+
+    /**
+     * Heavy consumer cost (~1μs per item). Creates strong backpressure —
+     * queue fills significantly. IF_POSSIBLE may drop items.
+     */
+    @Test
+    public void usageUnderHeavyLoad() throws Exception {
+        runUsageBenchmark("heavy-load", 500, BufferStrategy.IF_POSSIBLE, 1000);
+    }
+
+    /**
+     * Heavy load with BLOCKING strategy — producers block when queue is full,
+     * no data loss. Usage should be high and sustained.
+     */
+    @Test
+    public void usageUnderHeavyLoadBlocking() throws Exception {
+        runUsageBenchmark("heavy-load-blocking", 500, BufferStrategy.BLOCKING, 
1000);
+    }
+
+    /**
+     * Medium load with 1000 types — more partitions, validates per-partition
+     * usage tracking at scale.
+     */
+    @Test
+    public void usageUnderMediumLoad1000Types() throws Exception {
+        runUsageBenchmark("medium-load-1000t", 1000, 
BufferStrategy.IF_POSSIBLE, 500);
+    }
+
+    /**
+     * Heavy load with BLOCKING and 1000 types.
+     */
+    @Test
+    public void usageUnderHeavyLoadBlocking1000Types() throws Exception {
+        runUsageBenchmark("heavy-load-blocking-1000t", 1000, 
BufferStrategy.BLOCKING, 1000);
+    }
+
+    private void runUsageBenchmark(final String label, final int typeCount,
+                                   final BufferStrategy strategy,
+                                   final long consumeNanosPerItem) throws 
Exception {
+        final int partitionCount = PartitionPolicy.adaptive()
+            .resolve(THREADS.resolve(), typeCount);
+        final PartitionPolicy partitions = 
PartitionPolicy.fixed(partitionCount);
+        final AtomicLong consumed = new AtomicLong(0);
+
+        final 
BatchQueueConfig.BatchQueueConfigBuilder<BenchmarkMetricTypes.TypedMetric> 
configBuilder =
+            BatchQueueConfig.<BenchmarkMetricTypes.TypedMetric>builder()
+                .threads(THREADS)
+                .partitions(partitions)
+                .bufferSize(50_000)
+                .strategy(strategy)
+                .partitionSelector(TYPE_ID_SELECTOR)
+                .minIdleMs(1)
+                .maxIdleMs(50);
+
+        final BatchQueue<BenchmarkMetricTypes.TypedMetric> queue = 
BatchQueueManager.create(
+            "usage-" + label, configBuilder.build());
+
+        for (int t = 0; t < typeCount; t++) {
+            queue.addHandler(BenchmarkMetricTypes.CLASSES[t],
+                (HandlerConsumer<BenchmarkMetricTypes.TypedMetric>) data -> {
+                    for (int i = 0; i < data.size(); i++) {
+                        busySpin(consumeNanosPerItem);
+                    }
+                    consumed.addAndGet(data.size());
+                });
+        }
+
+        // Warmup — wait until every produced item is consumed before 
measurement.
+        // Comparing consumed count against produced count is more precise 
than polling
+        // queue size, because queue.stats().totalUsed()==0 can be true while 
a drain
+        // thread is still inside the consumer callback (items dequeued but 
not yet counted).
+        final long warmupEnd = System.currentTimeMillis() + WARMUP_SECONDS * 
1000L;
+        final long warmupProduced = runProducers(queue, typeCount, 
PRODUCER_THREADS, warmupEnd);
+        final CountDownLatch warmupDrained = new CountDownLatch(1);
+        final Thread warmupWaiter = new Thread(() -> {
+            while (consumed.get() < warmupProduced) {
+                try {
+                    Thread.sleep(50);
+                } catch (final InterruptedException e) {
+                    break;
+                }
+            }
+            warmupDrained.countDown();
+        });
+        warmupWaiter.setDaemon(true);
+        warmupWaiter.start();
+        warmupDrained.await(30, TimeUnit.SECONDS);
+        consumed.set(0);
+
+        // Measure — sample queue usage every 500ms
+        final int sampleInterval = 500;
+        final int maxSamples = MEASURE_SECONDS * 1000 / sampleInterval;
+        final double[] totalUsageSamples = new double[maxSamples];
+        final double[] topPartitionSamples = new double[maxSamples];
+        final int[][] topNIndexSamples = new int[maxSamples][TOP_N];
+        final double[][] topNUsageSamples = new double[maxSamples][TOP_N];
+        final AtomicLong samplesTaken = new AtomicLong(0);
+
+        final long measureStart = System.currentTimeMillis();
+        final long measureEnd = measureStart + MEASURE_SECONDS * 1000L;
+
+        final Thread sampler = new Thread(() -> {
+            for (int s = 0; s < maxSamples; s++) {
+                try {
+                    Thread.sleep(sampleInterval);
+                } catch (final InterruptedException e) {
+                    break;
+                }
+                final BatchQueueStats stats = queue.stats();
+                totalUsageSamples[s] = stats.totalUsedPercentage();
+                final List<BatchQueueStats.PartitionUsage> top = 
stats.topN(TOP_N);
+                topPartitionSamples[s] = top.isEmpty() ? 0 : 
top.get(0).getUsedPercentage();
+                for (int i = 0; i < TOP_N && i < top.size(); i++) {
+                    topNIndexSamples[s][i] = top.get(i).getPartitionIndex();
+                    topNUsageSamples[s][i] = top.get(i).getUsedPercentage();
+                }
+                samplesTaken.incrementAndGet();
+            }
+        });
+        sampler.setName("UsageSampler");
+        sampler.setDaemon(true);
+        sampler.start();
+
+        final long produced = runProducers(queue, typeCount, PRODUCER_THREADS, 
measureEnd);
+        final long measureDuration = System.currentTimeMillis() - measureStart;
+
+        sampler.join(2000);
+
+        // Let drain threads catch up for final snapshot
+        Thread.sleep(1000);
+        final long totalConsumed = consumed.get();
+        final double consumeRate = totalConsumed * 1000.0 / measureDuration;
+
+        // Compute usage stats
+        final int samples = (int) samplesTaken.get();
+        double usageSum = 0;
+        double usageMin = Double.MAX_VALUE;
+        double usageMax = 0;
+        double topSum = 0;
+        double topMax = 0;
+        for (int s = 0; s < samples; s++) {
+            usageSum += totalUsageSamples[s];
+            usageMin = Math.min(usageMin, totalUsageSamples[s]);
+            usageMax = Math.max(usageMax, totalUsageSamples[s]);
+            topSum += topPartitionSamples[s];
+            topMax = Math.max(topMax, topPartitionSamples[s]);
+        }
+        if (samples == 0) {
+            usageMin = 0;
+        }
+        final double usageAvg = samples > 0 ? usageSum / samples : 0;
+        final double topAvg = samples > 0 ? topSum / samples : 0;
+
+        final BatchQueueStats finalStats = queue.stats();
+
+        // Build the output
+        final StringBuilder sb = new StringBuilder();
+        sb.append(String.format("\n=== Queue Usage Benchmark: %s ===\n", 
label));
+        sb.append(String.format("  Types:           %d\n", typeCount));
+        sb.append(String.format("  Threads:         %s\n", THREADS));
+        sb.append(String.format("  Partitions:      %s\n", partitions));
+        sb.append(String.format("  BufferSize:      %d\n", 50_000));
+        sb.append(String.format("  Strategy:        %s\n", strategy));
+        sb.append(String.format("  Consumer cost:   %d ns/item\n", 
consumeNanosPerItem));
+        sb.append(String.format("  Producers:       %d\n", PRODUCER_THREADS));
+        sb.append(String.format("  Duration:        %d ms\n", 
measureDuration));
+        sb.append(String.format("  Produced:        %s\n", 
String.format("%,d", produced)));
+        sb.append(String.format("  Consumed:        %s\n", 
String.format("%,d", totalConsumed)));
+        sb.append(String.format("  Consume rate:    %s items/sec\n", 
String.format("%,.0f", consumeRate)));
+        sb.append(String.format("  Drop rate:       %s%%\n", 
String.format("%.2f",
+            produced > 0 ? (produced - totalConsumed) * 100.0 / produced : 
0)));
+        sb.append("\n  --- Queue Usage (sampled every 500ms) ---\n");
+        sb.append(String.format("  Samples:         %d\n", samples));
+        sb.append(String.format("  Total usage:     min=%.1f%%, avg=%.1f%%, 
max=%.1f%%\n",
+            usageMin, usageAvg, usageMax));
+        sb.append(String.format("  Top partition:   avg=%.1f%%, max=%.1f%%\n", 
topAvg, topMax));
+        sb.append(String.format("  Final snapshot:  totalUsed=%s/%s 
(%.1f%%)\n",
+            String.format("%,d", finalStats.totalUsed()),
+            String.format("%,d", finalStats.totalCapacity()),
+            finalStats.totalUsedPercentage()));
+
+        // Per-sample timeline
+        sb.append("\n  --- Usage Timeline ---\n");
+        sb.append("  Sample   Total%   Top1 partition (index: usage%)\n");
+        sb.append("  ------   ------   ------------------------------\n");
+        for (int s = 0; s < samples; s++) {
+            sb.append(String.format("  %4dms   %5.1f%%   ", (s + 1) * 
sampleInterval, totalUsageSamples[s]));
+            for (int i = 0; i < TOP_N; i++) {
+                if (topNUsageSamples[s][i] > 0) {
+                    sb.append(String.format("p%d:%.1f%%  ", 
topNIndexSamples[s][i], topNUsageSamples[s][i]));
+                }
+            }
+            sb.append("\n");
+        }
+
+        log.info(sb.toString());
+    }
+
+    /**
+     * Busy-spin for approximately the given nanoseconds.
+     * More accurate than Thread.sleep for sub-microsecond delays.
+     */
+    private static void busySpin(final long nanos) {
+        final long end = System.nanoTime() + nanos;
+        while (System.nanoTime() < end) {
+            // spin
+        }
+    }
+
+    private long runProducers(final 
BatchQueue<BenchmarkMetricTypes.TypedMetric> queue,
+                              final int typeCount, final int producerCount,
+                              final long endTimeMs) throws 
InterruptedException {
+        final AtomicLong totalProduced = new AtomicLong(0);
+        final CountDownLatch done = new CountDownLatch(producerCount);
+
+        for (int p = 0; p < producerCount; p++) {
+            final int producerIndex = p;
+            final Thread thread = new Thread(() -> {
+                long count = 0;
+                int typeIndex = producerIndex;
+                while (System.currentTimeMillis() < endTimeMs) {
+                    for (int batch = 0; batch < 100; batch++) {
+                        final int type = typeIndex % typeCount;
+                        if 
(queue.produce(BenchmarkMetricTypes.FACTORIES[type].create(count))) {
+                            count++;
+                        }
+                        typeIndex++;
+                    }
+                }
+                totalProduced.addAndGet(count);
+                done.countDown();
+            });
+            thread.setName("Producer-" + producerIndex);
+            thread.setDaemon(true);
+            thread.start();
+        }
+
+        done.await(MEASURE_SECONDS + WARMUP_SECONDS + 10, TimeUnit.SECONDS);
+        return totalProduced.get();
+    }
+}

Reply via email to