This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 8c31172c3c Refactor session cache in MetricsPersistentWorker. (#10084)
8c31172c3c is described below
commit 8c31172c3c1f4adb2959f03fa694e6146fbaf703
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Sun Dec 4 22:41:57 2022 +0800
Refactor session cache in MetricsPersistentWorker. (#10084)
---
docs/en/changes/changes.md | 1 +
.../analysis/worker/MetricsPersistentWorker.java | 40 ++---------
.../core/analysis/worker/MetricsSessionCache.java | 81 ++++++++++++++++++++++
.../server/core/storage/SessionCacheCallback.java | 6 +-
4 files changed, 90 insertions(+), 38 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index b73f770b4c..1286c9129e 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -6,6 +6,7 @@
* Add `ServerStatusService` in the core module to provide a new way to expose
booting status to other modules.
* Adds Micrometer as a new component.(ID=141)
+* Refactor session cache in MetricsPersistentWorker.
#### UI
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 8ab2e69777..51e9dfd937 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -20,12 +20,9 @@ package
org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.UnexpectedException;
@@ -37,9 +34,7 @@ import
org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import
org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import
org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
@@ -61,25 +56,13 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0;
private final Model model;
- /**
- * The session cache holds the latest metrics in-memory.
- * There are two ways to make sure metrics in-cache,
- * 1. Metrics is read from the Database through {@link
#loadFromStorage(List)}
- * 2. The built {@link InsertRequest} executed successfully.
- *
- * There are two cases to remove metrics from the cache.
- * 1. The metrics expired.
- * 2. The built {@link UpdateRequest} executed failure, which could be
caused
- * (1) Database error. (2) No data updated, such as the counter of update
statement is 0 in JDBC.
- */
- private final Map<Metrics, Metrics> sessionCache;
+ private final MetricsSessionCache sessionCache;
private final IMetricsDAO metricsDAO;
private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
private final Optional<MetricsTransWorker> transWorker;
private final boolean supportUpdate;
- private long sessionTimeout;
/**
* The counter of L2 aggregation.
*/
@@ -113,17 +96,12 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
long storageSessionTimeout, int metricsDataTTL,
MetricStreamKind kind) {
super(moduleDefineHolder, new ReadWriteSafeCache<>(new
MergableBufferedData(), new MergableBufferedData()));
this.model = model;
- // Due to the cache would be updated depending on final storage
implementation,
- // the map/cache could be updated concurrently.
- // Set to ConcurrentHashMap in order to avoid HashMap deadlock.
- // Since 9.3.0
- this.sessionCache = new ConcurrentHashMap<>(100);
+ this.sessionCache = new MetricsSessionCache(storageSessionTimeout);
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
this.transWorker = Optional.ofNullable(transWorker);
this.supportUpdate = supportUpdate;
- this.sessionTimeout = storageSessionTimeout;
this.persistentCounter = 0;
this.persistentMod = 1;
this.metricsDataTTL = metricsDataTTL;
@@ -186,7 +164,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
// For a down-sampling metrics, we prolong the session timeout for 4
times, nearly 5 minutes.
// And add offset according to worker creation sequence, to avoid
context clear overlap,
// eventually optimize load of IDs reading.
- this.sessionTimeout = this.sessionTimeout * 4 +
SESSION_TIMEOUT_OFFSITE_COUNTER * 200;
+ sessionCache.setTimeoutThreshold(storageSessionTimeout * 4 +
SESSION_TIMEOUT_OFFSITE_COUNTER * 200);
// The down sampling level worker executes every 4 periods.
this.persistentMod = 4;
}
@@ -351,7 +329,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
return;
}
- metricsDAO.multiGet(model, notInCacheMetrics).forEach(m ->
sessionCache.put(m, m));
+ metricsDAO.multiGet(model, notInCacheMetrics).forEach(m ->
sessionCache.put(m));
} catch (final Exception e) {
log.error("Failed to load metrics for merging", e);
}
@@ -359,15 +337,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
@Override
public void endOfRound() {
- Iterator<Metrics> iterator = sessionCache.values().iterator();
- long timestamp = System.currentTimeMillis();
- while (iterator.hasNext()) {
- Metrics metrics = iterator.next();
-
- if (metrics.isExpired(timestamp, sessionTimeout)) {
- iterator.remove();
- }
- }
+ sessionCache.removeExpired();
}
/**
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java
new file mode 100644
index 0000000000..93b1e421cf
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.AccessLevel;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+/**
+ * MetricsSessionCache is a key-value cache to hold hot metric in-memory to
reduce payload to pre-read.
+ *
+ * There are two ways to make sure metrics in-cache,
+ * 1. Metrics is read from the Database through {@link
MetricsPersistentWorker}.loadFromStorage
+ * 2. The built {@link InsertRequest} executed successfully.
+ *
+ * There are two cases to remove metrics from the cache.
+ * 1. The metrics expired.
+ * 2. The built {@link UpdateRequest} executed failure, which could be caused
+ * (1) Database error. (2) No data updated, such as the counter of update
statement is 0 in JDBC.
+ *
+ * @since 9.4.0 Created this from MetricsPersistentWorker.sessionCache.
+ */
+public class MetricsSessionCache {
+ private final Map<Metrics, Metrics> sessionCache;
+ @Setter(AccessLevel.PACKAGE)
+ private long timeoutThreshold;
+
+ public MetricsSessionCache(long timeoutThreshold) {
+ // Due to the cache would be updated depending on final storage
implementation,
+ // the map/cache could be updated concurrently.
+ // Set to ConcurrentHashMap in order to avoid HashMap deadlock.
+ // Since 9.3.0
+ this.sessionCache = new ConcurrentHashMap<>(100);
+ this.timeoutThreshold = timeoutThreshold;
+ }
+
+ Metrics get(Metrics metrics) {
+ return sessionCache.get(metrics);
+ }
+
+ public Metrics remove(Metrics metrics) {
+ return sessionCache.remove(metrics);
+ }
+
+ public void put(Metrics metrics) {
+ sessionCache.put(metrics, metrics);
+ }
+
+ void removeExpired() {
+ Iterator<Metrics> iterator = sessionCache.values().iterator();
+ long timestamp = System.currentTimeMillis();
+ while (iterator.hasNext()) {
+ Metrics metrics = iterator.next();
+
+ if (metrics.isExpired(timestamp, timeoutThreshold)) {
+ iterator.remove();
+ }
+ }
+ }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
index e417b71529..ceff064e05 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.oap.server.core.storage;
-import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import
org.apache.skywalking.oap.server.core.analysis.worker.MetricsSessionCache;
/**
* SessionCacheCallback provides a bridge for storage implementations
*/
@RequiredArgsConstructor
public class SessionCacheCallback {
- private final Map<Metrics, Metrics> sessionCache;
+ private final MetricsSessionCache sessionCache;
private final Metrics metrics;
/**
* In some cases, this callback could be shared by multiple executions,
such as SQLExecutor#additionalSQLs.
@@ -40,7 +40,7 @@ public class SessionCacheCallback {
if (isFailed) {
return;
}
- sessionCache.put(metrics, metrics);
+ sessionCache.put(metrics);
}
public void onUpdateFailure() {