This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 2df3c68  Super Size Dataset record index  es rolling step (#5282)
2df3c68 is described below

commit 2df3c683baede7a0b7f24885ce1bbe48b13c7dac
Author: Evan <[email protected]>
AuthorDate: Mon Aug 10 14:49:11 2020 +0800

    Super Size Dataset record index  es rolling step (#5282)
---
 docs/en/setup/backend/backend-storage.md           |  4 ++
 .../src/main/resources/application.yml             |  2 +
 .../StorageModuleElasticsearchConfig.java          |  5 +++
 .../StorageModuleElasticsearchProvider.java        | 49 ++++++++++++----------
 .../plugin/elasticsearch/base/TimeSeriesUtils.java | 11 ++++-
 .../elasticsearch/base/TimeSeriesUtilsTest.java    | 40 ++++++++++++++++++
 .../StorageModuleElasticsearch7Provider.java       |  8 ++++
 7 files changed, 96 insertions(+), 23 deletions(-)

diff --git a/docs/en/setup/backend/backend-storage.md 
b/docs/en/setup/backend/backend-storage.md
index e6dcd47..bd48a56 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -54,6 +54,7 @@ storage:
     trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
     trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
     dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the 
one minute/hour/day index.
+    superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent 
the number of days in the super size dataset record index, the default value is 
the same as dayStep when the value is less than 0
     user: ${SW_ES_USER:""}
     password: ${SW_ES_PASSWORD:""}
     secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets 
management file in the properties format includes the username, password, which 
are managed by 3rd party tool.
@@ -105,6 +106,9 @@ Such as, if dayStep == 11,
 1. data in [2000-01-01, 2000-01-11] will be merged into the index-20000101.
 1. data in [2000-01-12, 2000-01-22] will be merged into the index-20000112.
 
+`storage/elasticsearch/superDatasetDayStep` overrides 
`storage/elasticsearch/dayStep` if the value is positive.
+This affects the record-related entities, such as the trace segment. In 
some cases, the size of metrics is much smaller than that of the records (traces), so this 
helps balance the shards in the ElasticSearch cluster.
+ 
 NOTICE, TTL deletion would be affected by these. You should add one extra 
dayStep to your TTL. For example, if you want TTL == 30 days and dayStep == 10, you 
actually need to set TTL = 40.
 
 ### Secrets Management File Of ElasticSearch Authentication
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml 
b/oap-server/server-bootstrap/src/main/resources/application.yml
index 1bda6a9..b6c585a 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -101,6 +101,7 @@ storage:
     password: ${SW_ES_PASSWORD:""}
     secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets 
management file in the properties format includes the username, password, which 
are managed by 3rd party tool.
     dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the 
one minute/hour/day index.
+    superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent 
the number of days in the super size dataset record index, the default value is 
the same as dayStep when the value is less than 0
     indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number 
of new indexes
     superDatasetIndexShardsFactor: 
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # Super data set has been 
defined in the codes, such as trace segments. This factor provides more shards 
for the super data set, shards number = indexShardsNumber * 
superDatasetIndexShardsFactor. Also, this factor affects Zipkin and Jaeger 
traces.
     indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
@@ -119,6 +120,7 @@ storage:
     trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
     trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
     dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the 
one minute/hour/day index.
+    superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent 
the number of days in the super size dataset record index, the default value is 
the same as dayStep when the value is less than 0
     user: ${SW_ES_USER:""}
     password: ${SW_ES_PASSWORD:""}
     secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets 
management file in the properties format includes the username, password, which 
are managed by 3rd party tool.
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 6bcb653..4fa466d 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -80,6 +80,11 @@ public class StorageModuleElasticsearchConfig extends 
ModuleConfig {
      */
     @Getter
     private int dayStep = 1;
+    /**
+     * @since 8.2.0, the day step used for super size dataset record 
index rolling; it takes effect only when the value is greater than 0
+     */
+    @Getter
+    private int superDatasetDayStep = -1;
     @Setter
     private int resultWindowMaxSize = 10000;
     @Setter
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 75f17f7..030b965 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -114,11 +114,15 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
         }
         if (config.getDayStep() > 1) {
             TimeSeriesUtils.setDAY_STEP(config.getDayStep());
+            TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
+        }
+        if (config.getSuperDatasetDayStep() > 0) {
+            
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
         }
 
         if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
             MultipleFilesChangeMonitor monitor = new 
MultipleFilesChangeMonitor(
-                    10, readableContents -> {
+                10, readableContents -> {
                 final byte[] secretsFileContent = readableContents.get(0);
                 if (secretsFileContent == null) {
                     return;
@@ -146,47 +150,50 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
         }
 
         elasticSearchClient = new ElasticSearchClient(
-                config.getClusterNodes(), config.getProtocol(), 
config.getTrustStorePath(), config
-                .getTrustStorePass(), config.getUser(), config.getPassword(),
-                indexNameConverters(config.getNameSpace())
+            config.getClusterNodes(), config.getProtocol(), 
config.getTrustStorePath(), config
+            .getTrustStorePass(), config.getUser(), config.getPassword(),
+            indexNameConverters(config.getNameSpace())
         );
 
         this.registerServiceImplementation(
-                IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, 
config.getBulkActions(), config
-                        .getFlushInterval(), config.getConcurrentRequests()));
+            IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, 
config.getBulkActions(), config
+                .getFlushInterval(), config.getConcurrentRequests()));
         this.registerServiceImplementation(StorageDAO.class, new 
StorageEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-                IHistoryDeleteDAO.class, new 
HistoryDeleteEsDAO(elasticSearchClient));
+            IHistoryDeleteDAO.class, new 
HistoryDeleteEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-                INetworkAddressAliasDAO.class, new 
NetworkAddressAliasEsDAO(elasticSearchClient, config
-                        .getResultWindowMaxSize()));
+            INetworkAddressAliasDAO.class, new 
NetworkAddressAliasEsDAO(elasticSearchClient, config
+                .getResultWindowMaxSize()));
         this.registerServiceImplementation(ITopologyQueryDAO.class, new 
TopologyQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IMetricsQueryDAO.class, new 
MetricsQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-                ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, 
config.getSegmentQueryMaxSize()));
+            ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, 
config.getSegmentQueryMaxSize()));
         this.registerServiceImplementation(
-                IMetadataQueryDAO.class, new 
MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
+            IMetadataQueryDAO.class, new 
MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
         this.registerServiceImplementation(IAggregationQueryDAO.class, new 
AggregationQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new 
AlarmQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new 
TopNRecordsQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(ILogQueryDAO.class, new 
LogQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-                IProfileTaskQueryDAO.class, new 
ProfileTaskQueryEsDAO(elasticSearchClient, config
-                        .getProfileTaskQueryMaxSize()));
+            IProfileTaskQueryDAO.class, new 
ProfileTaskQueryEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
         this.registerServiceImplementation(
-                IProfileTaskLogQueryDAO.class, new 
ProfileTaskLogEsDAO(elasticSearchClient, config
-                        .getProfileTaskQueryMaxSize()));
+            IProfileTaskLogQueryDAO.class, new 
ProfileTaskLogEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
         this.registerServiceImplementation(
-                IProfileThreadSnapshotQueryDAO.class, new 
ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
-                        .getProfileTaskQueryMaxSize()));
+            IProfileThreadSnapshotQueryDAO.class, new 
ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
         this.registerServiceImplementation(
-                UITemplateManagementDAO.class, new 
UITemplateManagementEsDAO(elasticSearchClient));
+            UITemplateManagementDAO.class, new 
UITemplateManagementEsDAO(elasticSearchClient));
     }
 
     @Override
     public void start() throws ModuleStartException {
-        MetricsCreator metricCreator = 
getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
-        HealthCheckMetrics healthChecker = 
metricCreator.createHealthCheckerGauge("storage_elasticsearch", 
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+        MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
+                                                   .provider()
+                                                   
.getService(MetricsCreator.class);
+        HealthCheckMetrics healthChecker = 
metricCreator.createHealthCheckerGauge(
+            "storage_elasticsearch", MetricsTag.EMPTY_KEY, 
MetricsTag.EMPTY_VALUE);
         elasticSearchClient.registerChecker(healthChecker);
         try {
             elasticSearchClient.connect();
@@ -204,7 +211,7 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
 
     @Override
     public String[] requiredModules() {
-        return new String[]{CoreModule.NAME};
+        return new String[] {CoreModule.NAME};
     }
 
     public static List<IndexNameConverter> indexNameConverters(String 
namespace) {
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
index 55c2fc6..6764def 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
@@ -40,13 +40,18 @@ public class TimeSeriesUtils {
     private static final DateTime DAY_ONE = 
TIME_BUCKET_FORMATTER.parseDateTime("20000101");
     @Setter
     private static int DAY_STEP = 1;
+    @Setter
+    private static int SUPER_DATASET_DAY_STEP = 1;
 
     /**
      * @return formatted latest index name, based on current timestamp.
      */
     public static String latestWriteIndexName(Model model) {
         long timeBucket;
-        if (model.isRecord()) {
+        if (model.isRecord() && model.isSuperDataset()) {
+            timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), 
model.getDownsampling());
+            return model.getName() + Const.LINE + 
compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
+        } else if (model.isRecord()) {
             timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), 
model.getDownsampling());
             return model.getName() + Const.LINE + 
compressTimeBucket(timeBucket / 1000000, DAY_STEP);
         } else {
@@ -61,7 +66,9 @@ public class TimeSeriesUtils {
     static String writeIndexName(Model model, long timeBucket) {
         final String modelName = model.getName();
 
-        if (model.isRecord()) {
+        if (model.isRecord() && model.isSuperDataset()) {
+            return modelName + Const.LINE + compressTimeBucket(timeBucket / 
1000000, SUPER_DATASET_DAY_STEP);
+        } else if (model.isRecord()) {
             return modelName + Const.LINE + compressTimeBucket(timeBucket / 
1000000, DAY_STEP);
         } else {
             switch (model.getDownsampling()) {
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index f46eb13..20442d7 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -18,12 +18,37 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
 
 import static 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.compressTimeBucket;
+import static 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.writeIndexName;
 
 public class TimeSeriesUtilsTest {
+
+    private Model superDatasetModel;
+    private Model normalRecordModel;
+    private Model normalMetricsModel;
+
+    @Before
+    public void prepare() {
+        superDatasetModel = new Model("superDatasetModel", 
Lists.newArrayList(), Lists.newArrayList(),
+                                      0, DownSampling.Minute, true, true
+        );
+        normalRecordModel = new Model("normalRecordModel", 
Lists.newArrayList(), Lists.newArrayList(),
+                                      0, DownSampling.Minute, true, false
+        );
+        normalMetricsModel = new Model("normalMetricsModel", 
Lists.newArrayList(), Lists.newArrayList(),
+                                       0, DownSampling.Minute, false, false
+        );
+        TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
+        TimeSeriesUtils.setDAY_STEP(3);
+    }
+
     @Test
     public void testCompressTimeBucket() {
         Assert.assertEquals(20000101L, compressTimeBucket(20000105, 11));
@@ -33,4 +58,19 @@ public class TimeSeriesUtilsTest {
         Assert.assertEquals(20000123L, compressTimeBucket(20000123, 11));
         Assert.assertEquals(20000123L, compressTimeBucket(20000125, 11));
     }
+
+    @Test
+    public void testIndexRolling() {
+        long secondTimeBucket = 2020_0809_1010_59L;
+        long minuteTimeBucket = 2020_0809_1010L;
+        Assert.assertEquals("superDatasetModel-20200809", 
writeIndexName(superDatasetModel, secondTimeBucket));
+        Assert.assertEquals("normalRecordModel-20200807", 
writeIndexName(normalRecordModel, secondTimeBucket));
+        Assert.assertEquals("normalMetricsModel-20200807", 
writeIndexName(normalMetricsModel, minuteTimeBucket));
+        secondTimeBucket += 1000000;
+        minuteTimeBucket += 10000;
+        Assert.assertEquals("superDatasetModel-20200810", 
writeIndexName(superDatasetModel, secondTimeBucket));
+        Assert.assertEquals("normalRecordModel-20200810", 
writeIndexName(normalRecordModel, secondTimeBucket));
+        Assert.assertEquals("normalMetricsModel-20200810", 
writeIndexName(normalMetricsModel, minuteTimeBucket));
+    }
+
 }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index 98da53c..414946f 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -54,6 +54,7 @@ import 
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
 import 
org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
+import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressAliasEsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
@@ -110,6 +111,13 @@ public class StorageModuleElasticsearch7Provider extends 
ModuleProvider {
         if (!StringUtil.isEmpty(config.getNameSpace())) {
             config.setNameSpace(config.getNameSpace().toLowerCase());
         }
+        if (config.getDayStep() > 1) {
+            TimeSeriesUtils.setDAY_STEP(config.getDayStep());
+            TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
+        }
+        if (config.getSuperDatasetDayStep() > 0) {
+            
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
+        }
         if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
             MultipleFilesChangeMonitor monitor = new 
MultipleFilesChangeMonitor(
                     10, readableContents -> {

Reply via email to