This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch exporter in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 4c6677369255a33795fc612cac6d120bfc902e64 Author: Wu Sheng <[email protected]> AuthorDate: Sat Jun 8 22:23:03 2019 +0800 Make exporter interface better. --- .../exporter/provider/grpc/GRPCExporter.java | 14 +++++++--- .../exporter/provider/grpc/GRPCExporterTest.java | 12 ++++++-- .../server/core/analysis/worker/ExportWorker.java | 14 ++++------ .../analysis/worker/MetricsPersistentWorker.java | 13 +++++++-- ...icValuesExportService.java => ExportEvent.java} | 32 ++++++++++++++++++---- .../core/exporter/MetricValuesExportService.java | 8 ++++-- 6 files changed, 69 insertions(+), 24 deletions(-) diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java index 4fdd05c..895dfd4 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java @@ -26,7 +26,7 @@ import lombok.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.metrics.*; -import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService; +import org.apache.skywalking.oap.server.core.exporter.*; import org.apache.skywalking.oap.server.exporter.grpc.*; import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter; import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; @@ -56,9 +56,15 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS subscriptionSet = new HashSet<>(); } - @Override public void export(MetricsMetaInfo meta, Metrics metrics) { - if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) { - exportBuffer.produce(new ExportData(meta, metrics)); + @Override public void export(ExportEvent event) { + if (ExportEvent.EventType.TOTAL.equals(event.getType())) { + Metrics metrics = event.getMetrics(); + if (metrics instanceof WithMetadata) { + MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta(); + if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) { + exportBuffer.produce(new ExportData(meta, metrics)); + } + } } } diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java index 0b347d4..5c0d50d 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java @@ -19,7 +19,8 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc; import io.grpc.testing.GrpcServerRule; -import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; +import org.apache.skywalking.oap.server.core.analysis.metrics.*; +import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc; import org.junit.Before; @@ -58,7 +59,14 @@ public class GRPCExporterTest { @Test public void export() { - exporter.export(metaInfo, new MockMetrics()); + ExportEvent event = new ExportEvent(new MockExporterMetrics(), ExportEvent.EventType.TOTAL); + exporter.export(event); + } + + public static class MockExporterMetrics extends MockMetrics implements WithMetadata { + @Override public MetricsMetaInfo getMeta() { + return new MetricsMetaInfo("mock-metrics", DefaultScopeDefine.ALL); + } } @Test diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java index 6f1edc0..09fa143 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import org.apache.skywalking.oap.server.core.analysis.metrics.*; import org.apache.skywalking.oap.server.core.exporter.*; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -26,21 +25,20 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** * @author wusheng */ -public class ExportWorker extends AbstractWorker<Metrics> { +public class ExportWorker extends AbstractWorker<ExportEvent> { private MetricValuesExportService exportService; public ExportWorker(ModuleDefineHolder moduleDefineHolder) { super(moduleDefineHolder); } - @Override public void in(Metrics metrics) { + @Override public void in(ExportEvent event) { if (exportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) { - if (metrics instanceof WithMetadata) { - if (exportService == null) { - exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); - } - exportService.export(((WithMetadata)metrics).getMeta(), metrics); + if (exportService == null) { + exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); } + exportService.export(event); } } + } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 513f3ae..b4b4fd1 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -24,6 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.*; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; @@ -43,12 +44,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat private final MergeDataCache<Metrics> mergeDataCache; private final IMetricsDAO metricsDAO; private final AbstractWorker<Metrics> nextAlarmWorker; - private final AbstractWorker<Metrics> nextExportWorker; + private final AbstractWorker<ExportEvent> nextExportWorker; private final DataCarrier<Metrics> dataCarrier; MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker, - AbstractWorker<Metrics> nextExportWorker) { + AbstractWorker<ExportEvent> nextExportWorker) { super(moduleDefineHolder, batchSize); this.model = model; this.mergeDataCache = new MergeDataCache<>(); @@ -100,6 +101,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat @Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) { List<Object> batchCollection = new LinkedList<>(); cache.getLast().collection().forEach(data -> { + if (Objects.nonNull(nextExportWorker)) { + ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT); + nextExportWorker.in(event); + } + Metrics dbData = null; try { dbData = metricsDAO.get(model, data); @@ -120,7 +126,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat nextAlarmWorker.in(data); } if (Objects.nonNull(nextExportWorker)) { - nextExportWorker.in(data); + ExportEvent event = new ExportEvent(data, ExportEvent.EventType.TOTAL); + nextExportWorker.in(event); } } catch (Throwable t) { logger.error(t.getMessage(), t); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java similarity index 53% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java index 8e66f88..cce4e0c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java @@ -18,14 +18,36 @@ package org.apache.skywalking.oap.server.core.exporter; -import org.apache.skywalking.oap.server.core.analysis.metrics.*; -import org.apache.skywalking.oap.server.library.module.Service; +import lombok.Getter; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; /** - * Export the metrics value from metrics through this service, if provider exists. + * The event for exporter {@link MetricValuesExportService} implementation processes. + * {@link #metrics} should not be changed in any case. * * @author wusheng */ -public interface MetricValuesExportService extends Service { - void export(MetricsMetaInfo meta, Metrics metrics); +@Getter +public class ExportEvent { + /** + * Fields of this should not be changed in any case. + */ + private Metrics metrics; + private EventType type; + + public ExportEvent(Metrics metrics, EventType type) { + this.metrics = metrics; + this.type = type; + } + + public enum EventType { + /** + * The metrics aggregated in this bulk, not include the existing persistent data. + */ + INCREMENT, + /** + * Final result of the metrics at this moment. + */ + TOTAL + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java index 8e66f88..18779f3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.core.exporter; -import org.apache.skywalking.oap.server.core.analysis.metrics.*; import org.apache.skywalking.oap.server.library.module.Service; /** @@ -27,5 +26,10 @@ import org.apache.skywalking.oap.server.library.module.Service; * @author wusheng */ public interface MetricValuesExportService extends Service { - void export(MetricsMetaInfo meta, Metrics metrics); + /** + * This method is sync-mode export, the performance effects the persistence result. Queue mode is high recommended. + * + * @param event value is only accurate when the method invokes. Don't cache it. + */ + void export(ExportEvent event); }
