This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 48a0985345 Elasticsearch storage: support specify the settings
`(number_of_shards/number_of_replicas)` for each index individually. (#9914)
48a0985345 is described below
commit 48a0985345bc65a63b17d4539f80de334383dedb
Author: Wan Kai <[email protected]>
AuthorDate: Mon Nov 7 21:16:36 2022 +0800
Elasticsearch storage: support specify the settings
`(number_of_shards/number_of_replicas)` for each index individually. (#9914)
* Elasticsearch storage: Provide system environment
variable(`SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS`) and support specify the
settings `(number_of_shards/number_of_replicas)` for each index individually.
* Elasticsearch storage: Support update index settings
`(number_of_shards/number_of_replicas)` for the index template after
re-configured.
---
docs/en/changes/changes.md | 2 +
docs/en/setup/backend/backend-storage.md | 66 +++++++++++++++-
docs/en/setup/backend/configuration-vocabulary.md | 1 +
.../src/main/resources/application.yml | 5 +-
.../StorageModuleElasticsearchConfig.java | 8 ++
.../plugin/elasticsearch/base/IndexStructures.java | 82 ++++++++++++++-----
.../elasticsearch/base/StorageEsInstaller.java | 91 +++++++++++++++-------
.../elasticsearch/base/IndexStructuresTest.java | 73 ++++++++++++-----
.../elasticsearch/base/MockEsInstallTest.java | 10 +--
9 files changed, 262 insertions(+), 76 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 4737bdf843..14f2ae395d 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -96,6 +96,8 @@
* Optimize data binary parse methods in *LogQueryDAO
* Support different indexType
* Support configuration for TTL and (block|segment) intervals
+* Elasticsearch storage: Provide system environment
variable(`SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS`) and support specify the
settings `(number_of_shards/number_of_replicas)` for each index individually.
+* Elasticsearch storage: Support update index settings
`(number_of_shards/number_of_replicas)` for the index template after rebooting.
* Optimize MQ Topology analysis. Use entry span's peer from the consumer side
as source service when no producer instrumentation(no cross-process reference).
* Refactor JDBC storage implementations to reuse logics.
* Fix `ClassCastException` in `LoggingConfigWatcher`.
diff --git a/docs/en/setup/backend/backend-storage.md
b/docs/en/setup/backend/backend-storage.md
index e9a5100024..074a4c3313 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -80,9 +80,12 @@ storage:
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the
one minute/hour/day index.
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number
of new indexes
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1} # Replicas
number of new indexes
+ # Specify the settings for each index individually.
+ # If configured, this setting has the highest priority and overrides the
generic settings.
+ specificIndexSettings: ${SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS:""}
# Super data set has been defined in the codes, such as trace segments.The
following 3 config would be improve es performance when storage super size data
in es.
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent
the number of days in the super size dataset record index, the default value is
the same as dayStep when the value is less than 0
- superDatasetIndexShardsFactor:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides
more shards for the super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger
traces.
+ superDatasetIndexShardsFactor:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides
more shards for the super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor effects Zipkin traces.
superDatasetIndexReplicasNumber:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas
number in the super size dataset record index, the default value is 0.
indexTemplateOrder: ${SW_STORAGE_ES_INDEX_TEMPLATE_ORDER:0} # the order of
index template
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk
record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
@@ -151,7 +154,33 @@ Once it is changed manually or through a 3rd party tool,
such as [Vault](https:/
the storage provider will use the new username, password, and JKS password to
establish the connection and close the old one. If the information exists in
the file,
the `user/password` will be overridden.
-### Advanced Configurations For Elasticsearch Index
+
+### Index Settings
+The following settings control the number of shards and replicas for new and
existing index templates. The update only got applied after OAP reboots.
+```yaml
+storage:
+ elasticsearch:
+ # ......
+ indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1}
+ indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1}
+ specificIndexSettings: ${SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS:""}
+ superDatasetIndexShardsFactor:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5}
+ superDatasetIndexReplicasNumber:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0}
+```
+The following table shows the relationship between those config items and
Elasticsearch `index number_of_shards/number_of_replicas`.
+And also you can [specify the settings for each index
individually.](#specify-settings-for-each-elasticsearch-index-individually)
+
+| index | number_of_shards | number_of_replicas
|
+|--------------------------------------|------------------|----------------------|
+| sw_ui_template | indexShardsNumber |
indexReplicasNumber |
+| sw_metrics-all-`${day-format}` | indexShardsNumber |
indexReplicasNumber |
+| sw_log-`${day-format}` | indexShardsNumber *
superDatasetIndexShardsFactor | superDatasetIndexReplicasNumber |
+| sw_segment-`${day-format}` | indexShardsNumber *
superDatasetIndexShardsFactor | superDatasetIndexReplicasNumber |
+| sw_browser_error_log-`${day-format}` | indexShardsNumber *
superDatasetIndexShardsFactor | superDatasetIndexReplicasNumber |
+| sw_zipkin_span-`${day-format}` | indexShardsNumber *
superDatasetIndexShardsFactor | superDatasetIndexReplicasNumber |
+| sw_records-all-`${day-format}` | indexShardsNumber |
indexReplicasNumber |
+
+#### Advanced Configurations For Elasticsearch Index
You can add advanced configurations in `JSON` format to set `ElasticSearch
index settings` by following [ElasticSearch
doc](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html)
For example, set
[translog](https://www.elastic.co/guide/en/elasticsearch/reference/master/index-modules-translog.html)
settings:
@@ -163,6 +192,39 @@ storage:
advanced:
${SW_STORAGE_ES_ADVANCED:"{\"index.translog.durability\":\"request\",\"index.translog.sync_interval\":\"5s\"}"}
```
+#### Specify Settings For Each Elasticsearch Index Individually
+You can specify the settings for one or more indexes individually by using
`SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS`.
+
+**NOTE:**
+Supported settings:
+- number_of_shards
+- number_of_replicas
+
+**NOTE:** These settings have the highest priority and will override the
existing
+generic settings mentioned in [index settings doc](#index-settings).
+
+The settings are in `JSON` format. The index name here is logic entity name,
which should exclude the `${SW_NAMESPACE}` which is `sw` by default, e.g.
+```json
+{
+ "metrics-all":{
+ "number_of_shards":"3",
+ "number_of_replicas":"2"
+ },
+ "segment":{
+ "number_of_shards":"6",
+ "number_of_replicas":"1"
+ }
+}
+```
+
+This configuration in the YAML file is like this,
+```yaml
+storage:
+ elasticsearch:
+ # ......
+ specificIndexSettings:
${SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS:"{\"metrics-all\":{\"number_of_shards\":\"3\",\"number_of_replicas\":\"2\"},\"segment\":{\"number_of_shards\":\"6\",\"number_of_replicas\":\"1\"}}"}
+```
+
### Recommended ElasticSearch server-side configurations
You could add the following configuration to `elasticsearch.yml`, and set the
value based on your environment.
diff --git a/docs/en/setup/backend/configuration-vocabulary.md
b/docs/en/setup/backend/configuration-vocabulary.md
index c3940428b5..f5ebb4c2c5 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -100,6 +100,7 @@ The Configuration Vocabulary lists all available
configurations provided by `app
| - | - | dayStep
| Represents the number
of days in the one-minute/hour/day index.
[...]
| - | - | indexShardsNumber
| Shard number of new
indexes.
[...]
| - | - | indexReplicasNumber
| Replicas number of new
indexes.
[...]
+| - | - | specificIndexSettings
| Specify the settings
for each index individually. If configured, this setting has the highest
priority and overrides the generic settings.
[...]
| - | - | superDatasetDayStep
| Represents the number
of days in the super size dataset record index. Default value is the same as
dayStep when the value is less than 0.
[...]
| - | - | superDatasetIndexShardsFactor
| Super dataset is
defined in the code (e.g. trace segments). This factor provides more shards for
the super dataset: shards number = indexShardsNumber *
superDatasetIndexShardsFactor. This factor also affects Zipkin and Jaeger
traces. [...]
| - | - | superDatasetIndexReplicasNumber
| Represents the replicas
number in the super size dataset record index.
[...]
diff --git a/oap-server/server-starter/src/main/resources/application.yml
b/oap-server/server-starter/src/main/resources/application.yml
index f9d273659c..aa1cf0493e 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -150,9 +150,12 @@ storage:
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the
one minute/hour/day index.
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number
of new indexes
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1} # Replicas
number of new indexes
+ # Specify the settings for each index individually.
+ # If configured, this setting has the highest priority and overrides the
generic settings.
+ specificIndexSettings: ${SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS:""}
# Super data set has been defined in the codes, such as trace segments.The
following 3 config would be improve es performance when storage super size data
in es.
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent
the number of days in the super size dataset record index, the default value is
the same as dayStep when the value is less than 0
- superDatasetIndexShardsFactor:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides
more shards for the super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger
traces.
+ superDatasetIndexShardsFactor:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides
more shards for the super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor effects Zipkin traces.
superDatasetIndexReplicasNumber:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas
number in the super size dataset record index, the default value is 0.
indexTemplateOrder: ${SW_STORAGE_ES_INDEX_TEMPLATE_ORDER:0} # the order of
index template
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk
record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 8010ace25e..ac5f5b4c7f 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -58,6 +58,14 @@ public class StorageModuleElasticsearchConfig extends
ModuleConfig {
private int dayStep = 1;
private int indexReplicasNumber = 0;
private int indexShardsNumber = 1;
+ /**
+ * @since 9.3.0, Specify the settings for each index individually.
+ * Use JSON format and the index name in the config should exclude the
`${SW_NAMESPACE}` e.g.
+ *
{"metrics-all":{"number_of_shards":"3","number_of_replicas":"2"},"segment":{"number_of_shards":"6","number_of_replicas":"1"}}
+ * If configured, this setting has the highest priority and overrides the
generic settings.
+ */
+ private String specificIndexSettings;
+
/**
* @since 8.2.0, the record day step is for super size dataset record
index rolling when the value of it is greater
* than 0
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexStructures.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexStructures.java
index ac1596deaf..ff8ee358d7 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexStructures.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexStructures.java
@@ -22,23 +22,27 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import lombok.EqualsAndHashCode;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
public class IndexStructures {
- private final Map<String, Fields> structures;
+ private final Map<String, Fields> mappingStructures;
+ private final Map<String, UpdatableIndexSettings> indexSettingStructures;
public IndexStructures() {
- this.structures = new HashMap<>();
+ this.mappingStructures = new HashMap<>();
+ this.indexSettingStructures = new HashMap<>();
}
public Mappings getMapping(String tableName) {
Map<String, Object> properties =
- structures.containsKey(tableName) ?
- structures.get(tableName).properties : new HashMap<>();
+ mappingStructures.containsKey(tableName) ?
+ mappingStructures.get(tableName).properties : new HashMap<>();
Mappings.Source source =
- structures.containsKey(tableName) ?
- structures.get(tableName).source : new
Mappings.Source();
+ mappingStructures.containsKey(tableName) ?
+ mappingStructures.get(tableName).source : new
Mappings.Source();
return Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
@@ -46,21 +50,29 @@ public class IndexStructures {
.build();
}
+ public UpdatableIndexSettings getUpdatableIndexSettings(String tableName) {
+ return indexSettingStructures.get(tableName);
+ }
+
/**
- * Add or append field when the current structures don't contain the input
structure or having
+ * Add or append mapping/settings when the current structures don't
contain the input structure or having
* new fields in it.
*/
- public void putStructure(String tableName, Mappings mapping) {
+ public void putStructure(String tableName, Mappings mapping, Map<String,
Object> settings) {
+ if (CollectionUtils.isNotEmpty(settings) &&
Objects.nonNull(settings.get("index"))) {
+ this.indexSettingStructures.putIfAbsent(tableName, new
UpdatableIndexSettings(
+ (Map<String, Object>) settings.get("index")));
+ }
if (Objects.isNull(mapping)
|| Objects.isNull(mapping.getProperties())
|| mapping.getProperties().isEmpty()) {
return;
}
Fields fields = new Fields(mapping);
- if (structures.containsKey(tableName)) {
- structures.get(tableName).appendNewFields(fields);
+ if (mappingStructures.containsKey(tableName)) {
+ mappingStructures.get(tableName).appendNewFields(fields);
} else {
- structures.put(tableName, fields);
+ mappingStructures.put(tableName, fields);
}
}
@@ -69,13 +81,13 @@ public class IndexStructures {
* The input mappings should be history mapping from current index.
* Do not return _source config to avoid current index update conflict.
*/
- public Mappings diffStructure(String tableName, Mappings mappings) {
- if (!structures.containsKey(tableName)) {
+ public Mappings diffMappings(String tableName, Mappings mappings) {
+ if (!mappingStructures.containsKey(tableName)) {
return new Mappings();
}
Map<String, Object> properties = mappings.getProperties();
Map<String, Object> diffProperties =
- structures.get(tableName).diffFields(new Fields(mappings));
+ mappingStructures.get(tableName).diffFields(new Fields(mappings));
return Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(diffProperties)
@@ -86,19 +98,33 @@ public class IndexStructures {
* Returns true when the current structures already contains the
properties of the input
* mappings.
*/
- public boolean containsStructure(String tableName, Mappings mappings) {
+ public boolean containsMapping(String tableName, Mappings mappings) {
if (Objects.isNull(mappings) ||
- Objects.isNull(mappings.getProperties()) ||
- mappings.getProperties().isEmpty()) {
+ CollectionUtils.isEmpty(mappings.getProperties())) {
return true;
}
- return structures.containsKey(tableName)
- && structures.get(tableName)
- .containsAllFields(new Fields(mappings));
+
+ return mappingStructures.containsKey(tableName)
+ && mappingStructures.get(tableName)
+ .containsAllFields(new Fields(mappings));
}
/**
- * The properties of the template or index.
+ * Returns true when the current index setting equals the input.
+ */
+ public boolean compareIndexSetting(String tableName, Map<String, Object>
settings) {
+ if ((CollectionUtils.isEmpty(settings) ||
CollectionUtils.isEmpty((Map) settings.get("index")))
+ && Objects.isNull(indexSettingStructures.get(tableName))) {
+ return true;
+ }
+
+ return indexSettingStructures.containsKey(tableName)
+ && indexSettingStructures.get(tableName).
+ equals(new
UpdatableIndexSettings((Map<String, Object>) settings.get("index")));
+ }
+
+ /**
+ * The mapping properties of the template or index.
*/
public static class Fields {
private final Map<String, Object> properties;
@@ -146,4 +172,18 @@ public class IndexStructures {
));
}
}
+
+ /**
+ * The index settings structure which only include needs to compare and
update fields
+ */
+ @EqualsAndHashCode
+ public static class UpdatableIndexSettings {
+ private final int replicas;
+ private final int shards;
+
+ public UpdatableIndexSettings(Map<String, Object> indexSettings) {
+ this.replicas = Integer.parseInt((String)
indexSettings.getOrDefault("number_of_replicas", "0"));
+ this.shards = Integer.parseInt((String)
indexSettings.getOrDefault("number_of_shards", "0"));
+ }
+ }
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index d99a09f5e9..05629c9565 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.Gson;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,6 +37,7 @@ import
org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
@@ -43,6 +46,7 @@ public class StorageEsInstaller extends ModelInstaller {
private final Gson gson = new Gson();
private final StorageModuleElasticsearchConfig config;
protected final ColumnTypeEsMapping columnTypeEsMapping;
+ private final Map<String, Map<String, Object>> specificIndexesSettings;
/**
* The mappings of the template .
@@ -56,6 +60,13 @@ public class StorageEsInstaller extends ModelInstaller {
this.columnTypeEsMapping = new ColumnTypeEsMapping();
this.config = config;
this.structures = getStructures();
+ if (StringUtil.isNotEmpty(config.getSpecificIndexSettings())) {
+ this.specificIndexesSettings = gson.fromJson(
+ config.getSpecificIndexSettings(), new
TypeReference<Map<String, Map<String, Object>>>() {
+ }.getType());
+ } else {
+ this.specificIndexesSettings = Collections.emptyMap();
+ }
}
protected IndexStructures getStructures() {
@@ -63,18 +74,18 @@ public class StorageEsInstaller extends ModelInstaller {
}
@Override
- public boolean isExists(Model model) {
+ public boolean isExists(Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient) client;
String tableName = IndexController.INSTANCE.getTableName(model);
IndexController.LogicIndicesRegister.registerRelation(model,
tableName);
if (!model.isTimeSeries()) {
boolean exist = esClient.isExistsIndex(tableName);
if (exist) {
- Mappings historyMapping = esClient.getIndex(tableName)
- .map(Index::getMappings)
- .orElseGet(Mappings::new);
- structures.putStructure(tableName, historyMapping);
- exist = structures.containsStructure(tableName,
createMapping(model));
+ Optional<Index> index = esClient.getIndex(tableName);
+ Mappings historyMapping =
index.map(Index::getMappings).orElseGet(Mappings::new);
+ structures.putStructure(tableName, historyMapping,
index.map(Index::getSettings).orElseGet(HashMap::new));
+ exist = structures.containsMapping(tableName,
createMapping(model))
+ && structures.compareIndexSetting(tableName,
createSetting(model));
}
return exist;
}
@@ -91,9 +102,10 @@ public class StorageEsInstaller extends ModelInstaller {
if (exist) {
structures.putStructure(
- tableName, template.get().getMappings()
+ tableName, template.get().getMappings(),
template.get().getSettings()
);
- exist = structures.containsStructure(tableName,
createMapping(model));
+ exist = structures.containsMapping(tableName, createMapping(model))
+ && structures.compareIndexSetting(tableName,
createSetting(model));
}
return exist;
}
@@ -111,28 +123,36 @@ public class StorageEsInstaller extends ModelInstaller {
ElasticSearchClient esClient = (ElasticSearchClient) client;
String tableName = IndexController.INSTANCE.getTableName(model);
Mappings mapping = createMapping(model);
+ Map<String, Object> settings = createSetting(model);
if (!esClient.isExistsIndex(tableName)) {
- Map<String, Object> settings = createSetting(model);
boolean isAcknowledged = esClient.createIndex(tableName, mapping,
settings);
log.info("create {} index finished, isAcknowledged: {}",
tableName, isAcknowledged);
if (!isAcknowledged) {
- throw new StorageException("create " + tableName + " index
failure, ");
+ throw new StorageException("create " + tableName + " index
failure");
}
} else {
Mappings historyMapping = esClient.getIndex(tableName)
.map(Index::getMappings)
.orElseGet(Mappings::new);
- structures.putStructure(tableName, mapping);
- Mappings appendMapping = structures.diffStructure(tableName,
historyMapping);
+ structures.putStructure(tableName, mapping, settings);
+ Mappings appendMapping = structures.diffMappings(tableName,
historyMapping);
+ //update mapping
if (appendMapping.getProperties() != null &&
!appendMapping.getProperties().isEmpty()) {
boolean isAcknowledged =
esClient.updateIndexMapping(tableName, appendMapping);
- log.info("update {} index finished, isAcknowledged: {}, append
mappings: {}", tableName,
+ log.info("update {} index mapping finished, isAcknowledged:
{}, append mapping: {}", tableName,
isAcknowledged, appendMapping
);
if (!isAcknowledged) {
- throw new StorageException("update " + tableName + " index
failure");
+ throw new StorageException("update " + tableName + " index
mapping failure");
}
}
+ //needs to update settings
+ if (!structures.compareIndexSetting(tableName, settings)) {
+ log.warn(
+ "index {} settings configuration has been updated to {},
please remove it before OAP starts",
+ tableName, settings
+ );
+ }
}
}
@@ -144,9 +164,11 @@ public class StorageEsInstaller extends ModelInstaller {
String indexName = TimeSeriesUtils.latestWriteIndexName(model);
try {
boolean shouldUpdateTemplate =
!esClient.isExistsTemplate(tableName);
- shouldUpdateTemplate = shouldUpdateTemplate ||
!structures.containsStructure(tableName, mapping);
+ shouldUpdateTemplate = shouldUpdateTemplate
+ || !structures.containsMapping(tableName, mapping)
+ || !structures.compareIndexSetting(tableName, settings);
if (shouldUpdateTemplate) {
- structures.putStructure(tableName, mapping);
+ structures.putStructure(tableName, mapping, settings);
boolean isAcknowledged = esClient.createOrUpdateTemplate(
tableName, settings, structures.getMapping(tableName),
config.getIndexTemplateOrder());
log.info("create {} index template finished, isAcknowledged:
{}", tableName, isAcknowledged);
@@ -159,7 +181,8 @@ public class StorageEsInstaller extends ModelInstaller {
Mappings historyMapping = esClient.getIndex(indexName)
.map(Index::getMappings)
.orElseGet(Mappings::new);
- Mappings appendMapping = structures.diffStructure(tableName,
historyMapping);
+ Mappings appendMapping = structures.diffMappings(tableName,
historyMapping);
+ //update mapping
if (appendMapping.getProperties() != null &&
!appendMapping.getProperties().isEmpty()) {
boolean isAcknowledged =
esClient.updateIndexMapping(indexName, appendMapping);
log.info("update {} index finished, isAcknowledged: {},
append mappings: {}", indexName,
@@ -169,6 +192,14 @@ public class StorageEsInstaller extends ModelInstaller {
throw new StorageException("update " + indexName + "
time series index failure");
}
}
+
+ //needs to update settings
+ if (!structures.compareIndexSetting(tableName, settings)) {
+ log.info(
+ "index template {} settings configuration has been
updated to {}, it will applied on new index",
+ tableName, settings
+ );
+ }
} else {
boolean isAcknowledged = esClient.createIndex(indexName);
log.info("create {} index finished, isAcknowledged: {}",
indexName, isAcknowledged);
@@ -183,13 +214,14 @@ public class StorageEsInstaller extends ModelInstaller {
protected Map<String, Object> createSetting(Model model) throws
StorageException {
Map<String, Object> setting = new HashMap<>();
-
- setting.put("index.number_of_replicas", model.isSuperDataset()
- ? config.getSuperDatasetIndexReplicasNumber()
- : config.getIndexReplicasNumber());
- setting.put("index.number_of_shards", model.isSuperDataset()
- ? config.getIndexShardsNumber() *
config.getSuperDatasetIndexShardsFactor()
- : config.getIndexShardsNumber());
+ Map<String, Object> indexSettings = new HashMap<>();
+ setting.put("index", indexSettings);
+ indexSettings.put("number_of_replicas", model.isSuperDataset()
+ ? Integer.toString(config.getSuperDatasetIndexReplicasNumber())
+ : Integer.toString(config.getIndexReplicasNumber()));
+ indexSettings.put("number_of_shards", model.isSuperDataset()
+ ? Integer.toString(config.getIndexShardsNumber() *
config.getSuperDatasetIndexShardsFactor())
+ : Integer.toString(config.getIndexShardsNumber()));
// Set the index refresh period as INT(flushInterval * 2/3). At the
edge case,
// in low traffic(traffic < bulkActions in the whole period), there is
a possible case, 2 period bulks are included in
// one index refresh rebuild operation, which could cause version
conflicts. And this case can't be fixed
@@ -202,13 +234,20 @@ public class StorageEsInstaller extends ModelInstaller {
// even this value is set too small by end user manually.
indexRefreshInterval = 5;
}
- setting.put("index.refresh_interval", indexRefreshInterval + "s");
+ indexSettings.put("refresh_interval", indexRefreshInterval + "s");
List<ModelColumn> columns =
IndexController.LogicIndicesRegister.getPhysicalTableColumns(model);
- setting.put("analysis", getAnalyzerSetting(columns));
+ indexSettings.put("analysis", getAnalyzerSetting(columns));
if (!StringUtil.isEmpty(config.getAdvanced())) {
Map<String, Object> advancedSettings =
gson.fromJson(config.getAdvanced(), Map.class);
setting.putAll(advancedSettings);
}
+
+ //Set the config for the specific index, if has been configured.
+ Map<String, Object> specificSettings =
this.specificIndexesSettings.get(IndexController.INSTANCE.getTableName(model));
+ if (!CollectionUtils.isEmpty(specificSettings)) {
+ indexSettings.putAll(specificSettings);
+ }
+
return setting;
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexStructuresTest.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexStructuresTest.java
index 1d918a790c..059af8abb9 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexStructuresTest.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexStructuresTest.java
@@ -38,7 +38,7 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
- .build());
+ .build(), new HashMap<>());
Mappings mapping = structures.getMapping("test");
Assert.assertEquals(mapping.getProperties(), properties);
@@ -46,7 +46,7 @@ public class IndexStructuresTest {
"test2", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(new HashMap<>())
- .build());
+ .build(), new HashMap<>());
mapping = structures.getMapping("test2");
Assert.assertTrue(mapping.getProperties().isEmpty());
@@ -59,7 +59,7 @@ public class IndexStructuresTest {
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
- .build());
+ .build(), new HashMap<>());
Assert.assertEquals(properties,
structuresSource.getMapping("test").getProperties());
Assert.assertEquals(source.getExcludes(),
structuresSource.getMapping("test").getSource().getExcludes());
}
@@ -74,7 +74,7 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
- .build());
+ .build(), new HashMap<>());
Mappings mapping = structures.getMapping("test");
Assert.assertEquals(properties, mapping.getProperties());
HashMap<String, Object> properties2 = new HashMap<>();
@@ -84,7 +84,7 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
- .build());
+ .build(), new HashMap<>());
mapping = structures.getMapping("test");
HashMap<String, Object> res = new HashMap<>();
res.put("a", "b");
@@ -101,7 +101,7 @@ public class IndexStructuresTest {
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
- .build());
+ .build(), new HashMap<>());
Assert.assertEquals(properties,
structuresSource.getMapping("test").getProperties());
Assert.assertEquals(source.getExcludes(),
structuresSource.getMapping("test").getSource().getExcludes());
@@ -111,14 +111,14 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
- .source(source2)
- .build());
+ .source(source)
+ .build(), new HashMap<>());
Assert.assertEquals(res,
structuresSource.getMapping("test").getProperties());
- Assert.assertEquals(new HashSet<>(),
structuresSource.getMapping("test").getSource().getExcludes());
+ Assert.assertEquals(new HashSet<>(Arrays.asList("a", "b", "c", "d",
"e")), structuresSource.getMapping("test").getSource().getExcludes());
}
@Test
- public void diffStructure() {
+ public void diffMapping() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> properties = new HashMap<>();
properties.put("a", "b");
@@ -128,10 +128,10 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
- .build());
+ .build(), new HashMap<>());
HashMap<String, Object> properties2 = new HashMap<>();
properties2.put("a", "b");
- Mappings diffMappings = structures.diffStructure(
+ Mappings diffMappings = structures.diffMappings(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
@@ -140,7 +140,7 @@ public class IndexStructuresTest {
res.put("c", "d");
res.put("f", "g");
Assert.assertEquals(res, diffMappings.getProperties());
- diffMappings = structures.diffStructure(
+ diffMappings = structures.diffMappings(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
@@ -158,15 +158,15 @@ public class IndexStructuresTest {
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
- .build());
- diffMappings = structuresSource.diffStructure(
+ .build(), new HashMap<>());
+ diffMappings = structuresSource.diffMappings(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
.source(source)
.build());
Assert.assertEquals(res, diffMappings.getProperties());
- diffMappings = structuresSource.diffStructure(
+ diffMappings = structuresSource.diffMappings(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
@@ -174,7 +174,7 @@ public class IndexStructuresTest {
.build());
Assert.assertEquals(res, diffMappings.getProperties());
Assert.assertNull("Mapping source should not be return by
diffStructure()", diffMappings.getSource());
- diffMappings = structuresSource.diffStructure(
+ diffMappings = structuresSource.diffMappings(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
@@ -186,7 +186,7 @@ public class IndexStructuresTest {
}
@Test
- public void containsStructure() {
+ public void containsMapping() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> properties = new HashMap<>();
properties.put("a", "b");
@@ -196,12 +196,12 @@ public class IndexStructuresTest {
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(new Mappings.Source())
- .build());
+ .build(), new HashMap<>());
HashMap<String, Object> properties2 = new HashMap<>();
properties2.put("a", "b");
properties2.put("c", "d");
- Assert.assertTrue(structures.containsStructure(
+ Assert.assertTrue(structures.containsMapping(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
@@ -213,7 +213,7 @@ public class IndexStructuresTest {
HashMap<String, Object> properties3 = new HashMap<>();
properties3.put("a", "b");
properties3.put("q", "d");
- Assert.assertFalse(structures.containsStructure(
+ Assert.assertFalse(structures.containsMapping(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
@@ -221,4 +221,35 @@ public class IndexStructuresTest {
.build()
));
}
+
+ @Test
+ public void compareIndexSetting() {
+ IndexStructures structures = new IndexStructures();
+ HashMap<String, Object> settings = new HashMap<>();
+ HashMap<String, Object> indexSettings = new HashMap<>();
+ settings.put("index", indexSettings);
+ indexSettings.put("number_of_replicas", "1");
+ indexSettings.put("number_of_shards", "1");
+ structures.putStructure("test", new Mappings(), settings);
+
+ HashMap<String, Object> settings2 = new HashMap<>();
+ HashMap<String, Object> indexSettings2 = new HashMap<>();
+ settings2.put("index", indexSettings2);
+ indexSettings2.put("number_of_replicas", "1");
+ indexSettings2.put("number_of_shards", "1");
+ Assert.assertTrue(structures.compareIndexSetting(
+ "test",
+ settings2
+ ));
+
+ HashMap<String, Object> settings3 = new HashMap<>();
+ HashMap<String, Object> indexSettings3 = new HashMap<>();
+ settings3.put("index", indexSettings3);
+ indexSettings3.put("number_of_replicas", "1");
+ indexSettings3.put("number_of_shards", "2");
+ Assert.assertFalse(structures.compareIndexSetting(
+ "test",
+ settings3
+ ));
+ }
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MockEsInstallTest.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MockEsInstallTest.java
index 61b461f4e0..e08e6a10de 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MockEsInstallTest.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MockEsInstallTest.java
@@ -248,18 +248,18 @@ public class MockEsInstallTest {
//clone it since the items will be changed after combine
Mappings hisMappingsClone = cloneMappings(this.hisMappings);
//put the current mappings
- structures.putStructure(this.name, this.hisMappings);
+ structures.putStructure(this.name, this.hisMappings, new HashMap<>());
//if current mappings already contains new mappings items
- Assert.assertEquals(this.contains,
structures.containsStructure(this.name, this.newMappings));
+ Assert.assertEquals(this.contains,
structures.containsMapping(this.name, this.newMappings));
//put the new mappings and combine
- structures.putStructure(this.name, this.newMappings);
+ structures.putStructure(this.name, this.newMappings, new HashMap<>());
Mappings mappings = structures.getMapping(this.name);
Assert.assertEquals(this.combineResult,
mapper.writeValueAsString(mappings));
//diff the hisMapping and new, if has new item will update current
index
- structures.putStructure(this.name, this.newMappings);
- Mappings diff = structures.diffStructure(this.name, hisMappingsClone);
+ structures.putStructure(this.name, this.newMappings, new HashMap<>());
+ Mappings diff = structures.diffMappings(this.name, hisMappingsClone);
Assert.assertEquals(this.diffResult, mapper.writeValueAsString(diff));
}