This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 287b8b9  Add syncBulkActions config to avoid writing a large amount of metrics data to ES in a single sync bulk request. (#5699)
287b8b9 is described below

commit 287b8b9be5cef8f49205f34bafef694284022645
Author: Ax1an <[email protected]>
AuthorDate: Wed Oct 21 12:23:32 2020 +0800

    Add syncBulkActions config to avoid writing a large amount of metrics data to ES in a single sync bulk request. (#5699)
---
 CHANGES.md                                         |  1 +
 changes/changes-8.2.0.md                           |  1 +
 docs/en/setup/backend/backend-storage.md           |  3 ++-
 docs/en/setup/backend/configuration-vocabulary.md  |  6 ++++--
 .../src/main/resources/application.yml             |  6 ++++--
 .../src/test/resources/application.yml             |  3 ++-
 .../StorageModuleElasticsearchConfig.java          |  1 +
 .../StorageModuleElasticsearchProvider.java        |  2 +-
 .../elasticsearch/base/BatchProcessEsDAO.java      | 24 ++++++++++++++--------
 .../StorageModuleElasticsearch7Provider.java       |  2 +-
 tools/profile-exporter/application.yml             |  6 ++++--
 11 files changed, 37 insertions(+), 18 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a9439f9..90465d0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -91,6 +91,7 @@ Release Notes.
 * Fix end time bug in the query process.
 * Fix `Exporter INCREMENT mode` is not working.
 * Fix an error when executing startup.bat when the log directory exists
+* Add syncBulkActions configuration to set the batch size of metrics persistence.
 
 #### UI
 * Add browser dashboard.
diff --git a/changes/changes-8.2.0.md b/changes/changes-8.2.0.md
index 3dd7409..0bec8e4 100644
--- a/changes/changes-8.2.0.md
+++ b/changes/changes-8.2.0.md
@@ -87,6 +87,7 @@
 * Fix end time bug in the query process.
 * Fix `Exporter INCREMENT mode` is not working.
 * Fix an error when executing startup.bat when the log directory exists
+* Add syncBulkActions configuration to set the batch size of metrics persistence.
 
 #### UI
 * Add browser dashboard.
diff --git a/docs/en/setup/backend/backend-storage.md 
b/docs/en/setup/backend/backend-storage.md
index cc66183..0a9776e 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -63,7 +63,8 @@ storage:
     superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represents the number of days in the super size dataset record index; the default value is the same as dayStep when the value is less than 0
     superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor affects Zipkin and Jaeger traces.
     superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represents the replicas number in the super size dataset record index; the default value is 0.
-    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
+    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk of record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+    syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk of metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
     flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds regardless of the number of requests
     concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of 
concurrent requests
     resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
diff --git a/docs/en/setup/backend/configuration-vocabulary.md 
b/docs/en/setup/backend/configuration-vocabulary.md
index e0787aa..a28340f 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -85,7 +85,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. 
**Receiver** mode
 | - | - | superDatasetDayStep | Represents the number of days in the super size dataset record index; the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 |
 | - | - | superDatasetIndexShardsFactor | Super data set has been defined in the code, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor affects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 |
 | - | - | superDatasetIndexReplicasNumber | Represents the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 |
-| - | - | bulkActions| Bulk size of the batch execution. | 
SW_STORAGE_ES_BULK_ACTIONS| 1000|
+| - | - | bulkActions| Async bulk size of the record data batch execution. | 
SW_STORAGE_ES_BULK_ACTIONS| 1000|
+| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution. 
| SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000|
 | - | - | flushInterval| Period of flush, no matter `bulkActions` reached or 
not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10|
 | - | - | concurrentRequests| The number of concurrent requests allowed to be 
executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
 | - | - | resultWindowMaxSize | The max size of dataset when OAP loading 
cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
@@ -108,7 +109,8 @@ core|default|role|Option values, 
`Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | superDatasetDayStep | Represents the number of days in the super size dataset record index; the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 |
 | - | - | superDatasetIndexShardsFactor | Super data set has been defined in the code, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor affects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 |
 | - | - | superDatasetIndexReplicasNumber | Represents the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 |
-| - | - | bulkActions| Bulk size of the batch execution. | 
SW_STORAGE_ES_BULK_ACTIONS| 1000|
+| - | - | bulkActions| Async bulk size of the record data batch execution. | 
SW_STORAGE_ES_BULK_ACTIONS| 1000|
+| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution. 
| SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000|
 | - | - | flushInterval| Period of flush, no matter `bulkActions` reached or 
not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10|
 | - | - | concurrentRequests| The number of concurrent requests allowed to be 
executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
 | - | - | resultWindowMaxSize | The max size of dataset when OAP loading 
cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
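The `${NAME:default}` placeholders used throughout the YAML files above resolve to the environment variable when it is set, falling back to the literal after the colon otherwise. A minimal sketch of that resolution rule (a hypothetical standalone helper for illustration, not SkyWalking's actual resolver):

```java
public class EnvDefaultDemo {
    // Hypothetical helper illustrating the `${NAME:default}` convention:
    // use the environment variable when present, else the default after ':'.
    public static String resolve(String placeholder) {
        // Strip the leading "${" and trailing "}".
        String body = placeholder.substring(2, placeholder.length() - 1);
        int colon = body.indexOf(':');
        String name = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? "" : body.substring(colon + 1);
        String env = System.getenv(name);
        return env != null ? env : fallback;
    }

    public static void main(String[] args) {
        // SW_STORAGE_ES_SYNC_BULK_ACTIONS is assumed unset here, so the
        // default batch size of 50000 applies.
        System.out.println(resolve("${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000}"));
    }
}
```

This is why `syncBulkActions` can be tuned per deployment via `SW_STORAGE_ES_SYNC_BULK_ACTIONS` without editing `application.yml`.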
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml 
b/oap-server/server-bootstrap/src/main/resources/application.yml
index 135e85d..e331857 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -118,7 +118,8 @@ storage:
     superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represents the number of days in the super size dataset record index; the default value is the same as dayStep when the value is less than 0
     superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor affects Zipkin and Jaeger traces.
     superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represents the replicas number in the super size dataset record index; the default value is 0.
-    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
+    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk of record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+    syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk of metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
     flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds regardless of the number of requests
     concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of 
concurrent requests
     resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
@@ -142,7 +143,8 @@ storage:
     user: ${SW_ES_USER:""}
     password: ${SW_ES_PASSWORD:""}
     secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets 
management file in the properties format includes the username, password, which 
are managed by 3rd party tool.
-    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
+    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk of record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+    syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk of metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
     flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds regardless of the number of requests
     concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of 
concurrent requests
     resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
diff --git 
a/oap-server/server-library/library-util/src/test/resources/application.yml 
b/oap-server/server-library/library-util/src/test/resources/application.yml
index 17c968b..f680195 100755
--- a/oap-server/server-library/library-util/src/test/resources/application.yml
+++ b/oap-server/server-library/library-util/src/test/resources/application.yml
@@ -52,7 +52,8 @@ storage:
     indexShardsNumber: ${ES_INDEX_SHARDS_NUMBER:2}
     indexReplicasNumber: ${ES_INDEX_REPLICAS_NUMBER:0}
     # Batch process setting, refer to 
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
-    bulkActions: ${ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
+    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk of record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+    syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk of metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
     bulkSize: ${ES_BULK_SIZE:20} # flush the bulk every 20mb
     flushInterval: ${ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds regardless of the number of requests
     concurrentRequests: ${ES_CONCURRENT_REQUESTS:2} # the number of concurrent 
requests
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 02cfc2f..d124beb 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -52,6 +52,7 @@ public class StorageModuleElasticsearchConfig extends 
ModuleConfig {
     private int superDatasetIndexShardsFactor = 5;
     private int indexRefreshInterval = 2;
     private int bulkActions = 2000;
+    private int syncBulkActions = 50000;
     private int flushInterval = 10;
     private int concurrentRequests = 2;
     /**
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 5515e3d..5867bae 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -158,7 +158,7 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
         );
 
         this.registerServiceImplementation(
-            IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, 
config.getBulkActions(), config
+            IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, 
config.getBulkActions(), config.getSyncBulkActions(), config
                 .getFlushInterval(), config.getConcurrentRequests()));
         this.registerServiceImplementation(StorageDAO.class, new 
StorageEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index 3c87620..2722b60 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -19,6 +19,8 @@
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
 import java.util.List;
+
+import com.google.common.collect.Lists;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -37,12 +39,14 @@ public class BatchProcessEsDAO extends EsDAO implements 
IBatchDAO {
 
     private BulkProcessor bulkProcessor;
     private final int bulkActions;
+    private final int syncBulkActions;
     private final int flushInterval;
     private final int concurrentRequests;
 
-    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int 
flushInterval, int concurrentRequests) {
+    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int 
syncBulkActions, int flushInterval, int concurrentRequests) {
         super(client);
         this.bulkActions = bulkActions;
+        this.syncBulkActions = syncBulkActions;
         this.flushInterval = flushInterval;
         this.concurrentRequests = concurrentRequests;
     }
@@ -59,16 +63,20 @@ public class BatchProcessEsDAO extends EsDAO implements 
IBatchDAO {
     @Override
     public void synchronous(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isNotEmpty(prepareRequests)) {
-            BulkRequest request = new BulkRequest();
+            List<List<PrepareRequest>> partitions = 
Lists.partition(prepareRequests, syncBulkActions);
+
+            for (List<PrepareRequest> partition : partitions) {
+                BulkRequest request = new BulkRequest();
 
-            for (PrepareRequest prepareRequest : prepareRequests) {
-                if (prepareRequest instanceof InsertRequest) {
-                    request.add((IndexRequest) prepareRequest);
-                } else {
-                    request.add((UpdateRequest) prepareRequest);
+                for (PrepareRequest prepareRequest : partition) {
+                    if (prepareRequest instanceof InsertRequest) {
+                        request.add((IndexRequest) prepareRequest);
+                    } else {
+                        request.add((UpdateRequest) prepareRequest);
+                    }
                 }
+                getClient().synchronousBulk(request);
             }
-            getClient().synchronousBulk(request);
         }
     }
 }
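The reworked `synchronous` method above relies on Guava's `Lists.partition` to cap each synchronous bulk request at `syncBulkActions` entries. A self-contained sketch of the same chunking behavior, hand-rolled with `subList` so it runs without Guava (the request count and cap are illustrative, not from the commit):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionDemo {
    // Illustrative equivalent of Guava's Lists.partition: consecutive
    // sublists of at most `size` elements, the last one possibly shorter.
    public static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            parts.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return parts;
    }

    public static void main(String[] args) {
        // 120 stand-in requests with a cap of 50 yield bulks of 50, 50,
        // and 20, so no single sync bulk exceeds the configured limit.
        List<Integer> requests = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            requests.add(i);
        }
        List<List<Integer>> bulks = partition(requests, 50);
        System.out.println(bulks.size());        // prints 3
        System.out.println(bulks.get(2).size()); // prints 20
    }
}
```

Each partition then becomes its own `BulkRequest`, which is exactly what bounds the size of any single synchronous write to ES.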
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index 9acab87..0f7594c 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -156,7 +156,7 @@ public class StorageModuleElasticsearch7Provider extends 
ModuleProvider {
         );
 
         this.registerServiceImplementation(
-            IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, 
config.getBulkActions(),
+            IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, 
config.getBulkActions(), config.getSyncBulkActions(),
                                                    config.getFlushInterval(), 
config.getConcurrentRequests()
             ));
         this.registerServiceImplementation(StorageDAO.class, new 
StorageEs7DAO(elasticSearch7Client));
diff --git a/tools/profile-exporter/application.yml 
b/tools/profile-exporter/application.yml
index 6bbfa31..da4ca12 100644
--- a/tools/profile-exporter/application.yml
+++ b/tools/profile-exporter/application.yml
@@ -29,7 +29,8 @@ storage:
     indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
     indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
     # Batch process setting, refer to 
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
-    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
+    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk of record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+    syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk of metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
     flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds regardless of the number of requests
     concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of 
concurrent requests
     resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
@@ -49,7 +50,8 @@ storage:
     indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
     indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
     # Batch process setting, refer to 
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
-    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
+    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk of record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+    syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk of metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
     flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds regardless of the number of requests
     concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of 
concurrent requests
     resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
