This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch new-exporter
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 9d1f4b62024cdf17ccddb88a8567bae1e0524272
Author: Wu Sheng <[email protected]>
AuthorDate: Tue Mar 23 14:44:33 2021 +0800

    Enhance the export service.
---
 docs/en/setup/backend/metrics-exporter.md          |  18 +++-
 .../exporter/provider/grpc/GRPCExporter.java       | 117 ++++++++++++---------
 .../provider/grpc/GRPCExporterProvider.java        |   2 +-
 .../exporter/src/main/proto/metric-exporter.proto  |  15 ++-
 .../provider/grpc/ExporterMockReceiver.java        |  16 ++-
 .../provider/grpc/GRPCExporterProviderTest.java    |   2 +-
 .../exporter/provider/grpc/GRPCExporterTest.java   |  18 ++--
 .../provider/grpc/MockMetricExportServiceImpl.java |  14 ++-
 .../oap/server/core/exporter/ExportData.java       |  12 +--
 9 files changed, 140 insertions(+), 74 deletions(-)

diff --git a/docs/en/setup/backend/metrics-exporter.md 
b/docs/en/setup/backend/metrics-exporter.md
index b190517..6bfdb20 100644
--- a/docs/en/setup/backend/metrics-exporter.md
+++ b/docs/en/setup/backend/metrics-exporter.md
@@ -31,7 +31,12 @@ message ExportMetricValue {
 }
 
 message SubscriptionsResp {
-    repeated string metricNames = 1;
+    repeated SubscriptionMetric metrics = 1;
+}
+
+message SubscriptionMetric {
+    string metricName = 1;
+    EventType eventType = 2;
 }
 
 enum ValueType {
@@ -40,6 +45,13 @@ enum ValueType {
     MULTI_LONG = 2;
 }
 
+enum EventType {
+    // The metrics aggregated in this bulk, not including the existing 
persistent data.
+    INCREMENT = 0;
+    // Final result of the metrics at this moment.
+    TOTAL = 1;
+}
+
 message SubscriptionReq {
 
 }
@@ -61,8 +73,8 @@ exporter:
 
 ## For target exporter service 
 ### subscription implementation
-Return the expected metrics name list, all the names must match the OAL script 
definition. Return empty list, if you want
-to export all metrics.
+Return the expected metric name list, each with its event type (increment or total). All 
the names must match the OAL script definition. 
+Return an empty list if you want to export all metrics in the increment event type.
 
 ### export implementation
 Stream service, all subscribed metrics will be sent to here, based on OAP core 
schedule. Also, if the OAP deployed as cluster, 
diff --git 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index e14b38e..84878b0 100644
--- 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -20,11 +20,11 @@ package 
org.apache.skywalking.oap.server.exporter.provider.grpc;
 
 import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
-import java.util.HashSet;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import 
org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder;
@@ -37,26 +37,26 @@ import 
org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
 import org.apache.skywalking.oap.server.core.exporter.ExportData;
 import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import 
org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
+import org.apache.skywalking.oap.server.exporter.grpc.EventType;
 import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
 import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
 import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
+import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
 import org.apache.skywalking.oap.server.exporter.grpc.ValueType;
 import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
 import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
 import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class GRPCExporter extends MetricFormatter implements 
MetricValuesExportService, IConsumer<ExportData> {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GRPCExporter.class);
-
     private GRPCExporterSetting setting;
     private final MetricExportServiceGrpc.MetricExportServiceStub 
exportServiceFutureStub;
     private final MetricExportServiceGrpc.MetricExportServiceBlockingStub 
blockingStub;
     private final DataCarrier exportBuffer;
-    private final Set<String> subscriptionSet;
+    private final List<SubscriptionMetric> subscriptionList;
+    private volatile long lastFetchTimestamp = 0;
 
     public GRPCExporter(GRPCExporterSetting setting) {
         this.setting = setting;
@@ -67,27 +67,42 @@ public class GRPCExporter extends MetricFormatter 
implements MetricValuesExportS
         blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
         exportBuffer = new 
DataCarrier<ExportData>(setting.getBufferChannelNum(), 
setting.getBufferChannelSize());
         exportBuffer.consume(this, 1, 200);
-        subscriptionSet = new HashSet<>();
+        subscriptionList = new ArrayList<>();
     }
 
     @Override
     public void export(ExportEvent event) {
-        if (ExportEvent.EventType.TOTAL == event.getType()) {
-            Metrics metrics = event.getMetrics();
-            if (metrics instanceof WithMetadata) {
-                MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta();
-                if (subscriptionSet.size() == 0 || 
subscriptionSet.contains(meta.getMetricsName())) {
-                    exportBuffer.produce(new ExportData(meta, metrics));
-                }
+        Metrics metrics = event.getMetrics();
+        if (metrics instanceof WithMetadata) {
+            MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta();
+            if (subscriptionList.size() == 0 && 
ExportEvent.EventType.INCREMENT.equals(event.getType())) {
+                exportBuffer.produce(new ExportData(meta, metrics, 
event.getType()));
+            } else {
+                subscriptionList.forEach(subscriptionMetric -> {
+                    if 
(subscriptionMetric.getMetricName().equals(meta.getMetricsName()) &&
+                        eventTypeMatch(event.getType(), 
subscriptionMetric.getEventType())) {
+                        exportBuffer.produce(new ExportData(meta, metrics, 
event.getType()));
+                    }
+                });
             }
+
+            fetchSubscriptionList();
         }
     }
 
-    public void initSubscriptionList() {
-        SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, 
TimeUnit.SECONDS)
-                                                     
.subscription(SubscriptionReq.newBuilder().build());
-        subscription.getMetricNamesList().forEach(subscriptionSet::add);
-        LOGGER.debug("Get exporter subscription list, {}", subscriptionSet);
+    /**
+     * Read the subscription list.
+     */
+    public void fetchSubscriptionList() {
+        final long currentTimeMillis = System.currentTimeMillis();
+        if (currentTimeMillis - lastFetchTimestamp > 30_000) {
+            lastFetchTimestamp = currentTimeMillis;
+            SubscriptionsResp subscription = 
blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+                                                         
.subscription(SubscriptionReq.newBuilder().build());
+            subscriptionList.clear();
+            subscriptionList.addAll(subscription.getMetricsList());
+            log.debug("Get exporter subscription list, {}", subscriptionList);
+        }
     }
 
     @Override
@@ -97,32 +112,28 @@ public class GRPCExporter extends MetricFormatter 
implements MetricValuesExportS
 
     @Override
     public void consume(List<ExportData> data) {
-        if (data.size() == 0) {
-            return;
-        }
-
         GRPCStreamStatus status = new GRPCStreamStatus();
-        StreamObserver<ExportMetricValue> streamObserver = 
exportServiceFutureStub.withDeadlineAfter(
-            10, TimeUnit.SECONDS)
-                                                                               
   .export(
-                                                                               
       new StreamObserver<ExportResponse>() {
-                                                                               
           @Override
-                                                                               
           public void onNext(
-                                                                               
               ExportResponse response) {
-
-                                                                               
           }
-
-                                                                               
           @Override
-                                                                               
           public void onError(
-                                                                               
               Throwable throwable) {
-                                                                               
               status.done();
-                                                                               
           }
-
-                                                                               
           @Override
-                                                                               
           public void onCompleted() {
-                                                                               
               status.done();
-                                                                               
           }
-                                                                               
       });
+        StreamObserver<ExportMetricValue> streamObserver =
+            exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+                                   .export(
+                                       new StreamObserver<ExportResponse>() {
+                                           @Override
+                                           public void onNext(
+                                               ExportResponse response) {
+
+                                           }
+
+                                           @Override
+                                           public void onError(
+                                               Throwable throwable) {
+                                               status.done();
+                                           }
+
+                                           @Override
+                                           public void onCompleted() {
+                                               status.done();
+                                           }
+                                       });
         AtomicInteger exportNum = new AtomicInteger();
         data.forEach(row -> {
             ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
@@ -152,6 +163,8 @@ public class GRPCExporter extends MetricFormatter 
implements MetricValuesExportS
 
             MetricsMetaInfo meta = row.getMeta();
             builder.setMetricName(meta.getMetricsName());
+            builder.setEventType(
+                EventType.INCREMENT.equals(row.getEventType()) ? 
EventType.INCREMENT : EventType.TOTAL);
             String entityName = getEntityName(meta);
             if (entityName == null) {
                 return;
@@ -179,7 +192,7 @@ public class GRPCExporter extends MetricFormatter 
implements MetricValuesExportS
             }
 
             if (sleepTime > 2000L) {
-                LOGGER.warn(
+                log.warn(
                     "Export {} metrics to {}:{}, wait {} milliseconds.", 
exportNum.get(), setting.getTargetHost(),
                     setting
                         .getTargetPort(), sleepTime
@@ -188,18 +201,26 @@ public class GRPCExporter extends MetricFormatter 
implements MetricValuesExportS
             }
         }
 
-        LOGGER.debug(
+        log.debug(
             "Exported {} metrics to {}:{} in {} milliseconds.", 
exportNum.get(), setting.getTargetHost(), setting
                 .getTargetPort(), sleepTime);
+
+        fetchSubscriptionList();
     }
 
     @Override
     public void onError(List<ExportData> data, Throwable t) {
-        LOGGER.error(t.getMessage(), t);
+        log.error(t.getMessage(), t);
     }
 
     @Override
     public void onExit() {
 
     }
+
+    private boolean eventTypeMatch(ExportEvent.EventType eventType,
+                                   
org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) {
+        return (ExportEvent.EventType.INCREMENT.equals(eventType) && 
EventType.INCREMENT.equals(subscriptionType))
+            || (ExportEvent.EventType.TOTAL.equals(eventType) && 
EventType.TOTAL.equals(subscriptionType));
+    }
 }
diff --git 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
index f2d75fb..c104759 100644
--- 
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
+++ 
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
@@ -60,7 +60,7 @@ public class GRPCExporterProvider extends ModuleProvider {
 
     @Override
     public void notifyAfterCompleted() throws ServiceNotProvidedException, 
ModuleStartException {
-        exporter.initSubscriptionList();
+        exporter.fetchSubscriptionList();
     }
 
     @Override
diff --git a/oap-server/exporter/src/main/proto/metric-exporter.proto 
b/oap-server/exporter/src/main/proto/metric-exporter.proto
index 3ade1e3..4013bcc 100644
--- a/oap-server/exporter/src/main/proto/metric-exporter.proto
+++ b/oap-server/exporter/src/main/proto/metric-exporter.proto
@@ -39,10 +39,16 @@ message ExportMetricValue {
     int64 longValue = 6;
     double doubleValue = 7;
     repeated int64 longValues = 8;
+    EventType eventType = 9;
 }
 
 message SubscriptionsResp {
-    repeated string metricNames = 1;
+    repeated SubscriptionMetric metrics = 1;
+}
+
+message SubscriptionMetric {
+    string metricName = 1;
+    EventType eventType = 2;
 }
 
 enum ValueType {
@@ -51,6 +57,13 @@ enum ValueType {
     MULTI_LONG = 2;
 }
 
+enum EventType {
+    // The metrics aggregated in this bulk, not including the existing 
persistent data.
+    INCREMENT = 0;
+    // Final result of the metrics at this moment.
+    TOTAL = 1;
+}
+
 message SubscriptionReq {
 
 }
diff --git 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
index 06dce2a..0684e09 100644
--- 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
+++ 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
@@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver;
 import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
 import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
 import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
+import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
 import org.apache.skywalking.oap.server.library.server.ServerException;
@@ -63,9 +64,18 @@ public class ExporterMockReceiver {
         @Override
         public void subscription(SubscriptionReq request, 
StreamObserver<SubscriptionsResp> responseObserver) {
             responseObserver.onNext(SubscriptionsResp.newBuilder()
-                                                     .addMetricNames("all_p99")
-                                                     
.addMetricNames("service_cpm")
-                                                     
.addMetricNames("endpoint_sla")
+                                                     .addMetrics(
+                                                         SubscriptionMetric
+                                                             .newBuilder()
+                                                             
.setMetricName("all_p99"))
+                                                     .addMetrics(
+                                                         SubscriptionMetric
+                                                             .newBuilder()
+                                                             
.setMetricName("service_cpm"))
+                                                     .addMetrics(
+                                                         SubscriptionMetric
+                                                             .newBuilder()
+                                                             
.setMetricName("endpoint_sla"))
                                                      .build());
             responseObserver.onCompleted();
         }
diff --git 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
index a1895d3..658a8ca 100644
--- 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
+++ 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
@@ -92,7 +92,7 @@ public class GRPCExporterProviderTest {
         when(manager.find(CoreModule.NAME)).thenReturn(providerHolder);
         when(providerHolder.provider()).thenReturn(serviceHolder);
 
-        doNothing().when(exporter).initSubscriptionList();
+        doNothing().when(exporter).fetchSubscriptionList();
 
         grpcExporterProvider.setManager(manager);
         Whitebox.setInternalState(grpcExporterProvider, "exporter", exporter);
diff --git 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
index 467d1e9..de42ce7 100644
--- 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
+++ 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
@@ -33,6 +33,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import static 
org.apache.skywalking.oap.server.core.exporter.ExportEvent.EventType.INCREMENT;
+
 public class GRPCExporterTest {
 
     private GRPCExporter exporter;
@@ -40,7 +42,7 @@ public class GRPCExporterTest {
     @Rule
     public final GrpcServerRule grpcServerRule = new 
GrpcServerRule().directExecutor();
 
-    private MetricExportServiceGrpc.MetricExportServiceImplBase server = new 
MockMetricExportServiceImpl();
+    private MetricExportServiceGrpc.MetricExportServiceImplBase service = new 
MockMetricExportServiceImpl();
     private MetricsMetaInfo metaInfo = new MetricsMetaInfo("mock-metrics", 
DefaultScopeDefine.ALL);
 
     private MetricExportServiceGrpc.MetricExportServiceBlockingStub stub;
@@ -51,8 +53,9 @@ public class GRPCExporterTest {
         setting.setTargetHost("localhost");
         setting.setTargetPort(9870);
         exporter = new GRPCExporter(setting);
-        grpcServerRule.getServiceRegistry().addService(server);
+        grpcServerRule.getServiceRegistry().addService(service);
         stub = 
MetricExportServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
+        Whitebox.setInternalState(exporter, "blockingStub", stub);
     }
 
     @Test
@@ -70,8 +73,7 @@ public class GRPCExporterTest {
 
     @Test
     public void initSubscriptionList() {
-        Whitebox.setInternalState(exporter, "blockingStub", stub);
-        exporter.initSubscriptionList();
+        exporter.fetchSubscriptionList();
     }
 
     @Test
@@ -100,10 +102,10 @@ public class GRPCExporterTest {
 
     private List<ExportData> dataList() {
         List<ExportData> dataList = new LinkedList<>();
-        dataList.add(new ExportData(metaInfo, new MockMetrics()));
-        dataList.add(new ExportData(metaInfo, new MockIntValueMetrics()));
-        dataList.add(new ExportData(metaInfo, new MockLongValueMetrics()));
-        dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics()));
+        dataList.add(new ExportData(metaInfo, new MockMetrics(), INCREMENT));
+        dataList.add(new ExportData(metaInfo, new MockIntValueMetrics(), 
INCREMENT));
+        dataList.add(new ExportData(metaInfo, new MockLongValueMetrics(), 
INCREMENT));
+        dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics(), 
INCREMENT));
         return dataList;
     }
 }
\ No newline at end of file
diff --git 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java
 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java
index 6cad2cf..72b3ec4 100644
--- 
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java
+++ 
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java
@@ -19,7 +19,9 @@
 package org.apache.skywalking.oap.server.exporter.provider.grpc;
 
 import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.oap.server.exporter.grpc.EventType;
 import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
+import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
 
@@ -27,8 +29,16 @@ public class MockMetricExportServiceImpl extends 
MetricExportServiceGrpc.MetricE
     @Override
     public void subscription(SubscriptionReq request, 
StreamObserver<SubscriptionsResp> responseObserver) {
         SubscriptionsResp resp = SubscriptionsResp.newBuilder()
-                                                  .addMetricNames("first")
-                                                  .addMetricNames("second")
+                                                  .addMetrics(
+                                                      SubscriptionMetric
+                                                          .newBuilder()
+                                                          
.setMetricName("first")
+                                                          
.setEventType(EventType.INCREMENT))
+                                                  .addMetrics(
+                                                      SubscriptionMetric
+                                                          .newBuilder()
+                                                          
.setMetricName("second")
+                                                          
.setEventType(EventType.INCREMENT))
                                                   .build();
         responseObserver.onNext(resp);
         responseObserver.onCompleted();
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java
index a638f97..faefa56 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java
@@ -19,16 +19,14 @@
 package org.apache.skywalking.oap.server.core.exporter;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
 
 @Getter
+@RequiredArgsConstructor
 public class ExportData {
-    private MetricsMetaInfo meta;
-    private Metrics metrics;
-
-    public ExportData(MetricsMetaInfo meta, Metrics metrics) {
-        this.meta = meta;
-        this.metrics = metrics;
-    }
+    private final MetricsMetaInfo meta;
+    private final Metrics metrics;
+    private final ExportEvent.EventType eventType;
 }
\ No newline at end of file

Reply via email to