This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch enhance-cache
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 16c4bde932e8dd45423f46bc8b7cdf1879152529
Author: Wu Sheng <[email protected]>
AuthorDate: Mon Dec 5 08:45:49 2022 +0800

    Cache enhancement - don't read new metrics from database in minute dimensionality
---
 docs/en/changes/changes.md                         | 14 +++++
 .../analysis/worker/MetricsPersistentWorker.java   | 59 +++++++++++++++++++++-
 .../server/core/status/ServerStatusService.java    |  2 +
 3 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 1286c9129e..bf96fcabe7 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -7,6 +7,20 @@
 * Add `ServerStatusService` in the core module to provide a new way to expose booting status to other modules.
 * Adds Micrometer as a new component.(ID=141)
 * Refactor session cache in MetricsPersistentWorker.
+* Cache enhancement - don't read new metrics from database in minute dimensionality.
+```
+    // When
+    // (1) the time bucket of the server's latest stability status is provided
+    //     1.1 the OAP has booted successfully
+    //     1.2 the current dimensionality is in minute.
+    // (2) the metrics are from the time after the timeOfLatestStabilitySts
+    // (3) the metrics don't exist in the cache
+    // the kernel should NOT try to load it from the database.
+    //
+    // Notice, about condition (2),
+    // for the specific minute in which the OAP booted successfully, the metrics are expected to be loaded from the database when
+    // they don't exist in the cache.
+```
 
 #### UI
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 51e9dfd937..195c4ab7ae 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -25,11 +25,15 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import 
org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
 import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
+import org.apache.skywalking.oap.server.core.status.ServerStatusService;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -89,6 +93,16 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
      * @since 8.7.0 TTL settings from {@link 
org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
      */
     private int metricsDataTTL;
+    /**
+     * @since 9.4.0
+     */
+    private final ServerStatusService serverStatusService;
+    /**
+     * The time bucket, in minute dimensionality, of the system's latest stability status, or 0 if it has not been set.
+     *
+     * @since 9.4.0
+     */
+    private long timeOfLatestStabilitySts = 0;
 
     MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, IMetricsDAO metricsDAO,
                             AbstractWorker<Metrics> nextAlarmWorker, 
AbstractWorker<ExportEvent> nextExportWorker,
@@ -145,6 +159,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
             new MetricsTag.Keys("status"), new MetricsTag.Values("cached")
         );
         SESSION_TIMEOUT_OFFSITE_COUNTER++;
+        serverStatusService = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServerStatusService.class);
     }
 
     /**
@@ -165,6 +180,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
         // And add offset according to worker creation sequence, to avoid 
context clear overlap,
         // eventually optimize load of IDs reading.
         sessionCache.setTimeoutThreshold(storageSessionTimeout * 4 + 
SESSION_TIMEOUT_OFFSITE_COUNTER * 200);
+        // Set cache mode in normal mode for high dimensionality metrics, such as hour/day metrics.
         // The down sampling level worker executes every 4 periods.
         this.persistentMod = 4;
     }
@@ -191,6 +207,11 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
             return Collections.emptyList();
         }
 
+        if (model.getDownsampling().equals(DownSampling.Minute)) {
+            timeOfLatestStabilitySts = TimeBucket.getMinuteTimeBucket(
+                serverStatusService.getBootingStatus().getUptime());
+        }
+
         /*
          * Hard coded the max size. This only affect the multiIDRead if the 
data doesn't hit the cache.
          */
@@ -299,7 +320,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
             List<Metrics> notInCacheMetrics =
                 metrics.stream()
                        .filter(m -> {
-                           final Metrics cachedValue = sessionCache.get(m);
+                           final Metrics cachedValue = 
requireInitialization(m);
                            // the metric is tagged `not in cache`.
                            if (cachedValue == null) {
                                return true;
@@ -340,6 +361,42 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
         sessionCache.removeExpired();
     }
 
+    /**
+     * Check whether the metrics are in the session cache, and whether the worker should go further to load from database.
+     *
+     * @param metrics the metrics in the streaming process.
+     * @return the cached metrics; or the given metrics (after being put into the cache) when the database read
+     *         should be skipped; or null if the worker should try to read the metrics from the database.
+     */
+    private Metrics requireInitialization(Metrics metrics) {
+        final Metrics cached = sessionCache.get(metrics);
+
+        // Every cached metrics has been written at least once.
+        if (cached != null) {
+            return cached;
+        }
+
+        // When
+        // (1) the time bucket of the server's latest stability status is provided
+        //     1.1 the OAP has booted successfully
+        //     1.2 the current dimensionality is in minute.
+        // (2) the metrics are from the time after the timeOfLatestStabilitySts
+        // (3) the metrics don't exist in the cache (guaranteed here by the early return above)
+        // the kernel should NOT try to load it from the database.
+        //
+        // Notice, about condition (2),
+        // for the specific minute in which the OAP booted successfully, the metrics are expected to be loaded from
+        // the database when they don't exist in the cache.
+        if (timeOfLatestStabilitySts > 0 &&
+            metrics.getTimeBucket() > timeOfLatestStabilitySts) {
+            // Cache and return the input metrics to avoid reading from database.
+            sessionCache.put(metrics);
+            return metrics;
+        }
+
+        return null;
+    }
+
     /**
      * Metrics queue processor, merge the received metrics if existing one 
with same ID(s) and time bucket.
      *
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java
index 27a3b328df..fac17a1b5d 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.status;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.Service;
@@ -35,6 +36,7 @@ import 
org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 @RequiredArgsConstructor
 public class ServerStatusService implements Service {
     private final ModuleManager manager;
+    @Getter
     private BootingStatus bootingStatus = new BootingStatus();
 
     public void bootedNow(long uptime) {

Reply via email to