This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch session-polish in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 89aa556b16ccf8e3f90537559a3052cd55b74630 Author: Wu Sheng <[email protected]> AuthorDate: Thu Jul 1 16:56:41 2021 +0800 Make session expired mechanism more clear. --- .../oap/server/core/analysis/metrics/Metrics.java | 22 ++++++++++++++++------ .../analysis/worker/MetricsPersistentWorker.java | 21 ++++++++++++--------- .../core/analysis/worker/PersistenceWorker.java | 4 +--- .../server/core/analysis/worker/TopNWorker.java | 2 +- .../oap/server/core/storage/PersistenceTimer.java | 10 ++++------ 5 files changed, 34 insertions(+), 25 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java index 904929e..ec1608a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java @@ -50,13 +50,14 @@ public abstract class Metrics extends StreamData implements StorageData { * Time in the cache, only work when MetricsPersistentWorker#enableDatabaseSession == true. */ @Getter - private long survivalTime = 0L; + private long lastUpdateTimestamp = 0L; /** * Merge the given metrics instance, these two must be the same metrics type. * * @param metrics to be merged - * @return {@code true} if the combined metrics should be continuously processed. {@code false} means it should be abandoned, and the implementation needs to keep the data unaltered in this case. + * @return {@code true} if the combined metrics should be continuously processed. {@code false} means it should be + * abandoned, and the implementation needs to keep the data unaltered in this case. 
*/ public abstract boolean combine(Metrics metrics); @@ -80,12 +81,21 @@ public abstract class Metrics extends StreamData implements StorageData { public abstract Metrics toDay(); /** - * Extend the {@link #survivalTime} + * Set the last update timestamp * - * @param value to extend + * @param timestamp of the latest update */ - public void extendSurvivalTime(long value) { - survivalTime += value; + public void setLastUpdateTimestamp(long timestamp) { + lastUpdateTimestamp += timestamp; + } + + /** + * @param timestamp of current time + * @param expiredThreshold represents the duration between the last update time and the time point of removal from the cache. + * @return true means this metrics should be removed from cache. + */ + public boolean isSurvival(long timestamp, long expiredThreshold) { + return timestamp - lastUpdateTimestamp > expiredThreshold; } public long toTimeBucketInHour() { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index b7c0735..a657121 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -171,6 +171,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { try { loadFromStorage(metricsList); + long timestamp = System.currentTimeMillis(); for (Metrics metrics : metricsList) { Metrics cachedMetrics = context.get(metrics); if (cachedMetrics != null) { @@ -191,10 +192,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { cachedMetrics.calculate(); prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics)); nextWorker(cachedMetrics); + cachedMetrics.setLastUpdateTimestamp(timestamp); } else { 
metrics.calculate(); prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics)); nextWorker(metrics); + metrics.setLastUpdateTimestamp(timestamp); } /* @@ -221,14 +224,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { */ private void loadFromStorage(List<Metrics> metrics) { try { - List<Metrics> noInCacheMetrics = metrics.stream() - .filter(m -> !context.containsKey(m) || !enableDatabaseSession) - .collect(Collectors.toList()); - if (noInCacheMetrics.isEmpty()) { + List<Metrics> notInCacheMetrics = metrics.stream() + .filter(m -> !context.containsKey(m) || !enableDatabaseSession) + .collect(Collectors.toList()); + if (notInCacheMetrics.isEmpty()) { return; } - - final List<Metrics> dbMetrics = metricsDAO.multiGet(model, noInCacheMetrics); + + final List<Metrics> dbMetrics = metricsDAO.multiGet(model, notInCacheMetrics); if (!enableDatabaseSession) { // Clear the cache only after results from DB are returned successfully. context.clear(); @@ -240,14 +243,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { } @Override - public void endOfRound(long tookTime) { + public void endOfRound() { if (enableDatabaseSession) { Iterator<Metrics> iterator = context.values().iterator(); + long timestamp = System.currentTimeMillis(); while (iterator.hasNext()) { Metrics metrics = iterator.next(); - metrics.extendSurvivalTime(tookTime); - if (metrics.getSurvivalTime() > sessionTimeout) { + if (metrics.isSurvival(timestamp, sessionTimeout)) { iterator.remove(); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java index 415371b..e380893 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java +++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java @@ -55,10 +55,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData> extends Abstr /** * The persistence process is driven by the {@link org.apache.skywalking.oap.server.core.storage.PersistenceTimer}. * This is a notification method for the worker when every round finished. - * - * @param tookTime The time costs in this round. */ - public abstract void endOfRound(long tookTime); + public abstract void endOfRound(); /** * Prepare the batch persistence, transfer all prepared data to the executable data format based on the storage diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java index 9093a2f..1465bf0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java @@ -86,7 +86,7 @@ public class TopNWorker extends PersistenceWorker<TopN> { * This method used to clear the expired cache, but TopN is not following it. 
*/ @Override - public void endOfRound(long tookTime) { + public void endOfRound() { } @Override diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index aaafc06..a7e1e89 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -54,9 +54,8 @@ public enum PersistenceTimer { private HistogramMetrics prepareLatency; private HistogramMetrics executeLatency; private HistogramMetrics allLatency; - private long lastTime = System.currentTimeMillis(); private int syncOperationThreadsNum; - private int maxSyncoperationNum; + private int maxSyncOperationNum; private ExecutorService executorService; private ExecutorService prepareExecutorService; @@ -89,7 +88,7 @@ public enum PersistenceTimer { ); syncOperationThreadsNum = moduleConfig.getSyncThreads(); - maxSyncoperationNum = moduleConfig.getMaxSyncOperationNum(); + maxSyncOperationNum = moduleConfig.getMaxSyncOperationNum(); executorService = Executors.newFixedThreadPool(syncOperationThreadsNum); prepareExecutorService = Executors.newFixedThreadPool(moduleConfig.getPrepareThreads()); if (!isStarted) { @@ -116,7 +115,7 @@ public enum PersistenceTimer { AtomicBoolean stop = new AtomicBoolean(false); DefaultBlockingBatchQueue<PrepareRequest> prepareQueue = new DefaultBlockingBatchQueue( - this.maxSyncoperationNum); + this.maxSyncOperationNum); try { List<PersistenceWorker<? 
extends StorageData>> persistenceWorkers = new ArrayList<>(); persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers()); @@ -142,7 +141,7 @@ public enum PersistenceTimer { // Push the prepared requests into DefaultBlockingBatchQueue, // the executorService consumes from it when it reaches the size of batch. prepareQueue.offer(innerPrepareRequests); - worker.endOfRound(System.currentTimeMillis() - lastTime); + worker.endOfRound(); } finally { timer.finish(); prepareStageCountDownLatch.countDown(); @@ -194,7 +193,6 @@ public enum PersistenceTimer { stop.set(true); allTimer.finish(); - lastTime = System.currentTimeMillis(); } if (debug) {
