This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 2df3c68 Super Size Dataset record index es rolling step (#5282)
2df3c68 is described below
commit 2df3c683baede7a0b7f24885ce1bbe48b13c7dac
Author: Evan <[email protected]>
AuthorDate: Mon Aug 10 14:49:11 2020 +0800
Super Size Dataset record index es rolling step (#5282)
---
docs/en/setup/backend/backend-storage.md | 4 ++
.../src/main/resources/application.yml | 2 +
.../StorageModuleElasticsearchConfig.java | 5 +++
.../StorageModuleElasticsearchProvider.java | 49 ++++++++++++----------
.../plugin/elasticsearch/base/TimeSeriesUtils.java | 11 ++++-
.../elasticsearch/base/TimeSeriesUtilsTest.java | 40 ++++++++++++++++++
.../StorageModuleElasticsearch7Provider.java | 8 ++++
7 files changed, 96 insertions(+), 23 deletions(-)
diff --git a/docs/en/setup/backend/backend-storage.md
b/docs/en/setup/backend/backend-storage.md
index e6dcd47..bd48a56 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -54,6 +54,7 @@ storage:
trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the
one minute/hour/day index.
+ superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent
the number of days in the super size dataset record index, the default value is
the same as dayStep when the value is less than 0
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets
management file in the properties format includes the username, password, which
are managed by 3rd party tool.
@@ -105,6 +106,9 @@ Such as, if dayStep == 11,
1. data in [2000-01-01, 2000-01-11] will be merged into the index-20000101.
1. data in [2000-01-12, 2000-01-22] will be merged into the index-20000112.
+`storage/elasticsearch/superDatasetDayStep` overrides
`storage/elasticsearch/dayStep` if the value is positive.
+This affects the record-related entities, such as the trace segment. In
some cases, the size of metrics is much smaller than that of the records (traces), so this
helps balance the shards in the ElasticSearch cluster.
+
NOTICE, TTL deletion is affected by these settings. You should add one extra
dayStep to your TTL. For example, if you want TTL == 30 days and dayStep == 10, you
actually need to set TTL = 40.
### Secrets Management File Of ElasticSearch Authentication
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml
b/oap-server/server-bootstrap/src/main/resources/application.yml
index 1bda6a9..b6c585a 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -101,6 +101,7 @@ storage:
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets
management file in the properties format includes the username, password, which
are managed by 3rd party tool.
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the
one minute/hour/day index.
+ superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent
the number of days in the super size dataset record index, the default value is
the same as dayStep when the value is less than 0
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number
of new indexes
superDatasetIndexShardsFactor:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # Super data set has been
defined in the codes, such as trace segments. This factor provides more shards
for the super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor affects Zipkin and Jaeger
traces.
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
@@ -119,6 +120,7 @@ storage:
trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the
one minute/hour/day index.
+ superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent
the number of days in the super size dataset record index, the default value is
the same as dayStep when the value is less than 0
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets
management file in the properties format includes the username, password, which
are managed by 3rd party tool.
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 6bcb653..4fa466d 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -80,6 +80,11 @@ public class StorageModuleElasticsearchConfig extends
ModuleConfig {
*/
@Getter
private int dayStep = 1;
+ /**
+ * @since 8.2.0, the day step used for rolling the super size dataset record
indexes. It takes effect only when its value is greater than 0.
+ */
+ @Getter
+ private int superDatasetDayStep = -1;
@Setter
private int resultWindowMaxSize = 10000;
@Setter
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 75f17f7..030b965 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -114,11 +114,15 @@ public class StorageModuleElasticsearchProvider extends
ModuleProvider {
}
if (config.getDayStep() > 1) {
TimeSeriesUtils.setDAY_STEP(config.getDayStep());
+ TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
+ }
+ if (config.getSuperDatasetDayStep() > 0) {
+
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
}
if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
MultipleFilesChangeMonitor monitor = new
MultipleFilesChangeMonitor(
- 10, readableContents -> {
+ 10, readableContents -> {
final byte[] secretsFileContent = readableContents.get(0);
if (secretsFileContent == null) {
return;
@@ -146,47 +150,50 @@ public class StorageModuleElasticsearchProvider extends
ModuleProvider {
}
elasticSearchClient = new ElasticSearchClient(
- config.getClusterNodes(), config.getProtocol(),
config.getTrustStorePath(), config
- .getTrustStorePass(), config.getUser(), config.getPassword(),
- indexNameConverters(config.getNameSpace())
+ config.getClusterNodes(), config.getProtocol(),
config.getTrustStorePath(), config
+ .getTrustStorePass(), config.getUser(), config.getPassword(),
+ indexNameConverters(config.getNameSpace())
);
this.registerServiceImplementation(
- IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient,
config.getBulkActions(), config
- .getFlushInterval(), config.getConcurrentRequests()));
+ IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient,
config.getBulkActions(), config
+ .getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new
StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(
- IHistoryDeleteDAO.class, new
HistoryDeleteEsDAO(elasticSearchClient));
+ IHistoryDeleteDAO.class, new
HistoryDeleteEsDAO(elasticSearchClient));
this.registerServiceImplementation(
- INetworkAddressAliasDAO.class, new
NetworkAddressAliasEsDAO(elasticSearchClient, config
- .getResultWindowMaxSize()));
+ INetworkAddressAliasDAO.class, new
NetworkAddressAliasEsDAO(elasticSearchClient, config
+ .getResultWindowMaxSize()));
this.registerServiceImplementation(ITopologyQueryDAO.class, new
TopologyQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new
MetricsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(
- ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient,
config.getSegmentQueryMaxSize()));
+ ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient,
config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(
- IMetadataQueryDAO.class, new
MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
+ IMetadataQueryDAO.class, new
MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new
AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new
AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new
TopNRecordsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ILogQueryDAO.class, new
LogQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(
- IProfileTaskQueryDAO.class, new
ProfileTaskQueryEsDAO(elasticSearchClient, config
- .getProfileTaskQueryMaxSize()));
+ IProfileTaskQueryDAO.class, new
ProfileTaskQueryEsDAO(elasticSearchClient, config
+ .getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(
- IProfileTaskLogQueryDAO.class, new
ProfileTaskLogEsDAO(elasticSearchClient, config
- .getProfileTaskQueryMaxSize()));
+ IProfileTaskLogQueryDAO.class, new
ProfileTaskLogEsDAO(elasticSearchClient, config
+ .getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(
- IProfileThreadSnapshotQueryDAO.class, new
ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
- .getProfileTaskQueryMaxSize()));
+ IProfileThreadSnapshotQueryDAO.class, new
ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
+ .getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(
- UITemplateManagementDAO.class, new
UITemplateManagementEsDAO(elasticSearchClient));
+ UITemplateManagementDAO.class, new
UITemplateManagementEsDAO(elasticSearchClient));
}
@Override
public void start() throws ModuleStartException {
- MetricsCreator metricCreator =
getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
- HealthCheckMetrics healthChecker =
metricCreator.createHealthCheckerGauge("storage_elasticsearch",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
+ .provider()
+
.getService(MetricsCreator.class);
+ HealthCheckMetrics healthChecker =
metricCreator.createHealthCheckerGauge(
+ "storage_elasticsearch", MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE);
elasticSearchClient.registerChecker(healthChecker);
try {
elasticSearchClient.connect();
@@ -204,7 +211,7 @@ public class StorageModuleElasticsearchProvider extends
ModuleProvider {
@Override
public String[] requiredModules() {
- return new String[]{CoreModule.NAME};
+ return new String[] {CoreModule.NAME};
}
public static List<IndexNameConverter> indexNameConverters(String
namespace) {
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
index 55c2fc6..6764def 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
@@ -40,13 +40,18 @@ public class TimeSeriesUtils {
private static final DateTime DAY_ONE =
TIME_BUCKET_FORMATTER.parseDateTime("20000101");
@Setter
private static int DAY_STEP = 1;
+ @Setter
+ private static int SUPER_DATASET_DAY_STEP = 1;
/**
* @return formatted latest index name, based on current timestamp.
*/
public static String latestWriteIndexName(Model model) {
long timeBucket;
- if (model.isRecord()) {
+ if (model.isRecord() && model.isSuperDataset()) {
+ timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(),
model.getDownsampling());
+ return model.getName() + Const.LINE +
compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
+ } else if (model.isRecord()) {
timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(),
model.getDownsampling());
return model.getName() + Const.LINE +
compressTimeBucket(timeBucket / 1000000, DAY_STEP);
} else {
@@ -61,7 +66,9 @@ public class TimeSeriesUtils {
static String writeIndexName(Model model, long timeBucket) {
final String modelName = model.getName();
- if (model.isRecord()) {
+ if (model.isRecord() && model.isSuperDataset()) {
+ return modelName + Const.LINE + compressTimeBucket(timeBucket /
1000000, SUPER_DATASET_DAY_STEP);
+ } else if (model.isRecord()) {
return modelName + Const.LINE + compressTimeBucket(timeBucket /
1000000, DAY_STEP);
} else {
switch (model.getDownsampling()) {
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index f46eb13..20442d7 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -18,12 +18,37 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
import static
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.compressTimeBucket;
+import static
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.writeIndexName;
public class TimeSeriesUtilsTest {
+
+ private Model superDatasetModel;
+ private Model normalRecordModel;
+ private Model normalMetricsModel;
+
+ @Before
+ public void prepare() {
+ superDatasetModel = new Model("superDatasetModel",
Lists.newArrayList(), Lists.newArrayList(),
+ 0, DownSampling.Minute, true, true
+ );
+ normalRecordModel = new Model("normalRecordModel",
Lists.newArrayList(), Lists.newArrayList(),
+ 0, DownSampling.Minute, true, false
+ );
+ normalMetricsModel = new Model("normalMetricsModel",
Lists.newArrayList(), Lists.newArrayList(),
+ 0, DownSampling.Minute, false, false
+ );
+ TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
+ TimeSeriesUtils.setDAY_STEP(3);
+ }
+
@Test
public void testCompressTimeBucket() {
Assert.assertEquals(20000101L, compressTimeBucket(20000105, 11));
@@ -33,4 +58,19 @@ public class TimeSeriesUtilsTest {
Assert.assertEquals(20000123L, compressTimeBucket(20000123, 11));
Assert.assertEquals(20000123L, compressTimeBucket(20000125, 11));
}
+
+ @Test
+ public void testIndexRolling() {
+ long secondTimeBucket = 2020_0809_1010_59L;
+ long minuteTimeBucket = 2020_0809_1010L;
+ Assert.assertEquals("superDatasetModel-20200809",
writeIndexName(superDatasetModel, secondTimeBucket));
+ Assert.assertEquals("normalRecordModel-20200807",
writeIndexName(normalRecordModel, secondTimeBucket));
+ Assert.assertEquals("normalMetricsModel-20200807",
writeIndexName(normalMetricsModel, minuteTimeBucket));
+ secondTimeBucket += 1000000;
+ minuteTimeBucket += 10000;
+ Assert.assertEquals("superDatasetModel-20200810",
writeIndexName(superDatasetModel, secondTimeBucket));
+ Assert.assertEquals("normalRecordModel-20200810",
writeIndexName(normalRecordModel, secondTimeBucket));
+ Assert.assertEquals("normalMetricsModel-20200810",
writeIndexName(normalMetricsModel, minuteTimeBucket));
+ }
+
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index 98da53c..414946f 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -54,6 +54,7 @@ import
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
import
org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressAliasEsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
@@ -110,6 +111,13 @@ public class StorageModuleElasticsearch7Provider extends
ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
+ if (config.getDayStep() > 1) {
+ TimeSeriesUtils.setDAY_STEP(config.getDayStep());
+ TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
+ }
+ if (config.getSuperDatasetDayStep() > 0) {
+
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
+ }
if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
MultipleFilesChangeMonitor monitor = new
MultipleFilesChangeMonitor(
10, readableContents -> {