This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a4ee08  Enhance persistent session timeout mechanism. (#7334)
3a4ee08 is described below

commit 3a4ee08e54bd3f08441f1023fd25442d6a2badde
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Tue Jul 20 07:50:27 2021 +0800

    Enhance persistent session timeout mechanism. (#7334)
    
    Fix a bug: the enhanced session could cache the metadata metrics (hot entities) forever. A new timeout mechanism is designed to avoid this specific case.
    
    Optimize this timeout mechanism by making it differ between ES (one index per day) and non-ES storage implementations.
---
 CHANGES.md                                         |  3 ++
 .../oap/server/core/CoreModuleProvider.java        | 10 +++++
 .../analysis/worker/MetricsPersistentWorker.java   | 47 ++++++++++++++++++----
 .../analysis/worker/MetricsStreamProcessor.java    | 25 +++++++++---
 .../oap/server/core/storage/IMetricsDAO.java       | 18 +++++++++
 .../plugin/elasticsearch/base/MetricsEsDAO.java    | 25 ++++++++++++
 6 files changed, 115 insertions(+), 13 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index c1fce5a..52fd322 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
   the master branch codes, please don't use in production environments.
 
 #### Java Agent
+
 * Supports modifying span attributes in async mode.
 * Agent supports the collection of JVM arguments and jar dependency 
information.
 * [Temporary] Support authentication for log report channel. This feature and 
grpc channel is going to be removed after
@@ -108,6 +109,8 @@ Release Notes.
 * Optimization: Concurrency mode of execution stage for metrics is 
removed(added in 8.5.0). Only concurrency of prepare
   stage is meaningful and kept.
 * Fix -meters metrics topic isn't created with namespace issue
+* Enhance persistent session timeout mechanism. Because the enhanced session could cache the metadata metrics forever,
+  a new timeout mechanism is designed to avoid this specific case.
 
 #### UI
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 31f5112..1be58a3 100755
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -290,10 +290,20 @@ public class CoreModuleProvider extends ModuleProvider {
         this.registerServiceImplementation(
             UITemplateManagementService.class, new 
UITemplateManagementService(getManager()));
 
+        if (moduleConfig.getMetricsDataTTL() < 2) {
+            throw new ModuleStartException(
+                "Metric TTL should be at least 2 days, current value is " + 
moduleConfig.getMetricsDataTTL());
+        }
+        if (moduleConfig.getRecordDataTTL() < 2) {
+            throw new ModuleStartException(
+                "Record TTL should be at least 2 days, current value is " + 
moduleConfig.getRecordDataTTL());
+        }
+
         final MetricsStreamProcessor metricsStreamProcessor = 
MetricsStreamProcessor.getInstance();
         
metricsStreamProcessor.setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
         
metricsStreamProcessor.setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
         
metricsStreamProcessor.setStorageSessionTimeout(moduleConfig.getStorageSessionTimeout());
+        
metricsStreamProcessor.setMetricsDataTTL(moduleConfig.getMetricsDataTTL());
         
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
         apdexThresholdConfig = new ApdexThresholdConfig(this);
         ApdexMetrics.setDICT(apdexThresholdConfig);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 65e31a4..172eb04 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -78,11 +78,15 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
      * every {@link #persistentMod} periods. And minute level workers execute 
every time.
      */
     private int persistentMod;
+    /**
+     * @since 8.7.0 TTL settings from {@link 
org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
+     */
+    private int metricsDataTTL;
 
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, IMetricsDAO metricsDAO,
                             AbstractWorker<Metrics> nextAlarmWorker, 
AbstractWorker<ExportEvent> nextExportWorker,
                             MetricsTransWorker transWorker, boolean 
enableDatabaseSession, boolean supportUpdate,
-                            long storageSessionTimeout) {
+                            long storageSessionTimeout, int metricsDataTTL) {
         super(moduleDefineHolder, new ReadWriteSafeCache<>(new 
MergableBufferedData(), new MergableBufferedData()));
         this.model = model;
         this.context = new HashMap<>(100);
@@ -95,6 +99,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
         this.sessionTimeout = storageSessionTimeout;
         this.persistentCounter = 0;
         this.persistentMod = 1;
+        this.metricsDataTTL = metricsDataTTL;
 
         String name = "METRICS_L2_AGGREGATION";
         int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
@@ -125,11 +130,16 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
     /**
      * Create the leaf and down-sampling MetricsPersistentWorker, no next step.
      */
-    MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, IMetricsDAO metricsDAO,
-                            boolean enableDatabaseSession, boolean 
supportUpdate, long storageSessionTimeout) {
+    MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder,
+                            Model model,
+                            IMetricsDAO metricsDAO,
+                            boolean enableDatabaseSession,
+                            boolean supportUpdate,
+                            long storageSessionTimeout,
+                            int metricsDataTTL) {
         this(moduleDefineHolder, model, metricsDAO,
              null, null, null,
-             enableDatabaseSession, supportUpdate, storageSessionTimeout
+             enableDatabaseSession, supportUpdate, storageSessionTimeout, 
metricsDataTTL
         );
         // For a down-sampling metrics, we prolong the session timeout for 4 
times, nearly 5 minutes.
         // And add offset according to worker creation sequence, to avoid 
context clear overlap,
@@ -246,10 +256,33 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
      * Load data from the storage, if {@link #enableDatabaseSession} == true, 
only load data when the id doesn't exist.
      */
     private void loadFromStorage(List<Metrics> metrics) {
+        final long currentTimeMillis = System.currentTimeMillis();
         try {
-            List<Metrics> notInCacheMetrics = metrics.stream()
-                                                     .filter(m -> 
!context.containsKey(m) || !enableDatabaseSession)
-                                                     
.collect(Collectors.toList());
+            List<Metrics> notInCacheMetrics =
+                metrics.stream()
+                       .filter(m -> {
+                           final Metrics cachedValue = context.get(m);
+                           // Not cached or session disabled, the metric could 
be tagged `not in cache`.
+                           if (cachedValue == null || !enableDatabaseSession) {
+                               return true;
+                           }
+                           // The metric is in the cache, but still we have to 
check
+                           // whether the cache is expired due to TTL.
+                           // This is a cache-DB inconsistent case:
+                           // Metrics keep coming due to traffic, but the 
entity in the
+                           // database has been removed due to TTL.
+                           if (!model.isTimeRelativeID() && supportUpdate) {
+                               // Mostly all updatable metadata level metrics 
are required to do this check.
+
+                               if (metricsDAO.isExpiredCache(model, 
cachedValue, currentTimeMillis, metricsDataTTL)) {
+                                   // The expired metrics should be tagged 
`not in cache` directly.
+                                   return true;
+                               }
+                           }
+
+                           return false;
+                       })
+                       .collect(Collectors.toList());
             if (notInCacheMetrics.isEmpty()) {
                 return;
             }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 6c323f4..1fb1d41 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -87,6 +87,11 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
      */
     @Setter
     private long storageSessionTimeout = 70_000;
+    /**
+     * @since 8.7.0 TTL settings from {@link 
org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
+     */
+    @Setter
+    private int metricsDataTTL = 3;
 
     public static MetricsStreamProcessor getInstance() {
         return PROCESSOR;
@@ -157,12 +162,16 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         if (supportDownSampling) {
             if (configService.shouldToHour()) {
                 Model model = modelSetter.add(
-                    metricsClass, stream.getScopeId(), new 
Storage(stream.getName(), timeRelativeID, DownSampling.Hour), false);
+                    metricsClass, stream.getScopeId(), new 
Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
+                    false
+                );
                 hourPersistentWorker = downSamplingWorker(moduleDefineHolder, 
metricsDAO, model, supportUpdate);
             }
             if (configService.shouldToDay()) {
                 Model model = modelSetter.add(
-                    metricsClass, stream.getScopeId(), new 
Storage(stream.getName(), timeRelativeID, DownSampling.Day), false);
+                    metricsClass, stream.getScopeId(), new 
Storage(stream.getName(), timeRelativeID, DownSampling.Day),
+                    false
+                );
                 dayPersistentWorker = downSamplingWorker(moduleDefineHolder, 
metricsDAO, model, supportUpdate);
             }
 
@@ -171,7 +180,9 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         }
 
         Model model = modelSetter.add(
-            metricsClass, stream.getScopeId(), new Storage(stream.getName(), 
timeRelativeID, DownSampling.Minute), false);
+            metricsClass, stream.getScopeId(), new Storage(stream.getName(), 
timeRelativeID, DownSampling.Minute),
+            false
+        );
         MetricsPersistentWorker minutePersistentWorker = 
minutePersistentWorker(
             moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
 
@@ -197,8 +208,8 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
 
         MetricsPersistentWorker minutePersistentWorker = new 
MetricsPersistentWorker(
-            moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, 
exportWorker, transWorker, enableDatabaseSession,
-            supportUpdate, storageSessionTimeout
+            moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, 
exportWorker, transWorker,
+            enableDatabaseSession, supportUpdate, storageSessionTimeout, 
metricsDataTTL
         );
         persistentWorkers.add(minutePersistentWorker);
 
@@ -210,7 +221,9 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
                                                        Model model,
                                                        boolean supportUpdate) {
         MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
-            moduleDefineHolder, model, metricsDAO, enableDatabaseSession, 
supportUpdate, storageSessionTimeout);
+            moduleDefineHolder, model, metricsDAO,
+            enableDatabaseSession, supportUpdate, storageSessionTimeout, 
metricsDataTTL
+        );
         persistentWorkers.add(persistentWorker);
 
         return persistentWorker;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index 2b4c05a..4320a83 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -20,6 +20,8 @@ package org.apache.skywalking.oap.server.core.storage;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -54,4 +56,20 @@ public interface IMetricsDAO extends DAO {
      * executed ASAP.
      */
     UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws 
IOException;
+
+    /**
+     * Decide whether a cached metric has outlived the metrics TTL, so it must be reloaded from storage.
+     *
+     * @param model             model of the cached metric; supplies its down-sampling level.
+     * @param cachedValue       the metric instance currently held in the session cache.
+     * @param currentTimeMillis current OAP system time, in milliseconds.
+     * @param ttl               metrics TTL in days, from the core module settings.
+     * @return true if the cached metric is older than {@code ttl} days.
+     */
+    default boolean isExpiredCache(Model model, Metrics cachedValue, long currentTimeMillis, int ttl) {
+        final long metricTime = TimeBucket.getTimestamp(cachedValue.getTimeBucket(), model.getDownsampling());
+        final long ttlWindow = TimeUnit.DAYS.toMillis(ttl);
+        // Expired only when the metric's age strictly exceeds the whole TTL window.
+        return currentTimeMillis - metricTime > ttlWindow;
+    }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 0f204c9..3574a81 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -23,7 +23,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
@@ -34,6 +37,7 @@ import 
org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.joda.time.DateTime;
 
 import static java.util.stream.Collectors.groupingBy;
 
@@ -117,4 +121,25 @@ public class MetricsEsDAO extends EsDAO implements 
IMetricsDAO {
         String id = IndexController.INSTANCE.generateDocId(model, 
metrics.id());
         return getClient().prepareUpdate(modelName, id, builder);
     }
+
+    @Override
+    public boolean isExpiredCache(final Model model,
+                                  final Metrics cachedValue,
+                                  final long currentTimeMillis,
+                                  final int ttl) {
+        final long metricTimestamp = TimeBucket.getTimestamp(
+            cachedValue.getTimeBucket(), model.getDownsampling());
+        // Fast fail check. If the duration is still less than TTL - 1 days (absolute),
+        // the cache should not be expired.
+        if (currentTimeMillis - metricTimestamp < TimeUnit.DAYS.toMillis(ttl - 1)) {
+            return false;
+        }
+        final long deadline = Long.parseLong(new DateTime(currentTimeMillis).minusDays(ttl).toString("yyyyMMdd"));
+        // Convert the resolved millisecond timestamp (NOT the raw time bucket, which is not a timestamp)
+        // into a day-level bucket (yyyyMMdd) so it is comparable with the deadline above.
+        final long timeBucket = TimeBucket.getTimeBucket(metricTimestamp, DownSampling.Day);
+        // ES uses one index per day, so TTL removal is day-granular: a metric whose day bucket is
+        // earlier than or equal to the deadline day is treated as expired.
+        return timeBucket <= deadline;
+    }
 }

Reply via email to