This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch session-polish
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 89aa556b16ccf8e3f90537559a3052cd55b74630
Author: Wu Sheng <[email protected]>
AuthorDate: Thu Jul 1 16:56:41 2021 +0800

    Make session expired mechanism more clear.
---
 .../oap/server/core/analysis/metrics/Metrics.java  | 22 ++++++++++++++++------
 .../analysis/worker/MetricsPersistentWorker.java   | 21 ++++++++++++---------
 .../core/analysis/worker/PersistenceWorker.java    |  4 +---
 .../server/core/analysis/worker/TopNWorker.java    |  2 +-
 .../oap/server/core/storage/PersistenceTimer.java  | 10 ++++------
 5 files changed, 34 insertions(+), 25 deletions(-)

diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
index 904929e..ec1608a 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java
@@ -50,13 +50,14 @@ public abstract class Metrics extends StreamData implements 
StorageData {
      * Time in the cache, only work when 
MetricsPersistentWorker#enableDatabaseSession == true.
      */
     @Getter
-    private long survivalTime = 0L;
+    private long lastUpdateTimestamp = 0L;
 
     /**
      * Merge the given metrics instance, these two must be the same metrics 
type.
      *
      * @param metrics to be merged
-     * @return {@code true} if the combined metrics should be continuously 
processed. {@code false} means it should be abandoned, and the implementation 
needs to keep the data unaltered in this case.
+     * @return {@code true} if the combined metrics should be continuously 
processed. {@code false} means it should be
+     * abandoned, and the implementation needs to keep the data unaltered in 
this case.
      */
     public abstract boolean combine(Metrics metrics);
 
@@ -80,12 +81,21 @@ public abstract class Metrics extends StreamData implements 
StorageData {
     public abstract Metrics toDay();
 
     /**
-     * Extend the {@link #survivalTime}
+     * Set the last update timestamp
      *
-     * @param value to extend
+     * @param timestamp the time of the latest update; it replaces (does not add to) the previous value
      */
-    public void extendSurvivalTime(long value) {
-        survivalTime += value;
+    public void setLastUpdateTimestamp(long timestamp) {
+        lastUpdateTimestamp = timestamp;
+    }
+
+    /**
+     * @param timestamp        the current time, compared against the last update timestamp of this metrics
+     * @param expiredThreshold the longest duration allowed since the last update before removal from the cache
+     * @return {@code true} if more than {@code expiredThreshold} has passed since the last update, meaning this metrics should be removed from cache. Note: despite the method name, {@code true} means expired, not surviving.
+     */
+    public boolean isSurvival(long timestamp, long expiredThreshold) {
+        return timestamp - lastUpdateTimestamp > expiredThreshold;
     }
 
     public long toTimeBucketInHour() {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index b7c0735..a657121 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -171,6 +171,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
         try {
             loadFromStorage(metricsList);
 
+            long timestamp = System.currentTimeMillis();
             for (Metrics metrics : metricsList) {
                 Metrics cachedMetrics = context.get(metrics);
                 if (cachedMetrics != null) {
@@ -191,10 +192,12 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
                     cachedMetrics.calculate();
                     prepareRequests.add(metricsDAO.prepareBatchUpdate(model, 
cachedMetrics));
                     nextWorker(cachedMetrics);
+                    cachedMetrics.setLastUpdateTimestamp(timestamp);
                 } else {
                     metrics.calculate();
                     prepareRequests.add(metricsDAO.prepareBatchInsert(model, 
metrics));
                     nextWorker(metrics);
+                    metrics.setLastUpdateTimestamp(timestamp);
                 }
 
                 /*
@@ -221,14 +224,14 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
      */
     private void loadFromStorage(List<Metrics> metrics) {
         try {
-            List<Metrics> noInCacheMetrics = metrics.stream()
-                                                    .filter(m -> 
!context.containsKey(m) || !enableDatabaseSession)
-                                                    
.collect(Collectors.toList());
-            if (noInCacheMetrics.isEmpty()) {
+            List<Metrics> notInCacheMetrics = metrics.stream()
+                                                     .filter(m -> 
!context.containsKey(m) || !enableDatabaseSession)
+                                                     
.collect(Collectors.toList());
+            if (notInCacheMetrics.isEmpty()) {
                 return;
             }
-            
-            final List<Metrics> dbMetrics = metricsDAO.multiGet(model, 
noInCacheMetrics);
+
+            final List<Metrics> dbMetrics = metricsDAO.multiGet(model, 
notInCacheMetrics);
             if (!enableDatabaseSession) {
                 // Clear the cache only after results from DB are returned 
successfully.
                 context.clear();
@@ -240,14 +243,14 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics> {
     }
 
     @Override
-    public void endOfRound(long tookTime) {
+    public void endOfRound() {
         if (enableDatabaseSession) {
             Iterator<Metrics> iterator = context.values().iterator();
+            long timestamp = System.currentTimeMillis();
             while (iterator.hasNext()) {
                 Metrics metrics = iterator.next();
-                metrics.extendSurvivalTime(tookTime);
 
-                if (metrics.getSurvivalTime() > sessionTimeout) {
+                if (metrics.isSurvival(timestamp, sessionTimeout)) {
                     iterator.remove();
                 }
             }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index 415371b..e380893 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -55,10 +55,8 @@ public abstract class PersistenceWorker<INPUT extends 
StorageData> extends Abstr
     /**
      * The persistence process is driven by the {@link 
org.apache.skywalking.oap.server.core.storage.PersistenceTimer}.
      * This is a notification method for the worker when every round finished.
-     *
-     * @param tookTime The time costs in this round.
      */
-    public abstract void endOfRound(long tookTime);
+    public abstract void endOfRound();
 
     /**
      * Prepare the batch persistence, transfer all prepared data to the 
executable data format based on the storage
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 9093a2f..1465bf0 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -86,7 +86,7 @@ public class TopNWorker extends PersistenceWorker<TopN> {
      * This method used to clear the expired cache, but TopN is not following 
it.
      */
     @Override
-    public void endOfRound(long tookTime) {
+    public void endOfRound() {
     }
 
     @Override
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index aaafc06..a7e1e89 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -54,9 +54,8 @@ public enum PersistenceTimer {
     private HistogramMetrics prepareLatency;
     private HistogramMetrics executeLatency;
     private HistogramMetrics allLatency;
-    private long lastTime = System.currentTimeMillis();
     private int syncOperationThreadsNum;
-    private int maxSyncoperationNum;
+    private int maxSyncOperationNum;
     private ExecutorService executorService;
     private ExecutorService prepareExecutorService;
 
@@ -89,7 +88,7 @@ public enum PersistenceTimer {
         );
 
         syncOperationThreadsNum = moduleConfig.getSyncThreads();
-        maxSyncoperationNum = moduleConfig.getMaxSyncOperationNum();
+        maxSyncOperationNum = moduleConfig.getMaxSyncOperationNum();
         executorService = 
Executors.newFixedThreadPool(syncOperationThreadsNum);
         prepareExecutorService = 
Executors.newFixedThreadPool(moduleConfig.getPrepareThreads());
         if (!isStarted) {
@@ -116,7 +115,7 @@ public enum PersistenceTimer {
         AtomicBoolean stop = new AtomicBoolean(false);
 
         DefaultBlockingBatchQueue<PrepareRequest> prepareQueue = new 
DefaultBlockingBatchQueue(
-            this.maxSyncoperationNum);
+            this.maxSyncOperationNum);
         try {
             List<PersistenceWorker<? extends StorageData>> persistenceWorkers 
= new ArrayList<>();
             
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
@@ -142,7 +141,7 @@ public enum PersistenceTimer {
                         // Push the prepared requests into 
DefaultBlockingBatchQueue,
                         // the executorService consumes from it when it 
reaches the size of batch.
                         prepareQueue.offer(innerPrepareRequests);
-                        worker.endOfRound(System.currentTimeMillis() - 
lastTime);
+                        worker.endOfRound();
                     } finally {
                         timer.finish();
                         prepareStageCountDownLatch.countDown();
@@ -194,7 +193,6 @@ public enum PersistenceTimer {
 
             stop.set(true);
             allTimer.finish();
-            lastTime = System.currentTimeMillis();
         }
 
         if (debug) {

Reply via email to