This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch refator
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit a7662fdfc33794fc744e9d3842c1038048af214b
Author: Wu Sheng <[email protected]>
AuthorDate: Sun Dec 4 21:16:33 2022 +0800

    Refactor session cache in MetricsPersistentWorker.
---
 docs/en/changes/changes.md                         |  1 +
 .../analysis/worker/MetricsPersistentWorker.java   | 35 ++--------
 .../core/analysis/worker/MetricsSessionCache.java  | 81 ++++++++++++++++++++++
 .../server/core/storage/SessionCacheCallback.java  |  5 +-
 4 files changed, 90 insertions(+), 32 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index b73f770b4c..1286c9129e 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -6,6 +6,7 @@
 
 * Add `ServerStatusService` in the core module to provide a new way to expose 
booting status to other modules.
 * Adds Micrometer as a new component.(ID=141)
+* Refactor session cache in MetricsPersistentWorker.
 
 #### UI
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 8ab2e69777..263ba36eff 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -61,25 +61,13 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
     private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0;
 
     private final Model model;
-    /**
-     * The session cache holds the latest metrics in-memory.
-     * There are two ways to make sure metrics in-cache,
-     * 1. Metrics is read from the Database through {@link 
#loadFromStorage(List)}
-     * 2. The built {@link InsertRequest} executed successfully.
-     *
-     * There are two cases to remove metrics from the cache.
-     * 1. The metrics expired.
-     * 2. The built {@link UpdateRequest} executed failure, which could be 
caused
-     * (1) Database error. (2) No data updated, such as the counter of update 
statement is 0 in JDBC.
-     */
-    private final Map<Metrics, Metrics> sessionCache;
+    private final MetricsSessionCache sessionCache;
     private final IMetricsDAO metricsDAO;
     private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
     private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
     private final DataCarrier<Metrics> dataCarrier;
     private final Optional<MetricsTransWorker> transWorker;
     private final boolean supportUpdate;
-    private long sessionTimeout;
     /**
      * The counter of L2 aggregation.
      */
@@ -113,17 +101,12 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
                             long storageSessionTimeout, int metricsDataTTL, 
MetricStreamKind kind) {
         super(moduleDefineHolder, new ReadWriteSafeCache<>(new 
MergableBufferedData(), new MergableBufferedData()));
         this.model = model;
-        // Due to the cache would be updated depending on final storage 
implementation,
-        // the map/cache could be updated concurrently.
-        // Set to ConcurrentHashMap in order to avoid HashMap deadlock.
-        // Since 9.3.0
-        this.sessionCache = new ConcurrentHashMap<>(100);
+        this.sessionCache = new MetricsSessionCache(storageSessionTimeout);
         this.metricsDAO = metricsDAO;
         this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
         this.nextExportWorker = Optional.ofNullable(nextExportWorker);
         this.transWorker = Optional.ofNullable(transWorker);
         this.supportUpdate = supportUpdate;
-        this.sessionTimeout = storageSessionTimeout;
         this.persistentCounter = 0;
         this.persistentMod = 1;
         this.metricsDataTTL = metricsDataTTL;
@@ -186,7 +169,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
         // For a down-sampling metrics, we prolong the session timeout for 4 
times, nearly 5 minutes.
         // And add offset according to worker creation sequence, to avoid 
context clear overlap,
         // eventually optimize load of IDs reading.
-        this.sessionTimeout = this.sessionTimeout * 4 + 
SESSION_TIMEOUT_OFFSITE_COUNTER * 200;
+        sessionCache.setTimeoutThreshold(storageSessionTimeout * 4 + 
SESSION_TIMEOUT_OFFSITE_COUNTER * 200);
         // The down sampling level worker executes every 4 periods.
         this.persistentMod = 4;
     }
@@ -351,7 +334,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
                 return;
             }
 
-            metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> 
sessionCache.put(m, m));
+            metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> 
sessionCache.put(m));
         } catch (final Exception e) {
             log.error("Failed to load metrics for merging", e);
         }
@@ -359,15 +342,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
 
     @Override
     public void endOfRound() {
-        Iterator<Metrics> iterator = sessionCache.values().iterator();
-        long timestamp = System.currentTimeMillis();
-        while (iterator.hasNext()) {
-            Metrics metrics = iterator.next();
-
-            if (metrics.isExpired(timestamp, sessionTimeout)) {
-                iterator.remove();
-            }
-        }
+        sessionCache.removeExpired();
     }
 
     /**
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java
new file mode 100644
index 0000000000..7d0cc7e7b1
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.AccessLevel;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+/**
+ * MetricsSessionCache is an in-memory key-value cache that keeps hot metrics at hand,
+ * in order to reduce the payload of pre-reading from the storage.
+ *
+ * A metrics instance gets into the cache in one of two ways:
+ * 1. It is read from the database through {@link MetricsPersistentWorker}.loadFromStorage
+ * 2. The {@link InsertRequest} built for it is executed successfully.
+ *
+ * A metrics instance is removed from the cache in one of two cases:
+ * 1. The metrics has expired.
+ * 2. The {@link UpdateRequest} built for it fails to execute, which could be caused by
+ * (1) a database error, or (2) no data being updated, such as the counter of the update
+ * statement being 0 in JDBC.
+ *
+ * @since 9.4.0 Created this from MetricsPersistentWorker.sessionCache.
+ */
+public class MetricsSessionCache {
+    /**
+     * Backing map of the cache. Each cached metrics instance is stored as both the key and
+     * the value, so a lookup with an equal instance returns the cached one.
+     */
+    private final Map<Metrics, Metrics> sessionCache;
+    /**
+     * The threshold passed to {@code Metrics#isExpired} together with the current time when
+     * expired entries are swept. Presumably in milliseconds, as it is evaluated against
+     * {@link System#currentTimeMillis()} -- confirm against {@code Metrics#isExpired}.
+     */
+    @Setter(AccessLevel.PACKAGE)
+    private long timeoutThreshold;
+
+    /**
+     * Creates an empty cache.
+     *
+     * @param timeoutThreshold the initial expiration threshold used by {@link #removeExpired()}
+     */
+    public MetricsSessionCache(long timeoutThreshold) {
+        this.timeoutThreshold = timeoutThreshold;
+        // Since 9.3.0
+        // The cache is updated depending on the final storage implementation,
+        // which means the map could be updated concurrently.
+        // Use ConcurrentHashMap in order to avoid HashMap deadlock.
+        this.sessionCache = new ConcurrentHashMap<>(100);
+    }
+
+    /**
+     * Looks up the cached instance that is equal to the given metrics.
+     *
+     * @param metrics the metrics used as the lookup key
+     * @return the cached instance, or {@code null} if nothing is cached for this key
+     */
+    Metrics get(Metrics metrics) {
+        return sessionCache.get(metrics);
+    }
+
+    /**
+     * Drops the entry that is equal to the given metrics from the cache.
+     *
+     * @param metrics the metrics used as the lookup key
+     * @return the instance that was cached, or {@code null} if nothing was cached for this key
+     */
+    public Metrics remove(Metrics metrics) {
+        return sessionCache.remove(metrics);
+    }
+
+    /**
+     * Caches the given metrics, replacing any entry that is equal to it.
+     *
+     * @param metrics the metrics to cache; it is used as both the key and the value
+     */
+    public void put(Metrics metrics) {
+        sessionCache.put(metrics, metrics);
+    }
+
+    /**
+     * Sweeps the cache and removes every entry that reports itself as expired for the
+     * current time and the configured timeout threshold.
+     */
+    void removeExpired() {
+        // Take the time once, so every entry of this sweep is judged against the same instant.
+        final long now = System.currentTimeMillis();
+        for (Iterator<Metrics> cursor = sessionCache.values().iterator(); cursor.hasNext(); ) {
+            final Metrics cached = cursor.next();
+            if (cached.isExpired(now, timeoutThreshold)) {
+                cursor.remove();
+            }
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
index e417b71529..fb2ae0d67e 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
@@ -21,13 +21,14 @@ package org.apache.skywalking.oap.server.core.storage;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.MetricsSessionCache;
 
 /**
  * SessionCacheCallback provides a bridge for storage implementations
  */
 @RequiredArgsConstructor
 public class SessionCacheCallback {
-    private final Map<Metrics, Metrics> sessionCache;
+    private final MetricsSessionCache sessionCache;
     private final Metrics metrics;
     /**
      * In some cases, this callback could be shared by multiple executions, 
such as SQLExecutor#additionalSQLs.
@@ -40,7 +41,7 @@ public class SessionCacheCallback {
         if (isFailed) {
             return;
         }
-        sessionCache.put(metrics, metrics);
+        sessionCache.put(metrics);
     }
 
     public void onUpdateFailure() {

Reply via email to