This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 2197820 Support exporter runs in increment and total modes (#2840)
2197820 is described below
commit 21978205778c10234281a4f2a963380cb0773ce1
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Sun Jun 9 11:32:27 2019 +0800
Support exporter runs in increment and total modes (#2840)
* Make exporter interface better.
* Fix review.
---
.../exporter/provider/grpc/GRPCExporter.java | 14 +++++++---
.../exporter/provider/grpc/GRPCExporterTest.java | 12 ++++++--
.../server/core/analysis/worker/ExportWorker.java | 14 ++++------
.../analysis/worker/MetricsPersistentWorker.java | 13 +++++++--
...icValuesExportService.java => ExportEvent.java} | 32 ++++++++++++++++++----
.../core/exporter/MetricValuesExportService.java | 8 ++++--
6 files changed, 69 insertions(+), 24 deletions(-)
diff --git
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index 4fdd05c..2e28322 100644
---
a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++
b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -26,7 +26,7 @@ import lombok.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
-import
org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
+import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.exporter.grpc.*;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
@@ -56,9 +56,15 @@ public class GRPCExporter extends MetricFormatter implements
MetricValuesExportS
subscriptionSet = new HashSet<>();
}
- @Override public void export(MetricsMetaInfo meta, Metrics metrics) {
- if (subscriptionSet.size() == 0 ||
subscriptionSet.contains(meta.getMetricsName())) {
- exportBuffer.produce(new ExportData(meta, metrics));
+ @Override public void export(ExportEvent event) {
+ if (ExportEvent.EventType.TOTAL == event.getType()) {
+ Metrics metrics = event.getMetrics();
+ if (metrics instanceof WithMetadata) {
+ MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
+ if (subscriptionSet.size() == 0 ||
subscriptionSet.contains(meta.getMetricsName())) {
+ exportBuffer.produce(new ExportData(meta, metrics));
+ }
+ }
}
}
diff --git
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
index 0b347d4..5c0d50d 100644
---
a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
+++
b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
@@ -19,7 +19,8 @@
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.testing.GrpcServerRule;
-import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
+import org.apache.skywalking.oap.server.core.analysis.metrics.*;
+import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
import org.junit.Before;
@@ -58,7 +59,14 @@ public class GRPCExporterTest {
@Test
public void export() {
- exporter.export(metaInfo, new MockMetrics());
+ ExportEvent event = new ExportEvent(new MockExporterMetrics(),
ExportEvent.EventType.TOTAL);
+ exporter.export(event);
+ }
+
+ public static class MockExporterMetrics extends MockMetrics implements
WithMetadata {
+ @Override public MetricsMetaInfo getMeta() {
+ return new MetricsMetaInfo("mock-metrics", DefaultScopeDefine.ALL);
+ }
}
@Test
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
index 6f1edc0..09fa143 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -26,21 +25,20 @@ import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author wusheng
*/
-public class ExportWorker extends AbstractWorker<Metrics> {
+public class ExportWorker extends AbstractWorker<ExportEvent> {
private MetricValuesExportService exportService;
public ExportWorker(ModuleDefineHolder moduleDefineHolder) {
super(moduleDefineHolder);
}
- @Override public void in(Metrics metrics) {
+ @Override public void in(ExportEvent event) {
if (exportService != null ||
getModuleDefineHolder().has(ExporterModule.NAME)) {
- if (metrics instanceof WithMetadata) {
- if (exportService == null) {
- exportService =
getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
- }
- exportService.export(((WithMetadata)metrics).getMeta(),
metrics);
+ if (exportService == null) {
+ exportService =
getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
}
+ exportService.export(event);
}
}
+
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 513f3ae..b4b4fd1 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -24,6 +24,7 @@ import
org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -43,12 +44,12 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
private final MergeDataCache<Metrics> mergeDataCache;
private final IMetricsDAO metricsDAO;
private final AbstractWorker<Metrics> nextAlarmWorker;
- private final AbstractWorker<Metrics> nextExportWorker;
+ private final AbstractWorker<ExportEvent> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model
model, int batchSize,
IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
- AbstractWorker<Metrics> nextExportWorker) {
+ AbstractWorker<ExportEvent> nextExportWorker) {
super(moduleDefineHolder, batchSize);
this.model = model;
this.mergeDataCache = new MergeDataCache<>();
@@ -100,6 +101,11 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
@Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(data -> {
+ if (Objects.nonNull(nextExportWorker)) {
+ ExportEvent event = new ExportEvent(data,
ExportEvent.EventType.INCREMENT);
+ nextExportWorker.in(event);
+ }
+
Metrics dbData = null;
try {
dbData = metricsDAO.get(model, data);
@@ -120,7 +126,8 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
nextAlarmWorker.in(data);
}
if (Objects.nonNull(nextExportWorker)) {
- nextExportWorker.in(data);
+ ExportEvent event = new ExportEvent(data,
ExportEvent.EventType.TOTAL);
+ nextExportWorker.in(event);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java
similarity index 53%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java
index 8e66f88..cce4e0c 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java
@@ -18,14 +18,36 @@
package org.apache.skywalking.oap.server.core.exporter;
-import org.apache.skywalking.oap.server.core.analysis.metrics.*;
-import org.apache.skywalking.oap.server.library.module.Service;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
/**
- * Export the metrics value from metrics through this service, if provider
exists.
+ * The event for exporter {@link MetricValuesExportService} implementation
processes.
+ * {@link #metrics} should not be changed in any case.
*
* @author wusheng
*/
-public interface MetricValuesExportService extends Service {
- void export(MetricsMetaInfo meta, Metrics metrics);
+@Getter
+public class ExportEvent {
+ /**
+ * Fields of this should not be changed in any case.
+ */
+ private Metrics metrics;
+ private EventType type;
+
+ public ExportEvent(Metrics metrics, EventType type) {
+ this.metrics = metrics;
+ this.type = type;
+ }
+
+ public enum EventType {
+ /**
+ * The metrics aggregated in this bulk, not include the existing
persistent data.
+ */
+ INCREMENT,
+ /**
+ * Final result of the metrics at this moment.
+ */
+ TOTAL
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
index 8e66f88..f6f461e 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.exporter;
-import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
@@ -27,5 +26,10 @@ import
org.apache.skywalking.oap.server.library.module.Service;
* @author wusheng
*/
public interface MetricValuesExportService extends Service {
- void export(MetricsMetaInfo meta, Metrics metrics);
+ /**
+ * This method is sync-mode export, the performance effects the
persistence result. Queue mode is highly recommended.
+ *
+ * @param event value is only accurate when the method invokes. Don't
cache it.
+ */
+ void export(ExportEvent event);
}