This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch refator
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit a7662fdfc33794fc744e9d3842c1038048af214b Author: Wu Sheng <[email protected]> AuthorDate: Sun Dec 4 21:16:33 2022 +0800 Refactor session cache in MetricsPersistentWorker. --- docs/en/changes/changes.md | 1 + .../analysis/worker/MetricsPersistentWorker.java | 35 ++-------- .../core/analysis/worker/MetricsSessionCache.java | 81 ++++++++++++++++++++++ .../server/core/storage/SessionCacheCallback.java | 5 +- 4 files changed, 90 insertions(+), 32 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index b73f770b4c..1286c9129e 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -6,6 +6,7 @@ * Add `ServerStatusService` in the core module to provide a new way to expose booting status to other modules. * Adds Micrometer as a new component.(ID=141) +* Refactor session cache in MetricsPersistentWorker. #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 8ab2e69777..263ba36eff 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -61,25 +61,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0; private final Model model; - /** - * The session cache holds the latest metrics in-memory. - * There are two ways to make sure metrics in-cache, - * 1. Metrics is read from the Database through {@link #loadFromStorage(List)} - * 2. The built {@link InsertRequest} executed successfully. - * - * There are two cases to remove metrics from the cache. - * 1. The metrics expired. - * 2. 
The built {@link UpdateRequest} executed failure, which could be caused - * (1) Database error. (2) No data updated, such as the counter of update statement is 0 in JDBC. - */ - private final Map<Metrics, Metrics> sessionCache; + private final MetricsSessionCache sessionCache; private final IMetricsDAO metricsDAO; private final Optional<AbstractWorker<Metrics>> nextAlarmWorker; private final Optional<AbstractWorker<ExportEvent>> nextExportWorker; private final DataCarrier<Metrics> dataCarrier; private final Optional<MetricsTransWorker> transWorker; private final boolean supportUpdate; - private long sessionTimeout; /** * The counter of L2 aggregation. */ @@ -113,17 +101,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) { super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData())); this.model = model; - // Due to the cache would be updated depending on final storage implementation, - // the map/cache could be updated concurrently. - // Set to ConcurrentHashMap in order to avoid HashMap deadlock. - // Since 9.3.0 - this.sessionCache = new ConcurrentHashMap<>(100); + this.sessionCache = new MetricsSessionCache(storageSessionTimeout); this.metricsDAO = metricsDAO; this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker); this.nextExportWorker = Optional.ofNullable(nextExportWorker); this.transWorker = Optional.ofNullable(transWorker); this.supportUpdate = supportUpdate; - this.sessionTimeout = storageSessionTimeout; this.persistentCounter = 0; this.persistentMod = 1; this.metricsDataTTL = metricsDataTTL; @@ -186,7 +169,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { // For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes. // And add offset according to worker creation sequence, to avoid context clear overlap, // eventually optimize load of IDs reading. 
- this.sessionTimeout = this.sessionTimeout * 4 + SESSION_TIMEOUT_OFFSITE_COUNTER * 200; + sessionCache.setTimeoutThreshold(storageSessionTimeout * 4 + SESSION_TIMEOUT_OFFSITE_COUNTER * 200); // The down sampling level worker executes every 4 periods. this.persistentMod = 4; } @@ -351,7 +334,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { return; } - metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> sessionCache.put(m, m)); + metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> sessionCache.put(m)); } catch (final Exception e) { log.error("Failed to load metrics for merging", e); } @@ -359,15 +342,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { @Override public void endOfRound() { - Iterator<Metrics> iterator = sessionCache.values().iterator(); - long timestamp = System.currentTimeMillis(); - while (iterator.hasNext()) { - Metrics metrics = iterator.next(); - - if (metrics.isExpired(timestamp, sessionTimeout)) { - iterator.remove(); - } - } + sessionCache.removeExpired(); } /** diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java new file mode 100644 index 0000000000..7d0cc7e7b1 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import lombok.AccessLevel; +import lombok.Setter; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; + +/** + * MetricsSessionCache is a key-value cache to hold hot metric in-memory to reduce payload to pre-read. + * + * There are two ways to make sure metrics in-cache, + * 1. Metrics is read from the Database through {@link MetricsPersistentWorker}.loadFromStorage + * 2. The built {@link InsertRequest} executed successfully. + * + * There are two cases to remove metrics from the cache. + * 1. The metrics expired. + * 2. The built {@link UpdateRequest} executed failure, which could be caused + * (1) Database error. (2) No data updated, such as the counter of update statement is 0 in JDBC. + * + * @since 9.4.0 Created this from MetricsPersistentWorker.sessionCache. + */ +public class MetricsSessionCache { + private final Map<Metrics, Metrics> sessionCache; + @Setter(AccessLevel.PACKAGE) + private long timeoutThreshold; + + public MetricsSessionCache(long timeoutThreshold) { + // Due to the cache would be updated depending on final storage implementation, + // the map/cache could be updated concurrently. + // Set to ConcurrentHashMap in order to avoid HashMap deadlock. 
+ // Since 9.3.0 + this.sessionCache = new ConcurrentHashMap<>(100); + this.timeoutThreshold = timeoutThreshold; + } + + Metrics get(Metrics metrics) { + return sessionCache.get(metrics); + } + + public Metrics remove(Metrics metrics) { + return sessionCache.remove(metrics); + } + + public void put(Metrics metrics) { + sessionCache.put(metrics, metrics); + } + + void removeExpired(){ + Iterator<Metrics> iterator = sessionCache.values().iterator(); + long timestamp = System.currentTimeMillis(); + while (iterator.hasNext()) { + Metrics metrics = iterator.next(); + + if (metrics.isExpired(timestamp, timeoutThreshold)) { + iterator.remove(); + } + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java index e417b71529..fb2ae0d67e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java @@ -21,13 +21,14 @@ package org.apache.skywalking.oap.server.core.storage; import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.analysis.worker.MetricsSessionCache; /** * SessionCacheCallback provides a bridge for storage implementations */ @RequiredArgsConstructor public class SessionCacheCallback { - private final Map<Metrics, Metrics> sessionCache; + private final MetricsSessionCache sessionCache; private final Metrics metrics; /** * In some cases, this callback could be shared by multiple executions, such as SQLExecutor#additionalSQLs. 
@@ -40,7 +41,7 @@ public class SessionCacheCallback {
         if (isFailed) {
             return;
         }
-        sessionCache.put(metrics, metrics);
+        sessionCache.put(metrics);
     }
 
     public void onUpdateFailure() {
