This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch l1-flush
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 72bfa0132185d8b787f71541cf1ee4c8b53bae03
Author: Wu Sheng <[email protected]>
AuthorDate: Wed Jun 30 11:56:29 2021 +0800

    Add the period of L1 aggregation flush to L2 aggregation
---
 CHANGES.md                                         |  3 +-
 docs/en/setup/backend/configuration-vocabulary.md  |  1 +
 .../src/main/resources/application.yml             |  2 ++
 .../oap/server/core/CoreModuleConfig.java          |  9 +++++-
 .../oap/server/core/CoreModuleProvider.java        |  3 +-
 .../analysis/worker/MetricsAggregateWorker.java    | 32 +++++++++++++++-------
 .../analysis/worker/MetricsStreamProcessor.java    | 15 ++++++++--
 7 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8ae7936..eb37dc3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -56,7 +56,8 @@ Release Notes.
 * Performance: cache regex pattern and result, optimize string concatenation 
in Envy ALS analyzer.
 * Performance: cache metrics id and entity id in `Metrics` and `ISource`.
 * Performance: enhance persistent session mechanism, about differentiating 
cache timeout for different dimensionality
-  metrics. The timeout of the cache for minute and hour level metrics has been 
prolonged to ~5 min. 
+  metrics. The timeout of the cache for minute and hour level metrics has been 
prolonged to ~5 min.
+* Performance: Add L1 aggregation flush period, which reduces the CPU load and 
helps young GC.
 
 #### UI
 
diff --git a/docs/en/setup/backend/configuration-vocabulary.md 
b/docs/en/setup/backend/configuration-vocabulary.md
index c435c8f..3fa9376 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -23,6 +23,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. 
**Receiver** mode
 | - | - | dataKeeperExecutePeriod|The execution period of TTL scheduler, unit 
is minute. Execution doesn't mean deleting data. The storage provider could 
override this, such as ElasticSearch 
storage.|SW_CORE_DATA_KEEPER_EXECUTE_PERIOD|5|
 | - | - | recordDataTTL|The lifecycle of record data. Record data includes 
traces, top n sampled records, and logs. Unit is day. Minimal value is 
2.|SW_CORE_RECORD_DATA_TTL|3|
 | - | - | metricsDataTTL|The lifecycle of metrics data, including the 
metadata. Unit is day. Recommend metricsDataTTL >= recordDataTTL. Minimal value 
is 2.| SW_CORE_METRICS_DATA_TTL|7|
+| - | - | l1FlushPeriod| The period at which L1 aggregation flushes its data to L2 aggregation. 
Unit is ms. | SW_CORE_L1_AGGREGATION_FLUSH_PERIOD | 500 |
 | - | - | enableDatabaseSession|Cache metrics data for 1 minute to reduce 
database queries, and if the OAP cluster changes within that 
minute.|SW_CORE_ENABLE_DATABASE_SESSION|true|
 | - | - | topNReportPeriod|The execution period of top N sampler, which saves 
sampled data into the storage. Unit is minute|SW_CORE_TOPN_REPORT_PERIOD|10|
 | - | - | activeExtraModelColumns|Append the names of entity, such as service 
name, into the metrics storage 
entities.|SW_CORE_ACTIVE_EXTRA_MODEL_COLUMNS|false|
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml 
b/oap-server/server-bootstrap/src/main/resources/application.yml
index e6db202..2f888fd 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -85,6 +85,8 @@ core:
     dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How 
often the data keeper executor runs periodically, unit is minute
     recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:3} # Unit is day
     metricsDataTTL: ${SW_CORE_METRICS_DATA_TTL:7} # Unit is day
+    # The period of L1 aggregation flush to L2 aggregation. Unit is ms.
+    l1FlushPeriod: ${SW_CORE_L1_AGGREGATION_FLUSH_PERIOD:500}
     # Cache metrics data for 1 minute to reduce database queries, and if the 
OAP cluster changes within that minute,
     # the metrics may not be accurate within that minute.
     enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 8ffcc56..af5f735 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -46,8 +46,15 @@ public class CoreModuleConfig extends ModuleConfig {
     private String gRPCSslTrustedCAPath;
     private int maxConcurrentCallsPerConnection;
     private int maxMessageSize;
-    private boolean enableDatabaseSession;
     private int topNReportPeriod;
+    /**
+     * The period of L1 aggregation flush. Unit is ms.
+     */
+    private long l1FlushPeriod = 500;
+    /**
+     * Enable database flush session.
+     */
+    private boolean enableDatabaseSession;
     private final List<String> downsampling;
     /**
      * The period of doing data persistence. Unit is second.
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9e99bd1..a8bbcbe 100755
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -46,9 +46,9 @@ import 
org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
 import 
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
 import org.apache.skywalking.oap.server.core.config.NamingControl;
-import 
org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi;
 import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
 import 
org.apache.skywalking.oap.server.core.config.group.EndpointNameGroupingRuleWatcher;
+import 
org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi;
 import 
org.apache.skywalking.oap.server.core.management.ui.template.UITemplateInitializer;
 import 
org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService;
 import org.apache.skywalking.oap.server.core.oal.rt.DisableOALDefine;
@@ -289,6 +289,7 @@ public class CoreModuleProvider extends ModuleProvider {
             UITemplateManagementService.class, new 
UITemplateManagementService(getManager()));
 
         
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
+        
MetricsStreamProcessor.getInstance().setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
         
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
         apdexThresholdConfig = new ApdexThresholdConfig(this);
         ApdexMetrics.setDICT(apdexThresholdConfig);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index b6fd9b3..684c5fc 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -42,13 +42,15 @@ import 
org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
  */
 @Slf4j
 public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
+    public final long l1FlushPeriod;
     private AbstractWorker<Metrics> nextWorker;
     private final DataCarrier<Metrics> dataCarrier;
     private final MergableBufferedData<Metrics> mergeDataCache;
     private CounterMetrics aggregationCounter;
+    private long lastSendTime = 0;
 
     MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, 
AbstractWorker<Metrics> nextWorker,
-                           String modelName) {
+                           String modelName, long l1FlushPeriod) {
         super(moduleDefineHolder);
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergableBufferedData();
@@ -69,8 +71,10 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
                                                           
.getService(MetricsCreator.class);
         aggregationCounter = metricsCreator.createCounter(
             "metrics_aggregation", "The number of rows in aggregation",
-            new MetricsTag.Keys("metricName", "level", "dimensionality"), new 
MetricsTag.Values(modelName, "1", "minute")
+            new MetricsTag.Keys("metricName", "level", "dimensionality"),
+            new MetricsTag.Values(modelName, "1", "minute")
         );
+        this.l1FlushPeriod = l1FlushPeriod;
     }
 
     /**
@@ -93,14 +97,22 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
             mergeDataCache.accept(metrics);
         });
 
-        mergeDataCache.read().forEach(
-            data -> {
-                if (log.isDebugEnabled()) {
-                    log.debug(data.toString());
+        flush();
+    }
+
+    private void flush() {
+        long currentTime = System.currentTimeMillis();
+        if (currentTime - lastSendTime > l1FlushPeriod) {
+            mergeDataCache.read().forEach(
+                data -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug(data.toString());
+                    }
+                    nextWorker.in(data);
                 }
-                nextWorker.in(data);
-            }
-        );
+            );
+            lastSendTime = currentTime;
+        }
     }
 
     private class AggregatorConsumer implements IConsumer<Metrics> {
@@ -122,4 +134,4 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
         public void onExit() {
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 9d8faf7..3a40777 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -71,6 +71,12 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
     private List<MetricsPersistentWorker> persistentWorkers = new 
ArrayList<>();
 
     /**
+     * The period of L1 aggregation flush. Unit is ms.
+     */
+    @Setter
+    @Getter
+    private long l1FlushPeriod = 500;
+    /**
      * Hold and forward CoreModuleConfig#enableDatabaseSession to the 
persistent worker.
      */
     @Setter
@@ -97,7 +103,9 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
      * @param metricsClass       data type of the streaming calculation.
      */
     @Override
-    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, 
Class<? extends Metrics> metricsClass) throws StorageException {
+    public void create(ModuleDefineHolder moduleDefineHolder,
+                       Stream stream,
+                       Class<? extends Metrics> metricsClass) throws 
StorageException {
         this.create(moduleDefineHolder, StreamDefinition.from(stream), 
metricsClass);
     }
 
@@ -108,7 +116,8 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         final StorageBuilderFactory storageBuilderFactory = 
moduleDefineHolder.find(StorageModule.NAME)
                                                                               
.provider()
                                                                               
.getService(StorageBuilderFactory.class);
-        final Class<? extends StorageBuilder> builder = 
storageBuilderFactory.builderOf(metricsClass, stream.getBuilder());
+        final Class<? extends StorageBuilder> builder = 
storageBuilderFactory.builderOf(
+            metricsClass, stream.getBuilder());
 
         StorageDAO storageDAO = 
moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IMetricsDAO metricsDAO;
@@ -167,7 +176,7 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
 
         MetricsRemoteWorker remoteWorker = new 
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
         MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
-            moduleDefineHolder, remoteWorker, stream.getName());
+            moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod);
 
         entryWorkers.put(metricsClass, aggregateWorker);
     }

Reply via email to