This is an automated email from the ASF dual-hosted git repository. tanjian pushed a commit to branch top_period in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit d24886b7867656c48c8fc17d4a88d196f3ecea9b Author: Jared.Tan <jian....@daocloud.io> AuthorDate: Tue Nov 19 16:35:49 2019 +0800 make topN worker report period configurable. --- .../org/apache/skywalking/oap/server/core/CoreModuleConfig.java | 1 + .../org/apache/skywalking/oap/server/core/CoreModuleProvider.java | 2 ++ .../oap/server/core/analysis/worker/TopNStreamProcessor.java | 5 ++++- .../skywalking/oap/server/core/analysis/worker/TopNWorker.java | 6 +++--- oap-server/server-starter/src/main/assembly/application.yml | 1 + oap-server/server-starter/src/main/resources/application.yml | 1 + 6 files changed, 12 insertions(+), 4 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index e77d59c..df6db62 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -38,6 +38,7 @@ public class CoreModuleConfig extends ModuleConfig { @Setter private int maxConcurrentCallsPerConnection; @Setter private int maxMessageSize; @Setter private boolean enableDatabaseSession; + @Setter private int topNReportPeriod; private final List<String> downsampling; /** * The period of doing data persistence. diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 9f58df6..b2ae971 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; +import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.cache.*; import org.apache.skywalking.oap.server.core.cluster.*; @@ -170,6 +171,7 @@ public class CoreModuleProvider extends ModuleProvider { this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager); MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession()); + TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod()); } @Override public void start() throws ModuleStartException { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java index fcb3a68..7265d73 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.*; import lombok.Getter; +import lombok.Setter; import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.record.Record; @@ -41,6 +42,8 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> { @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>(); private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>(); + @Setter @Getter private int topNWorkerReportCycle = 10; + @Setter @Getter private int topSize = 50; public static TopNStreamProcessor getInstance() { return PROCESSOR; @@ -63,7 +66,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> { IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true); - TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO); + TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO); persistentWorkers.add(persistentWorker); workers.put(topNClass, persistentWorker); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java index 8ccde84..b5afabf 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java @@ -46,7 +46,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top private volatile long lastReportTimestamp; TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model, - int topNSize, IRecordDAO recordDAO) { + int topNSize, long reportCycle, IRecordDAO recordDAO) { super(moduleDefineHolder); this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize); this.recordDAO = recordDAO; @@ -54,8 +54,8 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000); this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1); this.lastReportTimestamp = System.currentTimeMillis(); - // Top N persistent only works per 10 minutes. - this.reportCycle = 10 * 60 * 1000L; + // Top N persistent works per 10 minutes default. + this.reportCycle = reportCycle; } @Override public void cacheData(TopN data) { diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml index 83a9926..3bc8ace 100644 --- a/oap-server/server-starter/src/main/assembly/application.yml +++ b/oap-server/server-starter/src/main/assembly/application.yml @@ -72,6 +72,7 @@ core: # Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, # the metrics may not be accurate within that minute. enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} + topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker report cycle, unit is minute storage: # elasticsearch: # nameSpace: ${SW_NAMESPACE:""} diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index e23b686..2e7c46e 100755 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -71,6 +71,7 @@ core: # Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, # the metrics may not be accurate within that minute. enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} + topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker report cycle, unit is minute storage: elasticsearch: nameSpace: ${SW_NAMESPACE:""}