This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch library-batch-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 8ce7cd06eaa743df69f3f14d4702031f9fa52bc1
Author: Wu Sheng <[email protected]>
AuthorDate: Sat Feb 14 22:08:16 2026 +0800

    Replace DataCarrier with shared BatchQueue for all three exporters
    
    The gRPC metrics, Kafka trace, and Kafka log exporters now share a
    single BatchQueue with adaptive partitions and 3 dedicated threads,
    replacing the per-exporter DataCarrier instances. The ExporterService
    interface no longer declares start() — lifecycle is managed by
    ExporterProvider which creates the shared queue and passes it to
    each enabled exporter.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 docs/en/changes/changes.md                         | 16 +++++++
 oap-server/exporter/pom.xml                        |  2 +-
 .../server/exporter/provider/ExporterProvider.java | 23 +++++++--
 .../server/exporter/provider/ExporterSetting.java  |  3 +-
 .../provider/grpc/GRPCMetricsExporter.java         | 30 +++++-------
 .../provider/kafka/log/KafkaLogExporter.java       | 32 +++++--------
 .../provider/kafka/trace/KafkaTraceExporter.java   | 33 +++++--------
 .../provider/grpc/GRPCExporterProviderTest.java    |  3 +-
 .../exporter/provider/grpc/GRPCExporterTest.java   | 54 +++++++++++-----------
 .../oap/server/core/exporter/ExporterService.java  |  2 -
 10 files changed, 101 insertions(+), 97 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index ec5d97fd4c..93f3cc67bc 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -12,6 +12,22 @@
 * Add `CLAUDE.md` as AI assistant guide for the project.
 * Upgrade Groovy to 5.0.3 in OAP backend.
 * Bump up nodejs to v24.13.0 for the latest UI(booster-ui) compiling.
+* Add `library-batch-queue` module — a partitioned, self-draining queue with type-based dispatch,
+  adaptive partitioning, and idle backoff. Designed to replace DataCarrier in high-fan-out scenarios.
+* Replace DataCarrier with BatchQueue for L1 metrics aggregation, L2 metrics persistence, TopN persistence,
+  and all three exporters (gRPC metrics, Kafka trace, Kafka log).
+  All metric types (OAL + MAL) now share unified queues instead of separate OAL/MAL pools.
+  The three exporters now share a single adaptive queue with 3 dedicated threads.
+  Thread count comparison on an 8-core machine:
+
+  | Queue | Old threads | Old channels | Old buffer slots | New threads | New partitions | New buffer slots | New policy |
+  |-------|-------------|--------------|------------------|-------------|----------------|------------------|------------|
+  | L1 Aggregation (OAL) | 24 | ~1,240 | ~12.4M | 8 (unified) | ~460 adaptive | ~9.2M | `cpuCores(1.0)` |
+  | L1 Aggregation (MAL) | 2 | ~100 | ~100K | (unified above) | | | |
+  | L2 Persistence (OAL) | 2 | ~620 | ~1.24M | 3 (unified) | ~460 adaptive | ~920K | `cpuCoresWithBase(1, 0.25)` |
+  | L2 Persistence (MAL) | 1 | ~100 | ~100K | (unified above) | | | |
+  | TopN Persistence | 4 | 4 | 4K | 1 | 4 adaptive | 4K | `fixed(1)` |
+  | **Total** | **33** | **~2,064** | **~13.8M** | **12** | **~924** | **~10.1M** | |
 
 #### OAP Server
 
diff --git a/oap-server/exporter/pom.xml b/oap-server/exporter/pom.xml
index ca75e04522..85dc662d51 100644
--- a/oap-server/exporter/pom.xml
+++ b/oap-server/exporter/pom.xml
@@ -35,7 +35,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.skywalking</groupId>
-            <artifactId>library-datacarrier-queue</artifactId>
+            <artifactId>library-batch-queue</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
index b9d1f18107..d33b92b8e8 100644
--- 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
+++ 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
@@ -26,6 +26,12 @@ import 
org.apache.skywalking.oap.server.core.exporter.TraceExportService;
 import 
org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCMetricsExporter;
 import 
org.apache.skywalking.oap.server.exporter.provider.kafka.log.KafkaLogExporter;
 import 
org.apache.skywalking.oap.server.exporter.provider.kafka.trace.KafkaTraceExporter;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager;
+import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy;
+import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy;
+import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -74,14 +80,25 @@ public class ExporterProvider extends ModuleProvider {
 
     @Override
     public void start() throws ServiceNotProvidedException, 
ModuleStartException {
+        final BatchQueue<Object> queue = BatchQueueManager.create(
+            "exporter",
+            BatchQueueConfig.<Object>builder()
+                .threads(ThreadPolicy.fixed(3))
+                .partitions(PartitionPolicy.adaptive())
+                .bufferSize(setting.getBufferSize())
+                .strategy(BufferStrategy.IF_POSSIBLE)
+                .maxIdleMs(200)
+                .build()
+        );
+
         if (setting.isEnableGRPCMetrics()) {
-            grpcMetricsExporter.start();
+            grpcMetricsExporter.start(queue);
         }
         if (setting.isEnableKafkaTrace()) {
-            kafkaTraceExporter.start();
+            kafkaTraceExporter.start(queue);
         }
         if (setting.isEnableKafkaLog()) {
-            kafkaLogExporter.start();
+            kafkaLogExporter.start(queue);
         }
     }
 
diff --git 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
index d63be42e87..cde8c737cf 100644
--- 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
+++ 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
@@ -28,8 +28,7 @@ public class ExporterSetting extends ModuleConfig {
     private boolean enableGRPCMetrics = false;
     private String gRPCTargetHost;
     private int gRPCTargetPort;
-    private int bufferChannelSize = 20000;
-    private int bufferChannelNum = 2;
+    private int bufferSize = 20000;
 
     //kafka
     private boolean enableKafkaTrace = false;
diff --git 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
index d2b6bda6fd..2c24b649c4 100644
--- 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
+++ 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
@@ -48,14 +48,13 @@ import 
org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
 import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
 import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
 import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
-import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
 
 @Slf4j
-public class GRPCMetricsExporter extends MetricFormatter implements 
MetricValuesExportService, IConsumer<ExportData> {
+public class GRPCMetricsExporter extends MetricFormatter implements 
MetricValuesExportService {
     /**
      * The period of subscription list fetching is hardcoded as 30s.
      */
@@ -63,7 +62,7 @@ public class GRPCMetricsExporter extends MetricFormatter 
implements MetricValues
     private final ExporterSetting setting;
     private MetricExportServiceGrpc.MetricExportServiceStub 
exportServiceFutureStub;
     private MetricExportServiceGrpc.MetricExportServiceBlockingStub 
blockingStub;
-    private DataCarrier exportBuffer;
+    private BatchQueue<Object> queue;
     private ReentrantLock fetchListLock;
     private volatile List<SubscriptionMetric> subscriptionList;
     private volatile long lastFetchTimestamp = 0;
@@ -72,15 +71,15 @@ public class GRPCMetricsExporter extends MetricFormatter 
implements MetricValues
         this.setting = setting;
     }
 
-    @Override
-    public void start() {
+    @SuppressWarnings("unchecked")
+    public void start(BatchQueue<Object> queue) {
         GRPCClient client = new GRPCClient(setting.getGRPCTargetHost(), 
setting.getGRPCTargetPort());
         client.connect();
         ManagedChannel channel = client.getChannel();
         exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
         blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
-        exportBuffer = new 
DataCarrier<ExportData>(setting.getBufferChannelNum(), 
setting.getBufferChannelSize());
-        exportBuffer.consume(this, 1, 200);
+        this.queue = queue;
+        queue.addHandler((Class<? extends Object>) ExportData.class, 
this::consumeExportData);
         subscriptionList = new ArrayList<>();
         fetchListLock = new ReentrantLock();
     }
@@ -91,12 +90,12 @@ public class GRPCMetricsExporter extends MetricFormatter 
implements MetricValues
         if (metrics instanceof WithMetadata) {
             MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta();
             if (subscriptionList.size() == 0 && 
ExportEvent.EventType.INCREMENT.equals(event.getType())) {
-                exportBuffer.produce(new ExportData(meta, metrics, 
event.getType()));
+                queue.produce(new ExportData(meta, metrics, event.getType()));
             } else {
                 subscriptionList.forEach(subscriptionMetric -> {
                     if 
(subscriptionMetric.getMetricName().equals(meta.getMetricsName()) &&
                         eventTypeMatch(event.getType(), 
subscriptionMetric.getEventType())) {
-                        exportBuffer.produce(new ExportData(meta, metrics, 
event.getType()));
+                        queue.produce(new ExportData(meta, metrics, 
event.getType()));
                     }
                 });
             }
@@ -133,8 +132,7 @@ public class GRPCMetricsExporter extends MetricFormatter 
implements MetricValues
         }
     }
 
-    @Override
-    public void consume(List<ExportData> data) {
+    void consumeExportData(List<Object> data) {
         if (CollectionUtils.isNotEmpty(data)) {
             GRPCStreamStatus status = new GRPCStreamStatus();
             StreamObserver<ExportMetricValue> streamObserver =
@@ -164,7 +162,8 @@ public class GRPCMetricsExporter extends MetricFormatter 
implements MetricValues
                                            });
             AtomicInteger exportNum = new AtomicInteger();
 
-            data.forEach(row -> {
+            data.forEach(item -> {
+                ExportData row = (ExportData) item;
                 ExportMetricValue.Builder builder = 
ExportMetricValue.newBuilder();
 
                 Metrics metrics = row.getMetrics();
@@ -247,11 +246,6 @@ public class GRPCMetricsExporter extends MetricFormatter 
implements MetricValues
         fetchSubscriptionList();
     }
 
-    @Override
-    public void onError(List<ExportData> data, Throwable t) {
-        log.error(t.getMessage(), t);
-    }
-
     private boolean eventTypeMatch(ExportEvent.EventType eventType,
                                    
org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) {
         return (ExportEvent.EventType.INCREMENT.equals(eventType) && 
EventType.INCREMENT.equals(subscriptionType))
diff --git 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java
 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java
index bc54fd7360..64dae615b0 100644
--- 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java
+++ 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java
@@ -37,9 +37,7 @@ import 
org.apache.skywalking.oap.server.core.exporter.LogExportService;
 import org.apache.skywalking.oap.server.core.query.type.ContentType;
 import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
 import 
org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer;
-import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
-import 
org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@@ -48,8 +46,8 @@ import 
org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
 @Slf4j
-public class KafkaLogExporter extends KafkaExportProducer implements 
LogExportService, IConsumer<LogRecord> {
-    private DataCarrier<LogRecord> exportBuffer;
+public class KafkaLogExporter extends KafkaExportProducer implements 
LogExportService {
+    private BatchQueue<Object> queue;
     private CounterMetrics successCounter;
     private CounterMetrics errorCounter;
     private final ModuleManager moduleManager;
@@ -59,14 +57,11 @@ public class KafkaLogExporter extends KafkaExportProducer 
implements LogExportSe
         this.moduleManager = manager;
     }
 
-    @Override
-    public void start() {
+    @SuppressWarnings("unchecked")
+    public void start(BatchQueue<Object> queue) {
         super.getProducer();
-        exportBuffer = new DataCarrier<>(
-            "KafkaLogExporter", "KafkaLogExporter", 
setting.getBufferChannelNum(), setting.getBufferChannelSize(),
-            BufferStrategy.IF_POSSIBLE
-        );
-        exportBuffer.consume(this, 1, 200);
+        this.queue = queue;
+        queue.addHandler((Class<? extends Object>) LogRecord.class, 
this::consumeLogRecords);
         MetricsCreator metricsCreator = 
moduleManager.find(TelemetryModule.NAME)
                                                      .provider()
                                                      
.getService(MetricsCreator.class);
@@ -84,7 +79,7 @@ public class KafkaLogExporter extends KafkaExportProducer 
implements LogExportSe
     @Override
     public void export(final LogRecord logRecord) {
         if (logRecord != null) {
-            exportBuffer.produce(logRecord);
+            queue.produce(logRecord);
         }
     }
 
@@ -93,9 +88,9 @@ public class KafkaLogExporter extends KafkaExportProducer 
implements LogExportSe
         return setting.isEnableKafkaLog();
     }
 
-    @Override
-    public void consume(final List<LogRecord> data) {
-        for (LogRecord logRecord : data) {
+    private void consumeLogRecords(final List<Object> data) {
+        for (Object item : data) {
+            LogRecord logRecord = (LogRecord) item;
             if (logRecord != null) {
                 try {
                     LogData logData = transLogData(logRecord);
@@ -120,11 +115,6 @@ public class KafkaLogExporter extends KafkaExportProducer 
implements LogExportSe
         }
     }
 
-    @Override
-    public void onError(final List<LogRecord> data, final Throwable t) {
-
-    }
-
     private LogData transLogData(LogRecord logRecord) throws 
InvalidProtocolBufferException {
         LogData.Builder builder = LogData.newBuilder();
         LogDataBody.Builder bodyBuilder = LogDataBody.newBuilder();
diff --git 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java
 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java
index 0c3d90f482..8f131a4c0f 100644
--- 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java
+++ 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java
@@ -30,9 +30,7 @@ import 
org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco
 import org.apache.skywalking.oap.server.core.exporter.TraceExportService;
 import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
 import 
org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer;
-import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
-import 
org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
@@ -40,8 +38,8 @@ import 
org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
 @Slf4j
-public class KafkaTraceExporter extends KafkaExportProducer implements 
TraceExportService, IConsumer<SegmentRecord> {
-    private DataCarrier<SegmentRecord> exportBuffer;
+public class KafkaTraceExporter extends KafkaExportProducer implements 
TraceExportService {
+    private BatchQueue<Object> queue;
     private CounterMetrics successCounter;
     private CounterMetrics errorCounter;
     private final ModuleManager moduleManager;
@@ -51,14 +49,11 @@ public class KafkaTraceExporter extends KafkaExportProducer 
implements TraceExpo
         this.moduleManager = manager;
     }
 
-    @Override
-    public void start() {
+    @SuppressWarnings("unchecked")
+    public void start(BatchQueue<Object> queue) {
         super.getProducer();
-        exportBuffer = new DataCarrier<>(
-            "KafkaTraceExporter", "KafkaTraceExporter", 
setting.getBufferChannelNum(), setting.getBufferChannelSize(),
-            BufferStrategy.IF_POSSIBLE
-        );
-        exportBuffer.consume(this, 1, 200);
+        this.queue = queue;
+        queue.addHandler((Class<? extends Object>) SegmentRecord.class, 
this::consumeSegmentRecords);
         MetricsCreator metricsCreator = 
moduleManager.find(TelemetryModule.NAME)
                                                      .provider()
                                                      
.getService(MetricsCreator.class);
@@ -75,9 +70,8 @@ public class KafkaTraceExporter extends KafkaExportProducer 
implements TraceExpo
 
     public void export(SegmentRecord segmentRecord) {
         if (segmentRecord != null) {
-            exportBuffer.produce(segmentRecord);
+            queue.produce(segmentRecord);
         }
-
     }
 
     @Override
@@ -85,9 +79,9 @@ public class KafkaTraceExporter extends KafkaExportProducer 
implements TraceExpo
         return setting.isEnableKafkaTrace();
     }
 
-    @Override
-    public void consume(final List<SegmentRecord> data) {
-        for (SegmentRecord segmentRecord : data) {
+    private void consumeSegmentRecords(final List<Object> data) {
+        for (Object item : data) {
+            SegmentRecord segmentRecord = (SegmentRecord) item;
             if (segmentRecord != null) {
                 try {
                     SegmentObject segmentObject = 
SegmentObject.parseFrom(segmentRecord.getDataBinary());
@@ -124,9 +118,4 @@ public class KafkaTraceExporter extends KafkaExportProducer 
implements TraceExpo
         }
         return false;
     }
-
-    @Override
-    public void onError(final List<SegmentRecord> data, final Throwable t) {
-
-    }
 }
diff --git 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
index 6378093c78..d2e473004c 100644
--- 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
+++ 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
@@ -62,8 +62,7 @@ public class GRPCExporterProviderTest {
         assertNotNull(config);
         assertNull(config.getGRPCTargetHost());
         assertEquals(0, config.getGRPCTargetPort());
-        assertEquals(20000, config.getBufferChannelSize());
-        assertEquals(2, config.getBufferChannelNum());
+        assertEquals(20000, config.getBufferSize());
 
         //for test
         config.setGRPCTargetHost("localhost");
diff --git 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
index 23346d88d6..a64acd2d9c 100644
--- 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
+++ 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
@@ -23,6 +23,10 @@ import io.grpc.Server;
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.util.MutableHandlerRegistry;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
@@ -35,6 +39,12 @@ import 
org.apache.skywalking.oap.server.exporter.grpc.KeyValue;
 import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
 import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig;
+import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager;
+import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy;
+import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy;
+import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -42,10 +52,6 @@ import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.powermock.reflect.Whitebox;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
 
 import static 
org.apache.skywalking.oap.server.core.exporter.ExportEvent.EventType.INCREMENT;
 import static org.mockito.Mockito.when;
@@ -64,6 +70,7 @@ public class GRPCExporterTest {
     private ManagedChannel channel;
     private MutableHandlerRegistry serviceRegistry;
     private MockedStatic<DefaultScopeDefine> defineMockedStatic;
+    private String queueName;
 
     @BeforeEach
     public void setUp() throws Exception {
@@ -83,7 +90,18 @@ public class GRPCExporterTest {
         setting.setGRPCTargetHost("localhost");
         setting.setGRPCTargetPort(9870);
         exporter = new GRPCMetricsExporter(setting);
-        exporter.start();
+        queueName = "exporter-test-" + UUID.randomUUID();
+        BatchQueue<Object> queue = BatchQueueManager.create(
+            queueName,
+            BatchQueueConfig.<Object>builder()
+                .threads(ThreadPolicy.fixed(1))
+                .partitions(PartitionPolicy.fixed(1))
+                .bufferSize(20000)
+                .strategy(BufferStrategy.IF_POSSIBLE)
+                .maxIdleMs(200)
+                .build()
+        );
+        exporter.start(queue);
         serviceRegistry.addService(service);
         blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
         futureStub = MetricExportServiceGrpc.newStub(channel);
@@ -95,6 +113,7 @@ public class GRPCExporterTest {
 
     @AfterEach
     public void after() {
+        BatchQueueManager.shutdown(queueName);
         channel.shutdown();
         server.shutdown();
 
@@ -143,15 +162,10 @@ public class GRPCExporterTest {
         Assertions.assertEquals("labeled-mock-metrics", 
subscriptionList.get(3).getMetricName());
     }
 
-    @Test
-    public void init() {
-        exporter.init(null);
-    }
-
     @Test
     public void consume() {
-        exporter.consume(dataList());
-        exporter.consume(Collections.emptyList());
+        exporter.consumeExportData(dataList());
+        exporter.consumeExportData(Collections.emptyList());
         List<ExportMetricValue> exportMetricValues = 
((MockMetricExportServiceImpl) service).exportMetricValues;
         Assertions.assertEquals(3, exportMetricValues.size());
         Assertions.assertEquals(12, 
exportMetricValues.get(0).getMetricValues(0).getLongValue());
@@ -160,20 +174,8 @@ public class GRPCExporterTest {
         
Assertions.assertEquals(KeyValue.newBuilder().setKey("labelName").setValue("labelValue").build(),
 exportMetricValues.get(2).getMetricValues(0).getLabels(0));
     }
 
-    @Test
-    public void onError() {
-        Exception e = new IllegalArgumentException("something wrong");
-        exporter.onError(Collections.emptyList(), e);
-        exporter.onError(dataList(), e);
-    }
-
-    @Test
-    public void onExit() {
-        exporter.onExit();
-    }
-
-    private List<ExportData> dataList() {
-        List<ExportData> dataList = new LinkedList<>();
+    private List<Object> dataList() {
+        List<Object> dataList = new LinkedList<>();
         dataList.add(new ExportData(new MetricsMetaInfo(
             "mock-metrics", DefaultScopeDefine.SERVICE, 
IDManager.ServiceID.buildId("mock-service", true)), new MockMetrics(), 
INCREMENT));
         dataList.add(new ExportData(new MetricsMetaInfo(
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
index a322dae22a..edf36dc0f6 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
@@ -22,8 +22,6 @@ import 
org.apache.skywalking.oap.server.library.module.Service;
 
 public interface ExporterService<T> extends Service {
 
-    void start();
-
     void export(T data);
 
     boolean isEnabled();

Reply via email to