This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 3a4ee08 Enhance persistent session timeout mechanism. (#7334)
3a4ee08 is described below
commit 3a4ee08e54bd3f08441f1023fd25442d6a2badde
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Tue Jul 20 07:50:27 2021 +0800
Enhance persistent session timeout mechanism. (#7334)
Fix a bug where the enhanced session could cache metadata metrics (hot
entities) forever. A new timeout mechanism is designed to avoid this specific
case. Optimize this timeout mechanism by making it different for ES (one index
per day) and non-ES storage implementations.
---
CHANGES.md | 3 ++
.../oap/server/core/CoreModuleProvider.java | 10 +++++
.../analysis/worker/MetricsPersistentWorker.java | 47 ++++++++++++++++++----
.../analysis/worker/MetricsStreamProcessor.java | 25 +++++++++---
.../oap/server/core/storage/IMetricsDAO.java | 18 +++++++++
.../plugin/elasticsearch/base/MetricsEsDAO.java | 25 ++++++++++++
6 files changed, 115 insertions(+), 13 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index c1fce5a..52fd322 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
the master branch codes, please don't use in production environments.
#### Java Agent
+
* Supports modifying span attributes in async mode.
* Agent supports the collection of JVM arguments and jar dependency
information.
* [Temporary] Support authentication for log report channel. This feature and
grpc channel is going to be removed after
@@ -108,6 +109,8 @@ Release Notes.
* Optimization: Concurrency mode of execution stage for metrics is
removed(added in 8.5.0). Only concurrency of prepare
stage is meaningful and kept.
* Fix -meters metrics topic isn't created with namespace issue
+* Enhance persistent session timeout mechanism. Because the enhanced session
could cache the metadata metrics forever,
+ new timeout mechanism is designed for avoiding this specific case.
#### UI
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 31f5112..1be58a3 100755
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -290,10 +290,20 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(
UITemplateManagementService.class, new
UITemplateManagementService(getManager()));
+ if (moduleConfig.getMetricsDataTTL() < 2) {
+ throw new ModuleStartException(
+ "Metric TTL should be at least 2 days, current value is " +
moduleConfig.getMetricsDataTTL());
+ }
+ if (moduleConfig.getRecordDataTTL() < 2) {
+ throw new ModuleStartException(
+ "Record TTL should be at least 2 days, current value is " +
moduleConfig.getRecordDataTTL());
+ }
+
final MetricsStreamProcessor metricsStreamProcessor =
MetricsStreamProcessor.getInstance();
metricsStreamProcessor.setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
metricsStreamProcessor.setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
metricsStreamProcessor.setStorageSessionTimeout(moduleConfig.getStorageSessionTimeout());
+
metricsStreamProcessor.setMetricsDataTTL(moduleConfig.getMetricsDataTTL());
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
apdexThresholdConfig = new ApdexThresholdConfig(this);
ApdexMetrics.setDICT(apdexThresholdConfig);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 65e31a4..172eb04 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -78,11 +78,15 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
* every {@link #persistentMod} periods. And minute level workers execute
every time.
*/
private int persistentMod;
+ /**
+ * @since 8.7.0 TTL settings from {@link
org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
+ */
+ private int metricsDataTTL;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model
model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker,
AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean
enableDatabaseSession, boolean supportUpdate,
- long storageSessionTimeout) {
+ long storageSessionTimeout, int metricsDataTTL) {
super(moduleDefineHolder, new ReadWriteSafeCache<>(new
MergableBufferedData(), new MergableBufferedData()));
this.model = model;
this.context = new HashMap<>(100);
@@ -95,6 +99,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
this.sessionTimeout = storageSessionTimeout;
this.persistentCounter = 0;
this.persistentMod = 1;
+ this.metricsDataTTL = metricsDataTTL;
String name = "METRICS_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
@@ -125,11 +130,16 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
/**
* Create the leaf and down-sampling MetricsPersistentWorker, no next step.
*/
- MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model
model, IMetricsDAO metricsDAO,
- boolean enableDatabaseSession, boolean
supportUpdate, long storageSessionTimeout) {
+ MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder,
+ Model model,
+ IMetricsDAO metricsDAO,
+ boolean enableDatabaseSession,
+ boolean supportUpdate,
+ long storageSessionTimeout,
+ int metricsDataTTL) {
this(moduleDefineHolder, model, metricsDAO,
null, null, null,
- enableDatabaseSession, supportUpdate, storageSessionTimeout
+ enableDatabaseSession, supportUpdate, storageSessionTimeout,
metricsDataTTL
);
// For a down-sampling metrics, we prolong the session timeout for 4
times, nearly 5 minutes.
// And add offset according to worker creation sequence, to avoid
context clear overlap,
@@ -246,10 +256,33 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics> {
* Load data from the storage, if {@link #enableDatabaseSession} == true,
only load data when the id doesn't exist.
*/
private void loadFromStorage(List<Metrics> metrics) {
+ final long currentTimeMillis = System.currentTimeMillis();
try {
- List<Metrics> notInCacheMetrics = metrics.stream()
- .filter(m ->
!context.containsKey(m) || !enableDatabaseSession)
-
.collect(Collectors.toList());
+ List<Metrics> notInCacheMetrics =
+ metrics.stream()
+ .filter(m -> {
+ final Metrics cachedValue = context.get(m);
+ // Not cached or session disabled, the metric could
be tagged `not in cache`.
+ if (cachedValue == null || !enableDatabaseSession) {
+ return true;
+ }
+ // The metric is in the cache, but still we have to
check
+ // whether the cache is expired due to TTL.
+ // This is a cache-DB inconsistent case:
+ // Metrics keep coming due to traffic, but the
entity in the
+ // database has been removed due to TTL.
+ if (!model.isTimeRelativeID() && supportUpdate) {
+ // Mostly all updatable metadata level metrics
are required to do this check.
+
+ if (metricsDAO.isExpiredCache(model,
cachedValue, currentTimeMillis, metricsDataTTL)) {
+ // The expired metrics should be tagged
`not in cache` directly.
+ return true;
+ }
+ }
+
+ return false;
+ })
+ .collect(Collectors.toList());
if (notInCacheMetrics.isEmpty()) {
return;
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 6c323f4..1fb1d41 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -87,6 +87,11 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
*/
@Setter
private long storageSessionTimeout = 70_000;
+ /**
+ * @since 8.7.0 TTL settings from {@link
org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
+ */
+ @Setter
+ private int metricsDataTTL = 3;
public static MetricsStreamProcessor getInstance() {
return PROCESSOR;
@@ -157,12 +162,16 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
if (supportDownSampling) {
if (configService.shouldToHour()) {
Model model = modelSetter.add(
- metricsClass, stream.getScopeId(), new
Storage(stream.getName(), timeRelativeID, DownSampling.Hour), false);
+ metricsClass, stream.getScopeId(), new
Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
+ false
+ );
hourPersistentWorker = downSamplingWorker(moduleDefineHolder,
metricsDAO, model, supportUpdate);
}
if (configService.shouldToDay()) {
Model model = modelSetter.add(
- metricsClass, stream.getScopeId(), new
Storage(stream.getName(), timeRelativeID, DownSampling.Day), false);
+ metricsClass, stream.getScopeId(), new
Storage(stream.getName(), timeRelativeID, DownSampling.Day),
+ false
+ );
dayPersistentWorker = downSamplingWorker(moduleDefineHolder,
metricsDAO, model, supportUpdate);
}
@@ -171,7 +180,9 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
}
Model model = modelSetter.add(
- metricsClass, stream.getScopeId(), new Storage(stream.getName(),
timeRelativeID, DownSampling.Minute), false);
+ metricsClass, stream.getScopeId(), new Storage(stream.getName(),
timeRelativeID, DownSampling.Minute),
+ false
+ );
MetricsPersistentWorker minutePersistentWorker =
minutePersistentWorker(
moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
@@ -197,8 +208,8 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
MetricsPersistentWorker minutePersistentWorker = new
MetricsPersistentWorker(
- moduleDefineHolder, model, metricsDAO, alarmNotifyWorker,
exportWorker, transWorker, enableDatabaseSession,
- supportUpdate, storageSessionTimeout
+ moduleDefineHolder, model, metricsDAO, alarmNotifyWorker,
exportWorker, transWorker,
+ enableDatabaseSession, supportUpdate, storageSessionTimeout,
metricsDataTTL
);
persistentWorkers.add(minutePersistentWorker);
@@ -210,7 +221,9 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
Model model,
boolean supportUpdate) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
- moduleDefineHolder, model, metricsDAO, enableDatabaseSession,
supportUpdate, storageSessionTimeout);
+ moduleDefineHolder, model, metricsDAO,
+ enableDatabaseSession, supportUpdate, storageSessionTimeout,
metricsDataTTL
+ );
persistentWorkers.add(persistentWorker);
return persistentWorker;
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index 2b4c05a..4320a83 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -20,6 +20,8 @@ package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -54,4 +56,20 @@ public interface IMetricsDAO extends DAO {
* executed ASAP.
*/
UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws
IOException;
+
+ /**
+ * Calculate the expired status of the metric by given current timestamp,
+ * metric and TTL.
+ *
+ * <p>This default implementation compares absolute elapsed time: the cached
+ * value's time bucket is converted to a timestamp, and the metric is treated
+ * as expired only when strictly more than {@code ttl} days lie between that
+ * timestamp and {@code currentTimeMillis}.
+ *
+ * @param model of the given cached value
+ * @param cachedValue is a metric instance
+ * @param currentTimeMillis current system time of OAP.
+ * @param ttl from core setting, in days.
+ * @return true if the metric is expired.
+ */
+ default boolean isExpiredCache(Model model, Metrics cachedValue, long
currentTimeMillis, int ttl) {
+ // Convert the cached value's time bucket into a timestamp, using the
+ // model's down-sampling.
+ final long metricTimestamp = TimeBucket.getTimestamp(
+ cachedValue.getTimeBucket(), model.getDownsampling());
+ // If the cached metric is older than the TTL indicated.
+ return currentTimeMillis - metricTimestamp >
TimeUnit.DAYS.toMillis(ttl);
+ }
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 0f204c9..3574a81 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -23,7 +23,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
@@ -34,6 +37,7 @@ import
org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.joda.time.DateTime;
import static java.util.stream.Collectors.groupingBy;
@@ -117,4 +121,25 @@ public class MetricsEsDAO extends EsDAO implements
IMetricsDAO {
String id = IndexController.INSTANCE.generateDocId(model,
metrics.id());
return getClient().prepareUpdate(modelName, id, builder);
}
+
+    /**
+     * Calculate the expired status of a cached metric at day granularity.
+     *
+     * <p>Unlike an absolute elapsed-time comparison, the decision is made by comparing
+     * the day-level time bucket (yyyyMMdd) of the cached metric against the day that
+     * lies {@code ttl} days before {@code currentTimeMillis}.
+     *
+     * @param model             of the given cached value; supplies the down-sampling of its time bucket
+     * @param cachedValue       is a metric instance
+     * @param currentTimeMillis current system time of OAP
+     * @param ttl               from core setting, in days
+     * @return true if the metric is expired
+     */
+    @Override
+    public boolean isExpiredCache(final Model model,
+                                  final Metrics cachedValue,
+                                  final long currentTimeMillis,
+                                  final int ttl) {
+        final long metricTimestamp = TimeBucket.getTimestamp(
+            cachedValue.getTimeBucket(), model.getDownsampling());
+        // Fast fail check. If the duration is still less than (TTL - 1) days (absolute),
+        // the cache should not be expired.
+        if (currentTimeMillis - metricTimestamp < TimeUnit.DAYS.toMillis(ttl - 1)) {
+            return false;
+        }
+        final long deadline = Long.parseLong(
+            new DateTime(currentTimeMillis).plusDays(-ttl).toString("yyyyMMdd"));
+        // Derive the day bucket from the timestamp computed above, not from the raw
+        // time bucket value: feeding an already-formatted bucket into the conversion
+        // would treat it as a timestamp and yield a meaningless day.
+        final long timeBucket = TimeBucket.getTimeBucket(metricTimestamp, DownSampling.Day);
+        // If time bucket is earlier than or equal to (mostly) the deadline, the cached metric is expired.
+        return timeBucket <= deadline;
+    }
}