This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 287b8b9 Add syncBulkActions config to avoid writing a large amount of
metrics data to ES in a single sync bulk request. (#5699)
287b8b9 is described below
commit 287b8b9be5cef8f49205f34bafef694284022645
Author: Ax1an <[email protected]>
AuthorDate: Wed Oct 21 12:23:32 2020 +0800
Add syncBulkActions config to avoid writing a large amount of metrics data
to ES in a single sync bulk request. (#5699)
---
CHANGES.md | 1 +
changes/changes-8.2.0.md | 1 +
docs/en/setup/backend/backend-storage.md | 3 ++-
docs/en/setup/backend/configuration-vocabulary.md | 6 ++++--
.../src/main/resources/application.yml | 6 ++++--
.../src/test/resources/application.yml | 3 ++-
.../StorageModuleElasticsearchConfig.java | 1 +
.../StorageModuleElasticsearchProvider.java | 2 +-
.../elasticsearch/base/BatchProcessEsDAO.java | 24 ++++++++++++++--------
.../StorageModuleElasticsearch7Provider.java | 2 +-
tools/profile-exporter/application.yml | 6 ++++--
11 files changed, 37 insertions(+), 18 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a9439f9..90465d0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -91,6 +91,7 @@ Release Notes.
* Fix end time bug in the query process.
* Fix `Exporter INCREMENT mode` is not working.
* Fix an error when executing startup.bat when the log directory exists
+* Add syncBulkActions configuration to set up the batch size of the metrics
persistence.
#### UI
* Add browser dashboard.
diff --git a/changes/changes-8.2.0.md b/changes/changes-8.2.0.md
index 3dd7409..0bec8e4 100644
--- a/changes/changes-8.2.0.md
+++ b/changes/changes-8.2.0.md
@@ -87,6 +87,7 @@
* Fix end time bug in the query process.
* Fix `Exporter INCREMENT mode` is not working.
* Fix an error when executing startup.bat when the log directory exists
+* Add syncBulkActions configuration to set up the batch size of the metrics
persistence.
#### UI
* Add browser dashboard.
diff --git a/docs/en/setup/backend/backend-storage.md
b/docs/en/setup/backend/backend-storage.md
index cc66183..0a9776e 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -63,7 +63,8 @@ storage:
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent
the number of days in the super size dataset record index, the default value is
the same as dayStep when the value is less than 0
superDatasetIndexShardsFactor:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides
more shards for the super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger
traces.
superDatasetIndexReplicasNumber:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas
number in the super size dataset record index, the default value is 0.
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every
1000 requests
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk
record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+ syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the
sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every
10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of
concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
diff --git a/docs/en/setup/backend/configuration-vocabulary.md
b/docs/en/setup/backend/configuration-vocabulary.md
index e0787aa..a28340f 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -85,7 +85,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`.
**Receiver** mode
| - | - | superDatasetDayStep | Represent the number of days in the super size
dataset record index, the default value is the same as dayStep when the value
is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 |
| - | - | superDatasetIndexShardsFactor | Super data set has been defined in
the codes, such as trace segments. This factor provides more shards for the
super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger
traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 |
| - | - | superDatasetIndexReplicasNumber | Represent the replicas number in
the super size dataset record
index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 |
-| - | - | bulkActions| Bulk size of the batch execution. |
SW_STORAGE_ES_BULK_ACTIONS| 1000|
+| - | - | bulkActions| Async bulk size of the record data batch execution. |
SW_STORAGE_ES_BULK_ACTIONS| 1000|
+| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution.
| SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000|
| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or
not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10|
| - | - | concurrentRequests| The number of concurrent requests allowed to be
executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
| - | - | resultWindowMaxSize | The max size of dataset when OAP loading
cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
@@ -108,7 +109,8 @@ core|default|role|Option values,
`Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | superDatasetDayStep | Represent the number of days in the super size
dataset record index, the default value is the same as dayStep when the value
is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 |
| - | - | superDatasetIndexShardsFactor | Super data set has been defined in
the codes, such as trace segments. This factor provides more shards for the
super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger
traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 |
| - | - | superDatasetIndexReplicasNumber | Represent the replicas number in
the super size dataset record
index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 |
-| - | - | bulkActions| Bulk size of the batch execution. |
SW_STORAGE_ES_BULK_ACTIONS| 1000|
+| - | - | bulkActions| Async bulk size of the record data batch execution. |
SW_STORAGE_ES_BULK_ACTIONS| 1000|
+| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution.
| SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000|
| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or
not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10|
| - | - | concurrentRequests| The number of concurrent requests allowed to be
executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
| - | - | resultWindowMaxSize | The max size of dataset when OAP loading
cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml
b/oap-server/server-bootstrap/src/main/resources/application.yml
index 135e85d..e331857 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -118,7 +118,8 @@ storage:
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent
the number of days in the super size dataset record index, the default value is
the same as dayStep when the value is less than 0
superDatasetIndexShardsFactor:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides
more shards for the super data set, shards number = indexShardsNumber *
superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger
traces.
superDatasetIndexReplicasNumber:
${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas
number in the super size dataset record index, the default value is 0.
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every
1000 requests
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk
record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+ syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the
sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every
10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of
concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
@@ -142,7 +143,8 @@ storage:
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets
management file in the properties format includes the username, password, which
are managed by 3rd party tool.
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every
1000 requests
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk
record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+ syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the
sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every
10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of
concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
diff --git
a/oap-server/server-library/library-util/src/test/resources/application.yml
b/oap-server/server-library/library-util/src/test/resources/application.yml
index 17c968b..f680195 100755
--- a/oap-server/server-library/library-util/src/test/resources/application.yml
+++ b/oap-server/server-library/library-util/src/test/resources/application.yml
@@ -52,7 +52,8 @@ storage:
indexShardsNumber: ${ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
- bulkActions: ${ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk
record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+ syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the
sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
bulkSize: ${ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds
whatever the number of requests
concurrentRequests: ${ES_CONCURRENT_REQUESTS:2} # the number of concurrent
requests
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 02cfc2f..d124beb 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -52,6 +52,7 @@ public class StorageModuleElasticsearchConfig extends
ModuleConfig {
private int superDatasetIndexShardsFactor = 5;
private int indexRefreshInterval = 2;
private int bulkActions = 2000;
+ private int syncBulkActions = 50000;
private int flushInterval = 10;
private int concurrentRequests = 2;
/**
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 5515e3d..5867bae 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -158,7 +158,7 @@ public class StorageModuleElasticsearchProvider extends
ModuleProvider {
);
this.registerServiceImplementation(
- IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient,
config.getBulkActions(), config
+ IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient,
config.getBulkActions(), config.getSyncBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new
StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index 3c87620..2722b60 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.List;
+
+import com.google.common.collect.Lists;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -37,12 +39,14 @@ public class BatchProcessEsDAO extends EsDAO implements
IBatchDAO {
private BulkProcessor bulkProcessor;
private final int bulkActions;
+ private final int syncBulkActions;
private final int flushInterval;
private final int concurrentRequests;
- public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int
flushInterval, int concurrentRequests) {
+ public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int
syncBulkActions, int flushInterval, int concurrentRequests) {
super(client);
this.bulkActions = bulkActions;
+ this.syncBulkActions = syncBulkActions;
this.flushInterval = flushInterval;
this.concurrentRequests = concurrentRequests;
}
@@ -59,16 +63,20 @@ public class BatchProcessEsDAO extends EsDAO implements
IBatchDAO {
@Override
public void synchronous(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isNotEmpty(prepareRequests)) {
- BulkRequest request = new BulkRequest();
+ List<List<PrepareRequest>> partitions =
Lists.partition(prepareRequests, syncBulkActions);
+
+ for (List<PrepareRequest> partition : partitions) {
+ BulkRequest request = new BulkRequest();
- for (PrepareRequest prepareRequest : prepareRequests) {
- if (prepareRequest instanceof InsertRequest) {
- request.add((IndexRequest) prepareRequest);
- } else {
- request.add((UpdateRequest) prepareRequest);
+ for (PrepareRequest prepareRequest : partition) {
+ if (prepareRequest instanceof InsertRequest) {
+ request.add((IndexRequest) prepareRequest);
+ } else {
+ request.add((UpdateRequest) prepareRequest);
+ }
}
+ getClient().synchronousBulk(request);
}
- getClient().synchronousBulk(request);
}
}
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index 9acab87..0f7594c 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -156,7 +156,7 @@ public class StorageModuleElasticsearch7Provider extends
ModuleProvider {
);
this.registerServiceImplementation(
- IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client,
config.getBulkActions(),
+ IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client,
config.getBulkActions(), config.getSyncBulkActions(),
config.getFlushInterval(),
config.getConcurrentRequests()
));
this.registerServiceImplementation(StorageDAO.class, new
StorageEs7DAO(elasticSearch7Client));
diff --git a/tools/profile-exporter/application.yml
b/tools/profile-exporter/application.yml
index 6bbfa31..da4ca12 100644
--- a/tools/profile-exporter/application.yml
+++ b/tools/profile-exporter/application.yml
@@ -29,7 +29,8 @@ storage:
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every
1000 requests
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk
record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+ syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the
sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every
10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of
concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
@@ -49,7 +50,8 @@ storage:
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every
1000 requests
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk
record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+ syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the
sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every
10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of
concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}