This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch enhance-cache in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 16c4bde932e8dd45423f46bc8b7cdf1879152529 Author: Wu Sheng <[email protected]> AuthorDate: Mon Dec 5 08:45:49 2022 +0800 Cache enhancement - don't read new metrics from database in minute dimensionality --- docs/en/changes/changes.md | 14 +++++ .../analysis/worker/MetricsPersistentWorker.java | 59 +++++++++++++++++++++- .../server/core/status/ServerStatusService.java | 2 + 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 1286c9129e..bf96fcabe7 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -7,6 +7,20 @@ * Add `ServerStatusService` in the core module to provide a new way to expose booting status to other modules. * Adds Micrometer as a new component.(ID=141) * Refactor session cache in MetricsPersistentWorker. +* Cache enhancement - don't read new metrics from database in minute dimensionality. +``` + // When + // (1) the time bucket of the server's latest stability status is provided + // 1.1 the OAP has booted successfully + // 1.2 the current dimensionality is minute. + // (2) the metrics are from the time after the timeOfLatestStabilitySts + // (3) the metrics don't exist in the cache + // the kernel should NOT try to load them from the database. + // + // Note, about condition (2): + // for the specific minute in which the OAP booted successfully, the metrics are expected to be loaded from the database when + // they don't exist in the cache. 
+``` #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 51e9dfd937..195c4ab7ae 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -25,11 +25,15 @@ import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData; import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; +import org.apache.skywalking.oap.server.core.status.ServerStatusService; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -89,6 +93,16 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { * @since 8.7.0 TTL settings from {@link org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()} */ private int metricsDataTTL; + /** + * @since 9.4.0 + */ + private final ServerStatusService serverStatusService; + /** + * The time bucket is 0 or in minute dimensionality of the system in the latest stability status. 
+ * + * @since 9.4.0 + */ + private long timeOfLatestStabilitySts = 0; MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker, @@ -145,6 +159,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { new MetricsTag.Keys("status"), new MetricsTag.Values("cached") ); SESSION_TIMEOUT_OFFSITE_COUNTER++; + serverStatusService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServerStatusService.class); } /** @@ -165,6 +180,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { // And add offset according to worker creation sequence, to avoid context clear overlap, // eventually optimize load of IDs reading. sessionCache.setTimeoutThreshold(storageSessionTimeout * 4 + SESSION_TIMEOUT_OFFSITE_COUNTER * 200); + // Set cache mode in normal mode for high dimensionality metrics, such as hour/day metrics. // The down sampling level worker executes every 4 periods. this.persistentMod = 4; } @@ -191,6 +207,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { return Collections.emptyList(); } + if (model.getDownsampling().equals(DownSampling.Minute)) { + timeOfLatestStabilitySts = TimeBucket.getMinuteTimeBucket( + serverStatusService.getBootingStatus().getUptime()); + } + /* * Hard coded the max size. This only affect the multiIDRead if the data doesn't hit the cache. */ @@ -299,7 +320,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { List<Metrics> notInCacheMetrics = metrics.stream() .filter(m -> { - final Metrics cachedValue = sessionCache.get(m); + final Metrics cachedValue = requireInitialization(m); // the metric is tagged `not in cache`. 
if (cachedValue == null) { return true; @@ -340,6 +361,42 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { sessionCache.removeExpired(); } + /** + * Check whether the metrics are in the cache, and whether the worker should go further to load them from the database. + * + * @param metrics the metrics in the streaming process. + * @return the cached metrics, the input metrics when the database read can be skipped, or null if the metrics should be read from the database. + */ + private Metrics requireInitialization(Metrics metrics) { + final Metrics cached = sessionCache.get(metrics); + + // Every cached metric has been written at least once. + if (cached != null) { + return cached; + } + + // When + // (1) the time bucket of the server's latest stability status is provided + // 1.1 the OAP has booted successfully + // 1.2 the current dimensionality is minute. + // (2) the metrics are from the time after the timeOfLatestStabilitySts + // (3) the metrics don't exist in the cache + // the kernel should NOT try to load them from the database. + // + // Note, about condition (2): + // for the specific minute in which the OAP booted successfully, the metrics are expected to be loaded from the database when + // they don't exist in the cache. + if (timeOfLatestStabilitySts > 0 && + metrics.getTimeBucket() > timeOfLatestStabilitySts + && cached == null) { + // Return the input metrics to avoid reading from the database. + sessionCache.put(metrics); + return metrics; + } + + return null; + } + /** * Metrics queue processor, merge the received metrics if existing one with same ID(s) and time bucket. 
* diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java index 27a3b328df..fac17a1b5d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.core.status; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.Service; @@ -35,6 +36,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; @RequiredArgsConstructor public class ServerStatusService implements Service { private final ModuleManager manager; + @Getter private BootingStatus bootingStatus = new BootingStatus(); public void bootedNow(long uptime) {
