This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c31172c3c Refactor session cache in MetricsPersistentWorker. (#10084)
8c31172c3c is described below

commit 8c31172c3c1f4adb2959f03fa694e6146fbaf703
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Sun Dec 4 22:41:57 2022 +0800

    Refactor session cache in MetricsPersistentWorker. (#10084)
---
 docs/en/changes/changes.md                         |  1 +
 .../analysis/worker/MetricsPersistentWorker.java   | 40 ++---------
 .../core/analysis/worker/MetricsSessionCache.java  | 81 ++++++++++++++++++++++
 .../server/core/storage/SessionCacheCallback.java  |  6 +-
 4 files changed, 90 insertions(+), 38 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index b73f770b4c..1286c9129e 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -6,6 +6,7 @@
 
 * Add `ServerStatusService` in the core module to provide a new way to expose 
booting status to other modules.
 * Adds Micrometer as a new component.(ID=141)
+* Refactor session cache in MetricsPersistentWorker.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 8ab2e69777..51e9dfd937 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -20,12 +20,9 @@ package 
org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
@@ -37,9 +34,7 @@ import 
org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
 import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
 import 
org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
@@ -61,25 +56,13 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
     private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0;
 
     private final Model model;
-    /**
-     * The session cache holds the latest metrics in-memory.
-     * There are two ways to make sure metrics in-cache,
-     * 1. Metrics is read from the Database through {@link 
#loadFromStorage(List)}
-     * 2. The built {@link InsertRequest} executed successfully.
-     *
-     * There are two cases to remove metrics from the cache.
-     * 1. The metrics expired.
-     * 2. The built {@link UpdateRequest} executed failure, which could be 
caused
-     * (1) Database error. (2) No data updated, such as the counter of update 
statement is 0 in JDBC.
-     */
-    private final Map<Metrics, Metrics> sessionCache;
+    private final MetricsSessionCache sessionCache;
     private final IMetricsDAO metricsDAO;
     private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
     private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
     private final DataCarrier<Metrics> dataCarrier;
     private final Optional<MetricsTransWorker> transWorker;
     private final boolean supportUpdate;
-    private long sessionTimeout;
     /**
      * The counter of L2 aggregation.
      */
@@ -113,17 +96,12 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
                             long storageSessionTimeout, int metricsDataTTL, 
MetricStreamKind kind) {
         super(moduleDefineHolder, new ReadWriteSafeCache<>(new 
MergableBufferedData(), new MergableBufferedData()));
         this.model = model;
-        // Due to the cache would be updated depending on final storage 
implementation,
-        // the map/cache could be updated concurrently.
-        // Set to ConcurrentHashMap in order to avoid HashMap deadlock.
-        // Since 9.3.0
-        this.sessionCache = new ConcurrentHashMap<>(100);
+        this.sessionCache = new MetricsSessionCache(storageSessionTimeout);
         this.metricsDAO = metricsDAO;
         this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
         this.nextExportWorker = Optional.ofNullable(nextExportWorker);
         this.transWorker = Optional.ofNullable(transWorker);
         this.supportUpdate = supportUpdate;
-        this.sessionTimeout = storageSessionTimeout;
         this.persistentCounter = 0;
         this.persistentMod = 1;
         this.metricsDataTTL = metricsDataTTL;
@@ -186,7 +164,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
         // For a down-sampling metrics, we prolong the session timeout for 4 
times, nearly 5 minutes.
         // And add offset according to worker creation sequence, to avoid 
context clear overlap,
         // eventually optimize load of IDs reading.
-        this.sessionTimeout = this.sessionTimeout * 4 + 
SESSION_TIMEOUT_OFFSITE_COUNTER * 200;
+        sessionCache.setTimeoutThreshold(storageSessionTimeout * 4 + 
SESSION_TIMEOUT_OFFSITE_COUNTER * 200);
         // The down sampling level worker executes every 4 periods.
         this.persistentMod = 4;
     }
@@ -351,7 +329,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
                 return;
             }
 
-            metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> 
sessionCache.put(m, m));
+            metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> 
sessionCache.put(m));
         } catch (final Exception e) {
             log.error("Failed to load metrics for merging", e);
         }
@@ -359,15 +337,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
 
     @Override
     public void endOfRound() {
-        Iterator<Metrics> iterator = sessionCache.values().iterator();
-        long timestamp = System.currentTimeMillis();
-        while (iterator.hasNext()) {
-            Metrics metrics = iterator.next();
-
-            if (metrics.isExpired(timestamp, sessionTimeout)) {
-                iterator.remove();
-            }
-        }
+        sessionCache.removeExpired();
     }
 
     /**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java
new file mode 100644
index 0000000000..93b1e421cf
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.AccessLevel;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+/**
+ * An in-memory key-value store of hot metrics, kept to reduce the payload that
+ * has to be pre-read from storage.
+ *
+ * A metric gets into the cache in one of two ways:
+ * 1. It is read from the database through {@link MetricsPersistentWorker}.loadFromStorage
+ * 2. The {@link InsertRequest} built for it executed successfully.
+ *
+ * A metric leaves the cache in one of two cases:
+ * 1. The metric expired.
+ * 2. The {@link UpdateRequest} built for it failed, which could be caused by
+ * (1) a database error, or (2) no data being updated, such as the counter of
+ * the update statement being 0 in JDBC.
+ *
+ * @since 9.4.0 Created this from MetricsPersistentWorker.sessionCache.
+ */
+public class MetricsSessionCache {
+    /**
+     * Backing store. Each metric is registered as its own key and value.
+     */
+    private final Map<Metrics, Metrics> cachedMetrics;
+    /**
+     * Threshold handed to {@code Metrics#isExpired} when evicting entries.
+     */
+    @Setter(AccessLevel.PACKAGE)
+    private long timeoutThreshold;
+
+    /**
+     * @param timeoutThreshold initial threshold used by {@link #removeExpired()}
+     */
+    public MetricsSessionCache(long timeoutThreshold) {
+        this.timeoutThreshold = timeoutThreshold;
+        // Depending on the final storage implementation, the cache may be
+        // updated concurrently. A ConcurrentHashMap is used in order to avoid
+        // the HashMap deadlock.
+        // Since 9.3.0
+        this.cachedMetrics = new ConcurrentHashMap<>(100);
+    }
+
+    /**
+     * @param metrics the key to look up
+     * @return the cached instance mapped to the key, or null if there is none
+     */
+    Metrics get(Metrics metrics) {
+        final Metrics cached = cachedMetrics.get(metrics);
+        return cached;
+    }
+
+    /**
+     * @param metrics the key to evict
+     * @return the instance that was cached for the key, or null if there was none
+     */
+    public Metrics remove(Metrics metrics) {
+        final Metrics evicted = cachedMetrics.remove(metrics);
+        return evicted;
+    }
+
+    /**
+     * Caches the given metrics, replacing any entry already mapped to the same key.
+     *
+     * @param metrics used as both the key and the value
+     */
+    public void put(Metrics metrics) {
+        cachedMetrics.put(metrics, metrics);
+    }
+
+    /**
+     * Walks through all cached values and drops every one that reports itself
+     * as expired against the current system time and the timeout threshold.
+     */
+    void removeExpired() {
+        final Iterator<Metrics> cursor = cachedMetrics.values().iterator();
+        final long now = System.currentTimeMillis();
+        while (cursor.hasNext()) {
+            if (!cursor.next().isExpired(now, timeoutThreshold)) {
+                continue;
+            }
+            cursor.remove();
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
index e417b71529..ceff064e05 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.oap.server.core.storage;
 
-import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.MetricsSessionCache;
 
 /**
  * SessionCacheCallback provides a bridge for storage implementations
  */
 @RequiredArgsConstructor
 public class SessionCacheCallback {
-    private final Map<Metrics, Metrics> sessionCache;
+    private final MetricsSessionCache sessionCache;
     private final Metrics metrics;
     /**
      * In some cases, this callback could be shared by multiple executions, 
such as SQLExecutor#additionalSQLs.
@@ -40,7 +40,7 @@ public class SessionCacheCallback {
         if (isFailed) {
             return;
         }
-        sessionCache.put(metrics, metrics);
+        sessionCache.put(metrics);
     }
 
     public void onUpdateFailure() {

Reply via email to