This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new f3b5671 Enhance the export service. (#6602)
f3b5671 is described below
commit f3b567160ce61675cb692c3417101162d67093de
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Wed Mar 24 09:22:03 2021 +0800
Enhance the export service. (#6602)
---
CHANGES.md | 1 +
docs/en/setup/backend/metrics-exporter.md | 18 ++-
.../exporter/provider/grpc/GRPCExporter.java | 134 +++++++++++++--------
.../provider/grpc/GRPCExporterProvider.java | 2 +-
.../exporter/src/main/proto/metric-exporter.proto | 15 ++-
.../provider/grpc/ExporterMockReceiver.java | 16 ++-
.../provider/grpc/GRPCExporterProviderTest.java | 2 +-
.../exporter/provider/grpc/GRPCExporterTest.java | 18 +--
.../provider/grpc/MockMetricExportServiceImpl.java | 14 ++-
.../oap/server/core/exporter/ExportData.java | 12 +-
10 files changed, 157 insertions(+), 75 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1cf5619..35a7d9b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@ Release Notes.
* Support metrics grouped by scope labelValue in MAL, no need global same
labelValue as before.
* Add functions in MAL to filter metrics according to the metric value.
* Optimize the self monitoring grafana dashboard.
+* Enhance the export service.
#### UI
* Update selector scroller to show in all pages.
diff --git a/docs/en/setup/backend/metrics-exporter.md
b/docs/en/setup/backend/metrics-exporter.md
index b190517..04ca492 100644
--- a/docs/en/setup/backend/metrics-exporter.md
+++ b/docs/en/setup/backend/metrics-exporter.md
@@ -31,7 +31,12 @@ message ExportMetricValue {
}
message SubscriptionsResp {
- repeated string metricNames = 1;
+ repeated SubscriptionMetric metrics = 1;
+}
+
+message SubscriptionMetric {
+ string metricName = 1;
+ EventType eventType = 2;
}
enum ValueType {
@@ -40,6 +45,13 @@ enum ValueType {
MULTI_LONG = 2;
}
+enum EventType {
+ // The metrics aggregated in this bulk, not including the existing persistent data.
+ INCREMENT = 0;
+ // Final result of the metrics at this moment.
+ TOTAL = 1;
+}
+
message SubscriptionReq {
}
@@ -61,8 +73,8 @@ exporter:
## For target exporter service
### subscription implementation
-Return the expected metrics name list, all the names must match the OAL script
definition. Return empty list, if you want
-to export all metrics.
+Return the list of expected metric names, each with its event type (increment or total). All the names must match the OAL/MAL script definition.
+Return an empty list if you want to export all metrics in the increment event type.
### export implementation
Stream service, all subscribed metrics will be sent to here, based on OAP core
schedule. Also, if the OAP deployed as cluster,
diff --git
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index e14b38e..8758584 100644
---
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -20,11 +20,12 @@ package
org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
-import java.util.HashSet;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import
org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder;
@@ -37,26 +38,31 @@ import
org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.exporter.ExportData;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import
org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
+import org.apache.skywalking.oap.server.exporter.grpc.EventType;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
+import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.exporter.grpc.ValueType;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class GRPCExporter extends MetricFormatter implements
MetricValuesExportService, IConsumer<ExportData> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(GRPCExporter.class);
-
- private GRPCExporterSetting setting;
+ /**
+ * The period of subscription list fetching is hardcoded as 30s.
+ */
+ private static final long FETCH_SUBSCRIPTION_PERIOD = 30_000;
+ private final GRPCExporterSetting setting;
private final MetricExportServiceGrpc.MetricExportServiceStub
exportServiceFutureStub;
private final MetricExportServiceGrpc.MetricExportServiceBlockingStub
blockingStub;
private final DataCarrier exportBuffer;
- private final Set<String> subscriptionSet;
+ private final ReentrantLock fetchListLock;
+ private volatile List<SubscriptionMetric> subscriptionList;
+ private volatile long lastFetchTimestamp = 0;
public GRPCExporter(GRPCExporterSetting setting) {
this.setting = setting;
@@ -67,27 +73,51 @@ public class GRPCExporter extends MetricFormatter
implements MetricValuesExportS
blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
exportBuffer = new
DataCarrier<ExportData>(setting.getBufferChannelNum(),
setting.getBufferChannelSize());
exportBuffer.consume(this, 1, 200);
- subscriptionSet = new HashSet<>();
+ subscriptionList = new ArrayList<>();
+ fetchListLock = new ReentrantLock();
}
@Override
public void export(ExportEvent event) {
- if (ExportEvent.EventType.TOTAL == event.getType()) {
- Metrics metrics = event.getMetrics();
- if (metrics instanceof WithMetadata) {
- MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta();
- if (subscriptionSet.size() == 0 ||
subscriptionSet.contains(meta.getMetricsName())) {
- exportBuffer.produce(new ExportData(meta, metrics));
- }
+ Metrics metrics = event.getMetrics();
+ if (metrics instanceof WithMetadata) {
+ MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta();
+ if (subscriptionList.size() == 0 &&
ExportEvent.EventType.INCREMENT.equals(event.getType())) {
+ exportBuffer.produce(new ExportData(meta, metrics,
event.getType()));
+ } else {
+ subscriptionList.forEach(subscriptionMetric -> {
+ if
(subscriptionMetric.getMetricName().equals(meta.getMetricsName()) &&
+ eventTypeMatch(event.getType(),
subscriptionMetric.getEventType())) {
+ exportBuffer.produce(new ExportData(meta, metrics,
event.getType()));
+ }
+ });
}
+
+ fetchSubscriptionList();
}
}
- public void initSubscriptionList() {
- SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10,
TimeUnit.SECONDS)
-
.subscription(SubscriptionReq.newBuilder().build());
- subscription.getMetricNamesList().forEach(subscriptionSet::add);
- LOGGER.debug("Get exporter subscription list, {}", subscriptionSet);
+ /**
+ * Read the subscription list.
+ */
+ public void fetchSubscriptionList() {
+ final long currentTimeMillis = System.currentTimeMillis();
+ if (currentTimeMillis - lastFetchTimestamp >
FETCH_SUBSCRIPTION_PERIOD) {
+ try {
+ fetchListLock.lock();
+ if (currentTimeMillis - lastFetchTimestamp >
FETCH_SUBSCRIPTION_PERIOD) {
+ lastFetchTimestamp = currentTimeMillis;
+ SubscriptionsResp subscription =
blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+
.subscription(SubscriptionReq.newBuilder().build());
+ subscriptionList = subscription.getMetricsList();
+ log.debug("Get exporter subscription list, {}",
subscriptionList);
+ }
+ } catch (Throwable e) {
+ log.error("Getting exporter subscription list fails.", e);
+ } finally {
+ fetchListLock.unlock();
+ }
+ }
}
@Override
@@ -97,32 +127,28 @@ public class GRPCExporter extends MetricFormatter
implements MetricValuesExportS
@Override
public void consume(List<ExportData> data) {
- if (data.size() == 0) {
- return;
- }
-
GRPCStreamStatus status = new GRPCStreamStatus();
- StreamObserver<ExportMetricValue> streamObserver =
exportServiceFutureStub.withDeadlineAfter(
- 10, TimeUnit.SECONDS)
-
.export(
-
new StreamObserver<ExportResponse>() {
-
@Override
-
public void onNext(
-
ExportResponse response) {
-
-
}
-
-
@Override
-
public void onError(
-
Throwable throwable) {
-
status.done();
-
}
-
-
@Override
-
public void onCompleted() {
-
status.done();
-
}
-
});
+ StreamObserver<ExportMetricValue> streamObserver =
+ exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+ .export(
+ new StreamObserver<ExportResponse>() {
+ @Override
+ public void onNext(
+ ExportResponse response) {
+
+ }
+
+ @Override
+ public void onError(
+ Throwable throwable) {
+ status.done();
+ }
+
+ @Override
+ public void onCompleted() {
+ status.done();
+ }
+ });
AtomicInteger exportNum = new AtomicInteger();
data.forEach(row -> {
ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
@@ -152,6 +178,8 @@ public class GRPCExporter extends MetricFormatter
implements MetricValuesExportS
MetricsMetaInfo meta = row.getMeta();
builder.setMetricName(meta.getMetricsName());
+ builder.setEventType(
+ EventType.INCREMENT.equals(row.getEventType()) ?
EventType.INCREMENT : EventType.TOTAL);
String entityName = getEntityName(meta);
if (entityName == null) {
return;
@@ -179,7 +207,7 @@ public class GRPCExporter extends MetricFormatter
implements MetricValuesExportS
}
if (sleepTime > 2000L) {
- LOGGER.warn(
+ log.warn(
"Export {} metrics to {}:{}, wait {} milliseconds.",
exportNum.get(), setting.getTargetHost(),
setting
.getTargetPort(), sleepTime
@@ -188,18 +216,26 @@ public class GRPCExporter extends MetricFormatter
implements MetricValuesExportS
}
}
- LOGGER.debug(
+ log.debug(
"Exported {} metrics to {}:{} in {} milliseconds.",
exportNum.get(), setting.getTargetHost(), setting
.getTargetPort(), sleepTime);
+
+ fetchSubscriptionList();
}
@Override
public void onError(List<ExportData> data, Throwable t) {
- LOGGER.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
}
@Override
public void onExit() {
}
+
+ private boolean eventTypeMatch(ExportEvent.EventType eventType,
+
org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) {
+ return (ExportEvent.EventType.INCREMENT.equals(eventType) &&
EventType.INCREMENT.equals(subscriptionType))
+ || (ExportEvent.EventType.TOTAL.equals(eventType) &&
EventType.TOTAL.equals(subscriptionType));
+ }
}
diff --git
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
index f2d75fb..c104759 100644
---
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
+++
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
@@ -60,7 +60,7 @@ public class GRPCExporterProvider extends ModuleProvider {
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException,
ModuleStartException {
- exporter.initSubscriptionList();
+ exporter.fetchSubscriptionList();
}
@Override
diff --git a/oap-server/exporter/src/main/proto/metric-exporter.proto
b/oap-server/exporter/src/main/proto/metric-exporter.proto
index 3ade1e3..4013bcc 100644
--- a/oap-server/exporter/src/main/proto/metric-exporter.proto
+++ b/oap-server/exporter/src/main/proto/metric-exporter.proto
@@ -39,10 +39,16 @@ message ExportMetricValue {
int64 longValue = 6;
double doubleValue = 7;
repeated int64 longValues = 8;
+ EventType eventType = 9;
}
message SubscriptionsResp {
- repeated string metricNames = 1;
+ repeated SubscriptionMetric metrics = 1;
+}
+
+message SubscriptionMetric {
+ string metricName = 1;
+ EventType eventType = 2;
}
enum ValueType {
@@ -51,6 +57,13 @@ enum ValueType {
MULTI_LONG = 2;
}
+enum EventType {
+ // The metrics aggregated in this bulk, not including the existing persistent data.
+ INCREMENT = 0;
+ // Final result of the metrics at this moment.
+ TOTAL = 1;
+}
+
message SubscriptionReq {
}
diff --git
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
index 06dce2a..0684e09 100644
---
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
+++
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
@@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
+import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.library.server.ServerException;
@@ -63,9 +64,18 @@ public class ExporterMockReceiver {
@Override
public void subscription(SubscriptionReq request,
StreamObserver<SubscriptionsResp> responseObserver) {
responseObserver.onNext(SubscriptionsResp.newBuilder()
- .addMetricNames("all_p99")
-
.addMetricNames("service_cpm")
-
.addMetricNames("endpoint_sla")
+ .addMetrics(
+ SubscriptionMetric
+ .newBuilder()
+
.setMetricName("all_p99"))
+ .addMetrics(
+ SubscriptionMetric
+ .newBuilder()
+
.setMetricName("service_cpm"))
+ .addMetrics(
+ SubscriptionMetric
+ .newBuilder()
+
.setMetricName("endpoint_sla"))
.build());
responseObserver.onCompleted();
}
diff --git
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
index a1895d3..658a8ca 100644
---
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
+++
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
@@ -92,7 +92,7 @@ public class GRPCExporterProviderTest {
when(manager.find(CoreModule.NAME)).thenReturn(providerHolder);
when(providerHolder.provider()).thenReturn(serviceHolder);
- doNothing().when(exporter).initSubscriptionList();
+ doNothing().when(exporter).fetchSubscriptionList();
grpcExporterProvider.setManager(manager);
Whitebox.setInternalState(grpcExporterProvider, "exporter", exporter);
diff --git
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
index 467d1e9..de42ce7 100644
---
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
+++
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
@@ -33,6 +33,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
+import static
org.apache.skywalking.oap.server.core.exporter.ExportEvent.EventType.INCREMENT;
+
public class GRPCExporterTest {
private GRPCExporter exporter;
@@ -40,7 +42,7 @@ public class GRPCExporterTest {
@Rule
public final GrpcServerRule grpcServerRule = new
GrpcServerRule().directExecutor();
- private MetricExportServiceGrpc.MetricExportServiceImplBase server = new
MockMetricExportServiceImpl();
+ private MetricExportServiceGrpc.MetricExportServiceImplBase service = new
MockMetricExportServiceImpl();
private MetricsMetaInfo metaInfo = new MetricsMetaInfo("mock-metrics",
DefaultScopeDefine.ALL);
private MetricExportServiceGrpc.MetricExportServiceBlockingStub stub;
@@ -51,8 +53,9 @@ public class GRPCExporterTest {
setting.setTargetHost("localhost");
setting.setTargetPort(9870);
exporter = new GRPCExporter(setting);
- grpcServerRule.getServiceRegistry().addService(server);
+ grpcServerRule.getServiceRegistry().addService(service);
stub =
MetricExportServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
+ Whitebox.setInternalState(exporter, "blockingStub", stub);
}
@Test
@@ -70,8 +73,7 @@ public class GRPCExporterTest {
@Test
public void initSubscriptionList() {
- Whitebox.setInternalState(exporter, "blockingStub", stub);
- exporter.initSubscriptionList();
+ exporter.fetchSubscriptionList();
}
@Test
@@ -100,10 +102,10 @@ public class GRPCExporterTest {
private List<ExportData> dataList() {
List<ExportData> dataList = new LinkedList<>();
- dataList.add(new ExportData(metaInfo, new MockMetrics()));
- dataList.add(new ExportData(metaInfo, new MockIntValueMetrics()));
- dataList.add(new ExportData(metaInfo, new MockLongValueMetrics()));
- dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics()));
+ dataList.add(new ExportData(metaInfo, new MockMetrics(), INCREMENT));
+ dataList.add(new ExportData(metaInfo, new MockIntValueMetrics(),
INCREMENT));
+ dataList.add(new ExportData(metaInfo, new MockLongValueMetrics(),
INCREMENT));
+ dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics(),
INCREMENT));
return dataList;
}
}
\ No newline at end of file
diff --git
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java
index 6cad2cf..72b3ec4 100644
---
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java
+++
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/MockMetricExportServiceImpl.java
@@ -19,7 +19,9 @@
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.oap.server.exporter.grpc.EventType;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
+import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
@@ -27,8 +29,16 @@ public class MockMetricExportServiceImpl extends
MetricExportServiceGrpc.MetricE
@Override
public void subscription(SubscriptionReq request,
StreamObserver<SubscriptionsResp> responseObserver) {
SubscriptionsResp resp = SubscriptionsResp.newBuilder()
- .addMetricNames("first")
- .addMetricNames("second")
+ .addMetrics(
+ SubscriptionMetric
+ .newBuilder()
+
.setMetricName("first")
+
.setEventType(EventType.INCREMENT))
+ .addMetrics(
+ SubscriptionMetric
+ .newBuilder()
+
.setMetricName("second")
+
.setEventType(EventType.INCREMENT))
.build();
responseObserver.onNext(resp);
responseObserver.onCompleted();
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java
index a638f97..faefa56 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java
@@ -19,16 +19,14 @@
package org.apache.skywalking.oap.server.core.exporter;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
@Getter
+@RequiredArgsConstructor
public class ExportData {
- private MetricsMetaInfo meta;
- private Metrics metrics;
-
- public ExportData(MetricsMetaInfo meta, Metrics metrics) {
- this.meta = meta;
- this.metrics = metrics;
- }
+ private final MetricsMetaInfo meta;
+ private final Metrics metrics;
+ private final ExportEvent.EventType eventType;
}
\ No newline at end of file