This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch new-exporter in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 9d1f4b62024cdf17ccddb88a8567bae1e0524272 Author: Wu Sheng <[email protected]> AuthorDate: Tue Mar 23 14:44:33 2021 +0800 Enhance the export service. --- docs/en/setup/backend/metrics-exporter.md | 18 +++- .../exporter/provider/grpc/GRPCExporter.java | 117 ++++++++++++--------- .../provider/grpc/GRPCExporterProvider.java | 2 +- .../exporter/src/main/proto/metric-exporter.proto | 15 ++- .../provider/grpc/ExporterMockReceiver.java | 16 ++- .../provider/grpc/GRPCExporterProviderTest.java | 2 +- .../exporter/provider/grpc/GRPCExporterTest.java | 18 ++-- .../provider/grpc/MockMetricExportServiceImpl.java | 14 ++- .../oap/server/core/exporter/ExportData.java | 12 +-- 9 files changed, 140 insertions(+), 74 deletions(-) diff --git a/docs/en/setup/backend/metrics-exporter.md b/docs/en/setup/backend/metrics-exporter.md index b190517..6bfdb20 100644 --- a/docs/en/setup/backend/metrics-exporter.md +++ b/docs/en/setup/backend/metrics-exporter.md @@ -31,7 +31,12 @@ message ExportMetricValue { } message SubscriptionsResp { - repeated string metricNames = 1; + repeated SubscriptionMetric metrics = 1; +} + +message SubscriptionMetric { + string metricName = 1; + EventType eventType = 2; } enum ValueType { @@ -40,6 +45,13 @@ enum ValueType { MULTI_LONG = 2; } +enum EventType { + // The metrics aggregated in this bulk, not include the existing persistent data. + INCREMENT = 0; + // Final result of the metrics at this moment. + TOTAL = 1; +} + message SubscriptionReq { } @@ -61,8 +73,8 @@ exporter: ## For target exporter service ### subscription implementation -Return the expected metrics name list, all the names must match the OAL script definition. Return empty list, if you want -to export all metrics. +Return the expected metrics name list with event type(increment or total), all the names must match the OAL script definition. +Return empty list, if you want to export all metrics in increment event type. ### export implementation Stream service, all subscribed metrics will be sent to here, based on OAP core schedule. Also, if the OAP deployed as cluster, diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java index e14b38e..84878b0 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java @@ -20,11 +20,11 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; -import java.util.HashSet; +import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder; @@ -37,26 +37,26 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; import org.apache.skywalking.oap.server.core.exporter.ExportData; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService; +import org.apache.skywalking.oap.server.exporter.grpc.EventType; import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue; import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse; import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc; +import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric; import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq; import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp; import org.apache.skywalking.oap.server.exporter.grpc.ValueType; import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter; import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Slf4j public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> { - private static final Logger LOGGER = LoggerFactory.getLogger(GRPCExporter.class); - private GRPCExporterSetting setting; private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub; private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub; private final DataCarrier exportBuffer; - private final Set<String> subscriptionSet; + private final List<SubscriptionMetric> subscriptionList; + private volatile long lastFetchTimestamp = 0; public GRPCExporter(GRPCExporterSetting setting) { this.setting = setting; @@ -67,27 +67,42 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS blockingStub = MetricExportServiceGrpc.newBlockingStub(channel); exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize()); exportBuffer.consume(this, 1, 200); - subscriptionSet = new HashSet<>(); + subscriptionList = new ArrayList<>(); } @Override public void export(ExportEvent event) { - if (ExportEvent.EventType.TOTAL == event.getType()) { - Metrics metrics = event.getMetrics(); - if (metrics instanceof WithMetadata) { - MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta(); - if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) { - exportBuffer.produce(new ExportData(meta, metrics)); - } + Metrics metrics = event.getMetrics(); + if (metrics instanceof WithMetadata) { + MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta(); + if (subscriptionList.size() == 0 && ExportEvent.EventType.INCREMENT.equals(event.getType())) { + exportBuffer.produce(new ExportData(meta, metrics, event.getType())); + } else { + subscriptionList.forEach(subscriptionMetric -> { + if (subscriptionMetric.getMetricName().equals(meta.getMetricsName()) && + eventTypeMatch(event.getType(), subscriptionMetric.getEventType())) { + exportBuffer.produce(new ExportData(meta, metrics, event.getType())); + } + }); } + + fetchSubscriptionList(); } } - public void initSubscriptionList() { - SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS) - .subscription(SubscriptionReq.newBuilder().build()); - subscription.getMetricNamesList().forEach(subscriptionSet::add); - LOGGER.debug("Get exporter subscription list, {}", subscriptionSet); + /** + * Read the subscription list. + */ + public void fetchSubscriptionList() { + final long currentTimeMillis = System.currentTimeMillis(); + if (currentTimeMillis - lastFetchTimestamp > 30_000) { + lastFetchTimestamp = currentTimeMillis; + SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS) + .subscription(SubscriptionReq.newBuilder().build()); + subscriptionList.clear(); + subscriptionList.addAll(subscription.getMetricsList()); + log.debug("Get exporter subscription list, {}", subscriptionList); + } } @Override @@ -97,32 +112,28 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS @Override public void consume(List<ExportData> data) { - if (data.size() == 0) { - return; - } - GRPCStreamStatus status = new GRPCStreamStatus(); - StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter( - 10, TimeUnit.SECONDS) - .export( - new StreamObserver<ExportResponse>() { - @Override - public void onNext( - ExportResponse response) { - - } - - @Override - public void onError( - Throwable throwable) { - status.done(); - } - - @Override - public void onCompleted() { - status.done(); - } - }); + StreamObserver<ExportMetricValue> streamObserver = + exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS) + .export( + new StreamObserver<ExportResponse>() { + @Override + public void onNext( + ExportResponse response) { + + } + + @Override + public void onError( + Throwable throwable) { + status.done(); + } + + @Override + public void onCompleted() { + status.done(); + } + }); AtomicInteger exportNum = new AtomicInteger(); data.forEach(row -> { ExportMetricValue.Builder builder = ExportMetricValue.newBuilder(); @@ -152,6 +163,8 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS MetricsMetaInfo meta = row.getMeta(); builder.setMetricName(meta.getMetricsName()); + builder.setEventType( + EventType.INCREMENT.equals(row.getEventType()) ? EventType.INCREMENT : EventType.TOTAL); String entityName = getEntityName(meta); if (entityName == null) { return; @@ -179,7 +192,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS } if (sleepTime > 2000L) { - LOGGER.warn( + log.warn( "Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting .getTargetPort(), sleepTime @@ -188,18 +201,26 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS } } - LOGGER.debug( + log.debug( "Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting .getTargetPort(), sleepTime); + + fetchSubscriptionList(); } @Override public void onError(List<ExportData> data, Throwable t) { - LOGGER.error(t.getMessage(), t); + log.error(t.getMessage(), t); } @Override public void onExit() { } + + private boolean eventTypeMatch(ExportEvent.EventType eventType, + org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) { + return (ExportEvent.EventType.INCREMENT.equals(eventType) && EventType.INCREMENT.equals(subscriptionType)) + || (ExportEvent.EventType.TOTAL.equals(eventType) && EventType.TOTAL.equals(subscriptionType)); + } } diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java index f2d75fb..c104759 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java @@ -60,7 +60,7 @@ public class GRPCExporterProvider extends ModuleProvider { @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { - exporter.initSubscriptionList(); + exporter.fetchSubscriptionList(); } @Override diff --git a/oap-server/exporter/src/main/proto/metric-exporter.proto b/oap-server/exporter/src/main/proto/metric-exporter.proto index 3ade1e3..4013bcc 100644 --- a/oap-server/exporter/src/main/proto/metric-exporter.proto +++ b/oap-server/exporter/src/main/proto/metric-exporter.proto @@ -39,10 +39,16 @@ message ExportMetricValue { int64 longValue = 6; double doubleValue = 7; repeated int64 longValues = 8; + EventType eventType = 9; } message SubscriptionsResp { - repeated string metricNames = 1; + repeated SubscriptionMetric metrics = 1; +} + +message SubscriptionMetric { + string metricName = 1; + EventType eventType = 2; } enum ValueType { @@ -51,6 +57,13 @@ enum ValueType { MULTI_LONG = 2; } +enum EventType { + // The metrics aggregated in this bulk, not include the existing persistent data. + INCREMENT = 0; + // Final result of the metrics at this moment. + TOTAL = 1; +} + message SubscriptionReq { } diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java index 06dce2a..0684e09 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java @@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver; import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue; import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse; import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc; +import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric; import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq; import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp; import org.apache.skywalking.oap.server.library.server.ServerException; @@ -63,9 +64,18 @@ public class ExporterMockReceiver { @Override public void subscription(SubscriptionReq request, StreamObserver<SubscriptionsResp> responseObserver) { responseObserver.onNext(SubscriptionsResp.newBuilder() - .addMetricNames("all_p99") - .addMetricNames("service_cpm") - .addMetricNames("endpoint_sla") + .addMetrics( + SubscriptionMetric + .newBuilder() + .setMetricName("all_p99")) + .addMetrics( + SubscriptionMetric + .newBuilder() + .setMetricName("service_cpm")) + .addMetrics( + SubscriptionMetric + .newBuilder() + .setMetricName("endpoint_sla")) .build()); responseObserver.onCompleted(); } diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java index a1895d3..658a8ca 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java @@ -92,7 +92,7 @@ public class GRPCExporterProviderTest { when(manager.find(CoreModule.NAME)).thenReturn(providerHolder); when(providerHolder.provider()).thenReturn(serviceHolder); - doNothing().when(exporter).initSubscriptionList(); + doNothing().when(exporter).fetchSubscriptionList(); grpcExporterProvider.setManager(manager); Whitebox.setInternalState(grpcExporterProvider, "exporter", exporter); diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java index 467d1e9..de42ce7 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java @@ -33,6 +33,8 @@ import org.junit.Rule; import org.junit.Test; import org.powermock.reflect.Whitebox; +import static org.apache.skywalking.oap.server.core.exporter.ExportEvent.EventType.INCREMENT; + public class GRPCExporterTest { private GRPCExporter exporter; @@ -40,7 +42,7 @@ public class GRPCExporterTest { @Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor(); - private MetricExportServiceGrpc.MetricExportServiceImplBase server = new MockMetricExportServiceImpl(); + private MetricExportServiceGrpc.MetricExportServiceImplBase service = new MockMetricExportServiceImpl(); private MetricsMetaInfo metaInfo = new MetricsMetaInfo("mock-metrics", DefaultScopeDefine.ALL); private MetricExportServiceGrpc.MetricExportServiceBlockingStub stub; @@ -51,8 +53,9 @@ public class GRPCExporterTest { setting.setTargetHost("localhost"); setting.setTargetPort(9870); exporter = new GRPCExporter(setting); - grpcServerRule.getServiceRegistry().addService(server); + grpcServerRule.getServiceRegistry().addService(service); stub = MetricExportServiceGrpc.newBlockingStub(grpcServerRule.getChannel()); + Whitebox.setInternalState(exporter, "blockingStub", stub); } @Test @@ -70,8 +73,7 @@ public class GRPCExporterTest { @Test public void initSubscriptionList() { - Whitebox.setInternalState(exporter, "blockingStub", stub); - exporter.initSubscriptionList(); + exporter.fetchSubscriptionList(); } @Test @@ -100,10 +102,10 @@ public class GRPCExporterTest { private List<ExportData> dataList() { List<ExportData> dataList = new LinkedList<>(); - dataList.add(new ExportData(metaInfo, new MockMetrics())); - dataList.add(new ExportData(metaInfo, new MockIntValueMetrics())); - dataList.add(new ExportData(metaInfo, new MockLongValueMetrics())); - dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics())); + dataList.add(new ExportData(metaInfo, new MockMetrics(), INCREMENT)); + dataList.add(new ExportData(metaInfo, new MockIntValueMetrics(), INCREMENT)); + dataList.add(new ExportData(metaInfo, new MockLongValueMetrics(), INCREMENT)); + dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics(), INCREMENT)); return dataList; } } \ No newline at end of file diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java index 6cad2cf..72b3ec4 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java @@ -19,7 +19,9 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc; import io.grpc.stub.StreamObserver; +import org.apache.skywalking.oap.server.exporter.grpc.EventType; import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc; +import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric; import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq; import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp; @@ -27,8 +29,16 @@ public class MockMetricExportServiceImpl extends MetricExportServiceGrpc.MetricE @Override public void subscription(SubscriptionReq request, StreamObserver<SubscriptionsResp> responseObserver) { SubscriptionsResp resp = SubscriptionsResp.newBuilder() - .addMetricNames("first") - .addMetricNames("second") + .addMetrics( + SubscriptionMetric + .newBuilder() + .setMetricName("first") + .setEventType(EventType.INCREMENT)) + .addMetrics( + SubscriptionMetric + .newBuilder() + .setMetricName("second") + .setEventType(EventType.INCREMENT)) .build(); responseObserver.onNext(resp); responseObserver.onCompleted(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java index a638f97..faefa56 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java @@ -19,16 +19,14 @@ package org.apache.skywalking.oap.server.core.exporter; import lombok.Getter; +import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; @Getter +@RequiredArgsConstructor public class ExportData { - private MetricsMetaInfo meta; - private Metrics metrics; - - public ExportData(MetricsMetaInfo meta, Metrics metrics) { - this.meta = meta; - this.metrics = metrics; - } + private final MetricsMetaInfo meta; + private final Metrics metrics; + private final ExportEvent.EventType eventType; } \ No newline at end of file
