This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch library-batch-queue in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 8ce7cd06eaa743df69f3f14d4702031f9fa52bc1 Author: Wu Sheng <[email protected]> AuthorDate: Sat Feb 14 22:08:16 2026 +0800 Replace DataCarrier with shared BatchQueue for all three exporters The gRPC metrics, Kafka trace, and Kafka log exporters now share a single BatchQueue with adaptive partitions and 3 dedicated threads, replacing the per-exporter DataCarrier instances. The ExporterService interface no longer declares start() — lifecycle is managed by ExporterProvider which creates the shared queue and passes it to each enabled exporter. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- docs/en/changes/changes.md | 16 +++++++ oap-server/exporter/pom.xml | 2 +- .../server/exporter/provider/ExporterProvider.java | 23 +++++++-- .../server/exporter/provider/ExporterSetting.java | 3 +- .../provider/grpc/GRPCMetricsExporter.java | 30 +++++------- .../provider/kafka/log/KafkaLogExporter.java | 32 +++++-------- .../provider/kafka/trace/KafkaTraceExporter.java | 33 +++++-------- .../provider/grpc/GRPCExporterProviderTest.java | 3 +- .../exporter/provider/grpc/GRPCExporterTest.java | 54 +++++++++++----------- .../oap/server/core/exporter/ExporterService.java | 2 - 10 files changed, 101 insertions(+), 97 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index ec5d97fd4c..93f3cc67bc 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -12,6 +12,22 @@ * Add `CLAUDE.md` as AI assistant guide for the project. * Upgrade Groovy to 5.0.3 in OAP backend. * Bump up nodejs to v24.13.0 for the latest UI(booster-ui) compiling. +* Add `library-batch-queue` module — a partitioned, self-draining queue with type-based dispatch, + adaptive partitioning, and idle backoff. Designed to replace DataCarrier in high-fan-out scenarios. +* Replace DataCarrier with BatchQueue for L1 metrics aggregation, L2 metrics persistence, TopN persistence, + and all three exporters (gRPC metrics, Kafka trace, Kafka log). + All metric types (OAL + MAL) now share unified queues instead of separate OAL/MAL pools. + The three exporters now share a single adaptive queue with 3 dedicated threads. + Thread count comparison on an 8-core machine: + + | Queue | Old threads | Old channels | Old buffer slots | New threads | New partitions | New buffer slots | New policy | + |-------|-------------|--------------|------------------|-------------|----------------|------------------|------------| + | L1 Aggregation (OAL) | 24 | ~1,240 | ~12.4M | 8 (unified) | ~460 adaptive | ~9.2M | `cpuCores(1.0)` | + | L1 Aggregation (MAL) | 2 | ~100 | ~100K | (unified above) | | | | + | L2 Persistence (OAL) | 2 | ~620 | ~1.24M | 3 (unified) | ~460 adaptive | ~920K | `cpuCoresWithBase(1, 0.25)` | + | L2 Persistence (MAL) | 1 | ~100 | ~100K | (unified above) | | | | + | TopN Persistence | 4 | 4 | 4K | 1 | 4 adaptive | 4K | `fixed(1)` | + | **Total** | **33** | **~2,064** | **~13.8M** | **12** | **~924** | **~10.1M** | | #### OAP Server diff --git a/oap-server/exporter/pom.xml b/oap-server/exporter/pom.xml index ca75e04522..85dc662d51 100644 --- a/oap-server/exporter/pom.xml +++ b/oap-server/exporter/pom.xml @@ -35,7 +35,7 @@ </dependency> <dependency> <groupId>org.apache.skywalking</groupId> - <artifactId>library-datacarrier-queue</artifactId> + <artifactId>library-batch-queue</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java index b9d1f18107..d33b92b8e8 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java @@ -26,6 +26,12 @@ import org.apache.skywalking.oap.server.core.exporter.TraceExportService; import org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCMetricsExporter; import org.apache.skywalking.oap.server.exporter.provider.kafka.log.KafkaLogExporter; import org.apache.skywalking.oap.server.exporter.provider.kafka.trace.KafkaTraceExporter; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager; +import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy; +import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy; +import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; @@ -74,14 +80,25 @@ public class ExporterProvider extends ModuleProvider { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { + final BatchQueue<Object> queue = BatchQueueManager.create( + "exporter", + BatchQueueConfig.<Object>builder() + .threads(ThreadPolicy.fixed(3)) + .partitions(PartitionPolicy.adaptive()) + .bufferSize(setting.getBufferSize()) + .strategy(BufferStrategy.IF_POSSIBLE) + .maxIdleMs(200) + .build() + ); + if (setting.isEnableGRPCMetrics()) { - grpcMetricsExporter.start(); + grpcMetricsExporter.start(queue); } if (setting.isEnableKafkaTrace()) { - kafkaTraceExporter.start(); + kafkaTraceExporter.start(queue); } if (setting.isEnableKafkaLog()) { - kafkaLogExporter.start(); + kafkaLogExporter.start(queue); } } diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java index d63be42e87..cde8c737cf 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java @@ -28,8 +28,7 @@ public class ExporterSetting extends ModuleConfig { private boolean enableGRPCMetrics = false; private String gRPCTargetHost; private int gRPCTargetPort; - private int bufferChannelSize = 20000; - private int bufferChannelNum = 2; + private int bufferSize = 20000; //kafka private boolean enableKafkaTrace = false; diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java index d2b6bda6fd..2c24b649c4 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java @@ -48,14 +48,13 @@ import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq; import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp; import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting; import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus; @Slf4j -public class GRPCMetricsExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> { +public class GRPCMetricsExporter extends MetricFormatter implements MetricValuesExportService { /** * The period of subscription list fetching is hardcoded as 30s. */ @@ -63,7 +62,7 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues private final ExporterSetting setting; private MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub; private MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub; - private DataCarrier exportBuffer; + private BatchQueue<Object> queue; private ReentrantLock fetchListLock; private volatile List<SubscriptionMetric> subscriptionList; private volatile long lastFetchTimestamp = 0; @@ -72,15 +71,15 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues this.setting = setting; } - @Override - public void start() { + @SuppressWarnings("unchecked") + public void start(BatchQueue<Object> queue) { GRPCClient client = new GRPCClient(setting.getGRPCTargetHost(), setting.getGRPCTargetPort()); client.connect(); ManagedChannel channel = client.getChannel(); exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel); blockingStub = MetricExportServiceGrpc.newBlockingStub(channel); - exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize()); - exportBuffer.consume(this, 1, 200); + this.queue = queue; + queue.addHandler((Class<? extends Object>) ExportData.class, this::consumeExportData); subscriptionList = new ArrayList<>(); fetchListLock = new ReentrantLock(); } @@ -91,12 +90,12 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues if (metrics instanceof WithMetadata) { MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta(); if (subscriptionList.size() == 0 && ExportEvent.EventType.INCREMENT.equals(event.getType())) { - exportBuffer.produce(new ExportData(meta, metrics, event.getType())); + queue.produce(new ExportData(meta, metrics, event.getType())); } else { subscriptionList.forEach(subscriptionMetric -> { if (subscriptionMetric.getMetricName().equals(meta.getMetricsName()) && eventTypeMatch(event.getType(), subscriptionMetric.getEventType())) { - exportBuffer.produce(new ExportData(meta, metrics, event.getType())); + queue.produce(new ExportData(meta, metrics, event.getType())); } }); } @@ -133,8 +132,7 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues } } - @Override - public void consume(List<ExportData> data) { + void consumeExportData(List<Object> data) { if (CollectionUtils.isNotEmpty(data)) { GRPCStreamStatus status = new GRPCStreamStatus(); StreamObserver<ExportMetricValue> streamObserver = @@ -164,7 +162,8 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues }); AtomicInteger exportNum = new AtomicInteger(); - data.forEach(row -> { + data.forEach(item -> { + ExportData row = (ExportData) item; ExportMetricValue.Builder builder = ExportMetricValue.newBuilder(); Metrics metrics = row.getMetrics(); @@ -247,11 +246,6 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues fetchSubscriptionList(); } - @Override - public void onError(List<ExportData> data, Throwable t) { - log.error(t.getMessage(), t); - } - private boolean eventTypeMatch(ExportEvent.EventType eventType, org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) { return (ExportEvent.EventType.INCREMENT.equals(eventType) && EventType.INCREMENT.equals(subscriptionType)) diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java index bc54fd7360..64dae615b0 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java @@ -37,9 +37,7 @@ import org.apache.skywalking.oap.server.core.exporter.LogExportService; import org.apache.skywalking.oap.server.core.query.type.ContentType; import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting; import org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; @@ -48,8 +46,8 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; @Slf4j -public class KafkaLogExporter extends KafkaExportProducer implements LogExportService, IConsumer<LogRecord> { - private DataCarrier<LogRecord> exportBuffer; +public class KafkaLogExporter extends KafkaExportProducer implements LogExportService { + private BatchQueue<Object> queue; private CounterMetrics successCounter; private CounterMetrics errorCounter; private final ModuleManager moduleManager; @@ -59,14 +57,11 @@ public class KafkaLogExporter extends KafkaExportProducer implements LogExportSe this.moduleManager = manager; } - @Override - public void start() { + @SuppressWarnings("unchecked") + public void start(BatchQueue<Object> queue) { super.getProducer(); - exportBuffer = new DataCarrier<>( - "KafkaLogExporter", "KafkaLogExporter", setting.getBufferChannelNum(), setting.getBufferChannelSize(), - BufferStrategy.IF_POSSIBLE - ); - exportBuffer.consume(this, 1, 200); + this.queue = queue; + queue.addHandler((Class<? extends Object>) LogRecord.class, this::consumeLogRecords); MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) .provider() .getService(MetricsCreator.class); @@ -84,7 +79,7 @@ public class KafkaLogExporter extends KafkaExportProducer implements LogExportSe @Override public void export(final LogRecord logRecord) { if (logRecord != null) { - exportBuffer.produce(logRecord); + queue.produce(logRecord); } } @@ -93,9 +88,9 @@ public class KafkaLogExporter extends KafkaExportProducer implements LogExportSe return setting.isEnableKafkaLog(); } - @Override - public void consume(final List<LogRecord> data) { - for (LogRecord logRecord : data) { + private void consumeLogRecords(final List<Object> data) { + for (Object item : data) { + LogRecord logRecord = (LogRecord) item; if (logRecord != null) { try { LogData logData = transLogData(logRecord); @@ -120,11 +115,6 @@ public class KafkaLogExporter extends KafkaExportProducer implements LogExportSe } } - @Override - public void onError(final List<LogRecord> data, final Throwable t) { - - } - private LogData transLogData(LogRecord logRecord) throws InvalidProtocolBufferException { LogData.Builder builder = LogData.newBuilder(); LogDataBody.Builder bodyBuilder = LogDataBody.newBuilder(); diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java index 0c3d90f482..8f131a4c0f 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java @@ -30,9 +30,7 @@ import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco import org.apache.skywalking.oap.server.core.exporter.TraceExportService; import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting; import org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; @@ -40,8 +38,8 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; @Slf4j -public class KafkaTraceExporter extends KafkaExportProducer implements TraceExportService, IConsumer<SegmentRecord> { - private DataCarrier<SegmentRecord> exportBuffer; +public class KafkaTraceExporter extends KafkaExportProducer implements TraceExportService { + private BatchQueue<Object> queue; private CounterMetrics successCounter; private CounterMetrics errorCounter; private final ModuleManager moduleManager; @@ -51,14 +49,11 @@ public class KafkaTraceExporter extends KafkaExportProducer implements TraceExpo this.moduleManager = manager; } - @Override - public void start() { + @SuppressWarnings("unchecked") + public void start(BatchQueue<Object> queue) { super.getProducer(); - exportBuffer = new DataCarrier<>( - "KafkaTraceExporter", "KafkaTraceExporter", setting.getBufferChannelNum(), setting.getBufferChannelSize(), - BufferStrategy.IF_POSSIBLE - ); - exportBuffer.consume(this, 1, 200); + this.queue = queue; + queue.addHandler((Class<? extends Object>) SegmentRecord.class, this::consumeSegmentRecords); MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) .provider() .getService(MetricsCreator.class); @@ -75,9 +70,8 @@ public class KafkaTraceExporter extends KafkaExportProducer implements TraceExpo public void export(SegmentRecord segmentRecord) { if (segmentRecord != null) { - exportBuffer.produce(segmentRecord); + queue.produce(segmentRecord); } - } @Override @@ -85,9 +79,9 @@ public class KafkaTraceExporter extends KafkaExportProducer implements TraceExpo return setting.isEnableKafkaTrace(); } - @Override - public void consume(final List<SegmentRecord> data) { - for (SegmentRecord segmentRecord : data) { + private void consumeSegmentRecords(final List<Object> data) { + for (Object item : data) { + SegmentRecord segmentRecord = (SegmentRecord) item; if (segmentRecord != null) { try { SegmentObject segmentObject = SegmentObject.parseFrom(segmentRecord.getDataBinary()); @@ -124,9 +118,4 @@ public class KafkaTraceExporter extends KafkaExportProducer implements TraceExpo } return false; } - - @Override - public void onError(final List<SegmentRecord> data, final Throwable t) { - - } } diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java index 6378093c78..d2e473004c 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java @@ -62,8 +62,7 @@ public class GRPCExporterProviderTest { assertNotNull(config); assertNull(config.getGRPCTargetHost()); assertEquals(0, config.getGRPCTargetPort()); - assertEquals(20000, config.getBufferChannelSize()); - assertEquals(2, config.getBufferChannelNum()); + assertEquals(20000, config.getBufferSize()); //for test config.setGRPCTargetHost("localhost"); diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java index 23346d88d6..a64acd2d9c 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java @@ -23,6 +23,10 @@ import io.grpc.Server; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.util.MutableHandlerRegistry; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; @@ -35,6 +39,12 @@ import org.apache.skywalking.oap.server.exporter.grpc.KeyValue; import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc; import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric; import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueConfig; +import org.apache.skywalking.oap.server.library.batchqueue.BatchQueueManager; +import org.apache.skywalking.oap.server.library.batchqueue.BufferStrategy; +import org.apache.skywalking.oap.server.library.batchqueue.PartitionPolicy; +import org.apache.skywalking.oap.server.library.batchqueue.ThreadPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -42,10 +52,6 @@ import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.powermock.reflect.Whitebox; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.UUID; import static org.apache.skywalking.oap.server.core.exporter.ExportEvent.EventType.INCREMENT; import static org.mockito.Mockito.when; @@ -64,6 +70,7 @@ public class GRPCExporterTest { private ManagedChannel channel; private MutableHandlerRegistry serviceRegistry; private MockedStatic<DefaultScopeDefine> defineMockedStatic; + private String queueName; @BeforeEach public void setUp() throws Exception { @@ -83,7 +90,18 @@ public class GRPCExporterTest { setting.setGRPCTargetHost("localhost"); setting.setGRPCTargetPort(9870); exporter = new GRPCMetricsExporter(setting); - exporter.start(); + queueName = "exporter-test-" + UUID.randomUUID(); + BatchQueue<Object> queue = BatchQueueManager.create( + queueName, + BatchQueueConfig.<Object>builder() + .threads(ThreadPolicy.fixed(1)) + .partitions(PartitionPolicy.fixed(1)) + .bufferSize(20000) + .strategy(BufferStrategy.IF_POSSIBLE) + .maxIdleMs(200) + .build() + ); + exporter.start(queue); serviceRegistry.addService(service); blockingStub = MetricExportServiceGrpc.newBlockingStub(channel); futureStub = MetricExportServiceGrpc.newStub(channel); @@ -95,6 +113,7 @@ public class GRPCExporterTest { @AfterEach public void after() { + BatchQueueManager.shutdown(queueName); channel.shutdown(); server.shutdown(); @@ -143,15 +162,10 @@ public class GRPCExporterTest { Assertions.assertEquals("labeled-mock-metrics", subscriptionList.get(3).getMetricName()); } - @Test - public void init() { - exporter.init(null); - } - @Test public void consume() { - exporter.consume(dataList()); - exporter.consume(Collections.emptyList()); + exporter.consumeExportData(dataList()); + exporter.consumeExportData(Collections.emptyList()); List<ExportMetricValue> exportMetricValues = ((MockMetricExportServiceImpl) service).exportMetricValues; Assertions.assertEquals(3, exportMetricValues.size()); Assertions.assertEquals(12, exportMetricValues.get(0).getMetricValues(0).getLongValue()); @@ -160,20 +174,8 @@ public class GRPCExporterTest { Assertions.assertEquals(KeyValue.newBuilder().setKey("labelName").setValue("labelValue").build(), exportMetricValues.get(2).getMetricValues(0).getLabels(0)); } - @Test - public void onError() { - Exception e = new IllegalArgumentException("something wrong"); - exporter.onError(Collections.emptyList(), e); - exporter.onError(dataList(), e); - } - - @Test - public void onExit() { - exporter.onExit(); - } - - private List<ExportData> dataList() { - List<ExportData> dataList = new LinkedList<>(); + private List<Object> dataList() { + List<Object> dataList = new LinkedList<>(); dataList.add(new ExportData(new MetricsMetaInfo( "mock-metrics", DefaultScopeDefine.SERVICE, IDManager.ServiceID.buildId("mock-service", true)), new MockMetrics(), INCREMENT)); dataList.add(new ExportData(new MetricsMetaInfo( diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java index a322dae22a..edf36dc0f6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java @@ -22,8 +22,6 @@ import org.apache.skywalking.oap.server.library.module.Service; public interface ExporterService<T> extends Service { - void start(); - void export(T data); boolean isEnabled();
