This is an automated email from the ASF dual-hosted git repository.

pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 8605f4c  Change the metrics process flow. (#3157)
8605f4c is described below

commit 8605f4ccc05f13ef466515f2c551c59907b22fab
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Wed Jul 24 18:18:20 2019 +0800

    Change the metrics process flow. (#3157)
    
    * Feature of database session
    
    * Make it configurable.
    
    * Change the metrics process flow.
    before: metrics entrance -> aggregate worker -> remote worker -> trans 
worker -> minute, hour, day, month persistence worker -> storage
    after: metrics entrance -> aggregate worker -> remote worker -> minute 
persistence worker ->  trans worker -> hour, day, month persistence worker -> 
storage
---
 .../analysis/worker/MetricsPersistentWorker.java   |  7 ++++++-
 .../analysis/worker/MetricsStreamProcessor.java    | 14 +++++++-------
 .../core/analysis/worker/MetricsTransWorker.java   | 22 ++--------------------
 3 files changed, 15 insertions(+), 28 deletions(-)

diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 8c2865a..bbe5f7f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -47,10 +47,11 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
     private final AbstractWorker<Metrics> nextAlarmWorker;
     private final AbstractWorker<ExportEvent> nextExportWorker;
     private final DataCarrier<Metrics> dataCarrier;
+    private final MetricsTransWorker transWorker;
     private final boolean enableDatabaseSession;
 
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
-        AbstractWorker<ExportEvent> nextExportWorker, boolean 
enableDatabaseSession) {
+        AbstractWorker<ExportEvent> nextExportWorker, MetricsTransWorker 
transWorker, boolean enableDatabaseSession) {
         super(moduleDefineHolder);
         this.model = model;
         this.databaseSession = new HashMap<>(100);
@@ -59,6 +60,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
         this.metricsDAO = metricsDAO;
         this.nextAlarmWorker = nextAlarmWorker;
         this.nextExportWorker = nextExportWorker;
+        this.transWorker = transWorker;
 
         String name = "METRICS_L2_AGGREGATION";
         int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
@@ -99,6 +101,9 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
                 ExportEvent event = new ExportEvent(data, 
ExportEvent.EventType.INCREMENT);
                 nextExportWorker.in(event);
             }
+            if (Objects.nonNull(transWorker)) {
+                transWorker.in(data);
+            }
 
             int mod = i % batchGetSize;
             if (mod == 0) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 4ee60ab..5da8a3e 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -86,14 +86,14 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
             monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, 
model);
         }
 
-        Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), 
new Storage(stream.name(), true, true, Downsampling.Minute), false);
-        MetricsPersistentWorker minutePersistentWorker = 
minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
+        MetricsTransWorker transWorker = new 
MetricsTransWorker(moduleDefineHolder, stream.name(), hourPersistentWorker, 
dayPersistentWorker, monthPersistentWorker);
 
-        MetricsTransWorker transWorker = new 
MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, 
hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
+        Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), 
new Storage(stream.name(), true, true, Downsampling.Minute), false);
+        MetricsPersistentWorker minutePersistentWorker = 
minutePersistentWorker(moduleDefineHolder, metricsDAO, model, transWorker);
 
         String remoteReceiverWorkerName = stream.name() + "_rec";
         IWorkerInstanceSetter workerInstanceSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
-        workerInstanceSetter.put(remoteReceiverWorkerName, transWorker, 
metricsClass);
+        workerInstanceSetter.put(remoteReceiverWorkerName, 
minutePersistentWorker, metricsClass);
 
         MetricsRemoteWorker remoteWorker = new 
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
         MetricsAggregateWorker aggregateWorker = new 
MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
@@ -101,18 +101,18 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         entryWorkers.put(metricsClass, aggregateWorker);
     }
 
-    private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder 
moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
+    private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder 
moduleDefineHolder, IMetricsDAO metricsDAO, Model model, MetricsTransWorker 
transWorker) {
         AlarmNotifyWorker alarmNotifyWorker = new 
AlarmNotifyWorker(moduleDefineHolder);
         ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
 
-        MetricsPersistentWorker minutePersistentWorker = new 
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, 
alarmNotifyWorker, exportWorker, enableDatabaseSession);
+        MetricsPersistentWorker minutePersistentWorker = new 
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, 
alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession);
         persistentWorkers.add(minutePersistentWorker);
 
         return minutePersistentWorker;
     }
 
     private MetricsPersistentWorker worker(ModuleDefineHolder 
moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
-        MetricsPersistentWorker persistentWorker = new 
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, 
enableDatabaseSession);
+        MetricsPersistentWorker persistentWorker = new 
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, 
null, enableDatabaseSession);
         persistentWorkers.add(persistentWorker);
 
         return persistentWorker;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
index cade9b9..01c602d 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
@@ -23,11 +23,8 @@ import 
org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.telemetry.api.*;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -36,30 +33,24 @@ public class MetricsTransWorker extends 
AbstractWorker<Metrics> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(MetricsTransWorker.class);
 
-    private final MetricsPersistentWorker minutePersistenceWorker;
     private final MetricsPersistentWorker hourPersistenceWorker;
     private final MetricsPersistentWorker dayPersistenceWorker;
     private final MetricsPersistentWorker monthPersistenceWorker;
 
-    private final CounterMetrics aggregationMinCounter;
     private final CounterMetrics aggregationHourCounter;
     private final CounterMetrics aggregationDayCounter;
     private final CounterMetrics aggregationMonthCounter;
 
     public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String 
modelName,
-        MetricsPersistentWorker minutePersistenceWorker,
         MetricsPersistentWorker hourPersistenceWorker,
         MetricsPersistentWorker dayPersistenceWorker,
         MetricsPersistentWorker monthPersistenceWorker) {
         super(moduleDefineHolder);
-        this.minutePersistenceWorker = minutePersistenceWorker;
         this.hourPersistenceWorker = hourPersistenceWorker;
         this.dayPersistenceWorker = dayPersistenceWorker;
         this.monthPersistenceWorker = monthPersistenceWorker;
 
         MetricsCreator metricsCreator = 
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
-        aggregationMinCounter = 
metricsCreator.createCounter("metrics_aggregation", "The number of rows in 
aggregation",
-            new MetricsTag.Keys("metricName", "level", "dimensionality"), new 
MetricsTag.Values(modelName, "2", "min"));
         aggregationHourCounter = 
metricsCreator.createCounter("metrics_aggregation", "The number of rows in 
aggregation",
             new MetricsTag.Keys("metricName", "level", "dimensionality"), new 
MetricsTag.Values(modelName, "2", "hour"));
         aggregationDayCounter = 
metricsCreator.createCounter("metrics_aggregation", "The number of rows in 
aggregation",
@@ -81,14 +72,5 @@ public class MetricsTransWorker extends 
AbstractWorker<Metrics> {
             aggregationHourCounter.inc();
             monthPersistenceWorker.in(metrics.toMonth());
         }
-
-        /*
-         * Minute persistent must be at the end of all time dimensionalities
-         * Because #toHour, #toDay, #toMonth include clone inside, which could 
avoid concurrency situation.
-         */
-        if (Objects.nonNull(minutePersistenceWorker)) {
-            aggregationMinCounter.inc();
-            minutePersistenceWorker.in(metrics);
-        }
     }
 }

Reply via email to