This is an automated email from the ASF dual-hosted git repository.

tanjian pushed a commit to branch top_period
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit d24886b7867656c48c8fc17d4a88d196f3ecea9b
Author: Jared.Tan <jian....@daocloud.io>
AuthorDate: Tue Nov 19 16:35:49 2019 +0800

    make topN worker report period configurable.
---
 .../org/apache/skywalking/oap/server/core/CoreModuleConfig.java     | 1 +
 .../org/apache/skywalking/oap/server/core/CoreModuleProvider.java   | 2 ++
 .../oap/server/core/analysis/worker/TopNStreamProcessor.java        | 5 ++++-
 .../skywalking/oap/server/core/analysis/worker/TopNWorker.java      | 6 +++---
 oap-server/server-starter/src/main/assembly/application.yml         | 1 +
 oap-server/server-starter/src/main/resources/application.yml        | 1 +
 6 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index e77d59c..df6db62 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -38,6 +38,7 @@ public class CoreModuleConfig extends ModuleConfig {
     @Setter private int maxConcurrentCallsPerConnection;
     @Setter private int maxMessageSize;
     @Setter private boolean enableDatabaseSession;
+    @Setter private int topNReportPeriod;
     private final List<String> downsampling;
     /**
      * The period of doing data persistence.
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9f58df6..b2ae971 100755
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import 
org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
 import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.cluster.*;
@@ -170,6 +171,7 @@ public class CoreModuleProvider extends ModuleProvider {
         this.registerServiceImplementation(RemoteClientManager.class, 
remoteClientManager);
 
         
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
+        
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
     }
 
     @Override public void start() throws ModuleStartException {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index fcb3a68..7265d73 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
 import lombok.Getter;
+import lombok.Setter;
 import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
@@ -41,6 +42,8 @@ public class TopNStreamProcessor implements 
StreamProcessor<TopN> {
 
     @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
     private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
+    @Setter @Getter private int topNWorkerReportCycle = 10;
+    @Setter @Getter private int topSize = 50;
 
     public static TopNStreamProcessor getInstance() {
         return PROCESSOR;
@@ -63,7 +66,7 @@ public class TopNStreamProcessor implements 
StreamProcessor<TopN> {
         IModelSetter modelSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new 
Storage(stream.name(), true, true, Downsampling.Second), true);
 
-        TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, 
model, 50, recordDAO);
+        TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, 
model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO);
         persistentWorkers.add(persistentWorker);
         workers.put(topNClass, persistentWorker);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 8ccde84..b5afabf 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -46,7 +46,7 @@ public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<Top
     private volatile long lastReportTimestamp;
 
     TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
-        int topNSize, IRecordDAO recordDAO) {
+        int topNSize, long reportCycle, IRecordDAO recordDAO) {
         super(moduleDefineHolder);
         this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
         this.recordDAO = recordDAO;
@@ -54,8 +54,8 @@ public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<Top
         this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000);
         this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
         this.lastReportTimestamp = System.currentTimeMillis();
-        // Top N persistent only works per 10 minutes.
-        this.reportCycle = 10 * 60 * 1000L;
+        // Top N persistent works per 10 minutes default.
+        this.reportCycle = reportCycle;
     }
 
     @Override public void cacheData(TopN data) {
diff --git a/oap-server/server-starter/src/main/assembly/application.yml 
b/oap-server/server-starter/src/main/assembly/application.yml
index 83a9926..3bc8ace 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -72,6 +72,7 @@ core:
     # Cache metric data for 1 minute to reduce database queries, and if the 
OAP cluster changes within that minute,
     # the metrics may not be accurate within that minute.
     enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
+    topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker 
report cycle, unit is minute
 storage:
 #  elasticsearch:
 #    nameSpace: ${SW_NAMESPACE:""}
diff --git a/oap-server/server-starter/src/main/resources/application.yml 
b/oap-server/server-starter/src/main/resources/application.yml
index e23b686..2e7c46e 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -71,6 +71,7 @@ core:
     # Cache metric data for 1 minute to reduce database queries, and if the 
OAP cluster changes within that minute,
     # the metrics may not be accurate within that minute.
     enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
+    topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker 
report cycle, unit is minute
 storage:
   elasticsearch:
     nameSpace: ${SW_NAMESPACE:""}

Reply via email to