This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new d8fce12835 Cache enhancement - don't read new metrics from database in
minute (#10085)
d8fce12835 is described below
commit d8fce128353e6728846ed98b26e862c1ded31bfc
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Mon Dec 5 10:37:27 2022 +0800
Cache enhancement - don't read new metrics from database in minute (#10085)
---
docs/en/changes/changes.md | 14 ++++++
.../analysis/worker/MetricsPersistentWorker.java | 57 +++++++++++++++++++++-
.../server/core/status/ServerStatusService.java | 2 +
3 files changed, 72 insertions(+), 1 deletion(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 1286c9129e..bf96fcabe7 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -7,6 +7,20 @@
* Add `ServerStatusService` in the core module to provide a new way to expose booting status to other modules.
* Adds Micrometer as a new component.(ID=141)
* Refactor session cache in MetricsPersistentWorker.
+* Cache enhancement - don't read new metrics from database in minute dimensionality.
+```
+ // When
+ // (1) the time bucket of the server's latest stability status is provided
+ // 1.1 the OAP has booted successfully
+ // 1.2 the current dimensionality is in minute.
+ // (2) the metrics are from the time after the timeOfLatestStabilitySts
+ // (3) the metrics don't exist in the cache
+ // the kernel should NOT try to load it from the database.
+ //
+ // Notice, about condition (2),
+ // for the specific minute of booted successfully, the metrics are expected to load from database when
+ // it doesn't exist in the cache.
+```
#### UI
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 51e9dfd937..169332a168 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -25,11 +25,15 @@ import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import
org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
+import org.apache.skywalking.oap.server.core.status.ServerStatusService;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -89,6 +93,16 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
* @since 8.7.0 TTL settings from {@link org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
*/
private int metricsDataTTL;
+ /**
+ * @since 9.4.0
+ */
+ private final ServerStatusService serverStatusService;
+ /**
+ * The time bucket is 0 or in minute dimensionality of the system in the latest stability status.
+ *
+ * @since 9.4.0
+ */
+ private long timeOfLatestStabilitySts = 0;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model
model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker,
AbstractWorker<ExportEvent> nextExportWorker,
@@ -145,6 +159,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
new MetricsTag.Keys("status"), new MetricsTag.Values("cached")
);
SESSION_TIMEOUT_OFFSITE_COUNTER++;
+ serverStatusService =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServerStatusService.class);
}
/**
@@ -191,6 +206,11 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
return Collections.emptyList();
}
+ if (model.getDownsampling().equals(DownSampling.Minute)) {
+ timeOfLatestStabilitySts = TimeBucket.getMinuteTimeBucket(
+ serverStatusService.getBootingStatus().getUptime());
+ }
+
/*
* Hard coded the max size. This only affect the multiIDRead if the data doesn't hit the cache.
*/
@@ -299,7 +319,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
List<Metrics> notInCacheMetrics =
metrics.stream()
.filter(m -> {
- final Metrics cachedValue = sessionCache.get(m);
+ final Metrics cachedValue =
requireInitialization(m);
// the metric is tagged `not in cache`.
if (cachedValue == null) {
return true;
@@ -340,6 +360,41 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
sessionCache.removeExpired();
}
+ /**
+ * Check the metrics whether in the cache, and whether the worker should go further to load from database.
+ *
+ * @param metrics the metrics in the streaming process.
+ * @return metrics in cache or null if try to read the metrics from the database.
+ */
+ private Metrics requireInitialization(Metrics metrics) {
+ final Metrics cached = sessionCache.get(metrics);
+
+ // All cached metrics, it at least had been written once.
+ if (cached != null) {
+ return cached;
+ }
+
+ // When
+ // (1) the time bucket of the server's latest stability status is provided
+ // 1.1 the OAP has booted successfully
+ // 1.2 the current dimensionality is in minute.
+ // (2) the metrics are from the time after the timeOfLatestStabilitySts
+ // (3) the metrics don't exist in the cache
+ // the kernel should NOT try to load it from the database.
+ //
+ // Notice, about condition (2),
+ // for the specific minute of booted successfully, the metrics are expected to load from database when
+ // it doesn't exist in the cache.
+ if (timeOfLatestStabilitySts > 0 &&
+ metrics.getTimeBucket() > timeOfLatestStabilitySts
+ && cached == null) {
+ // Return metrics as input to avoid reading from database.
+ return metrics;
+ }
+
+ return null;
+ }
+
/**
* Metrics queue processor, merge the received metrics if existing one with same ID(s) and time bucket.
*
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java
index 27a3b328df..fac17a1b5d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.status;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
@@ -35,6 +36,7 @@ import
org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
@RequiredArgsConstructor
public class ServerStatusService implements Service {
private final ModuleManager manager;
+ @Getter
private BootingStatus bootingStatus = new BootingStatus();
public void bootedNow(long uptime) {