This is an automated email from the ASF dual-hosted git repository.

wankai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ccb04e6cf ElasticSearch: scroll id should be updated when scrolling as it may change (#9199)
9ccb04e6cf is described below

commit 9ccb04e6cfcdcd2911a446b23add670ca24dd5d9
Author: kezhenxu94 <kezhenx...@apache.org>
AuthorDate: Fri Jun 10 18:13:59 2022 +0800

    ElasticSearch: scroll id should be updated when scrolling as it may change (#9199)
---
 docs/en/changes/changes.md                                    |  2 ++
 .../plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java  |  8 ++++++--
 .../plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java    |  8 ++++++--
 .../plugin/elasticsearch/query/MetadataQueryEsDAO.java        |  8 ++++++--
 .../plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java   | 11 ++++++++---
 5 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 385bd80319..75ea5efa44 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -4,6 +4,8 @@
 
 #### OAP Server
 
+* ElasticSearch: scroll id should be updated when scrolling as it may change
+
 #### Documentation
 
 * Fix invalid links in release docs
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
index 95cd462962..e81ef85479 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
@@ -19,7 +19,9 @@
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
@@ -62,9 +64,11 @@ public class NetworkAddressAliasEsDAO extends EsDAO implements INetworkAddressAl
 
             SearchResponse results =
                 getClient().search(NetworkAddressAlias.INDEX_NAME, search, 
params);
-            final String scrollId = results.getScrollId();
+            final Set<String> scrollIds = new HashSet<>();
             try {
                 while (true) {
+                    final String scrollId = results.getScrollId();
+                    scrollIds.add(scrollId);
                     if (results.getHits().getTotal() == 0) {
                         break;
                     }
@@ -82,7 +86,7 @@ public class NetworkAddressAliasEsDAO extends EsDAO implements INetworkAddressAl
                     results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
                 }
             } finally {
-                getClient().deleteScrollContextQuietly(scrollId);
+                scrollIds.forEach(getClient()::deleteScrollContextQuietly);
             }
         } catch (Throwable t) {
             log.error(t.getMessage(), t);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
index c638d082b2..26d747f1a6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
@@ -35,8 +35,10 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexC
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class EBPFProfilingDataEsDAO extends EsDAO implements 
IEBPFProfilingDataDAO {
     private final int scrollingBatchSize;
@@ -59,9 +61,11 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataD
         final List<EBPFProfilingDataRecord> records = new ArrayList<>();
 
         SearchResponse results = getClient().search(index, search.build(), 
params);
-        final String scrollId = results.getScrollId();
+        final Set<String> scrollIds = new HashSet<>();
         try {
             while (true) {
+                final String scrollId = results.getScrollId();
+                scrollIds.add(scrollId);
                 if (results.getHits().getTotal() == 0) {
                     break;
                 }
@@ -74,7 +78,7 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataD
                 results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
             }
         } finally {
-            getClient().deleteScrollContextQuietly(scrollId);
+            scrollIds.forEach(getClient()::deleteScrollContextQuietly);
         }
         return records;
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index 567e265585..69a98a854d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -24,8 +24,10 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
@@ -93,9 +95,11 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
         final List<Service> services = new ArrayList<>();
 
         SearchResponse results = getClient().search(index, search.build(), 
params);
-        String scrollId = results.getScrollId();
+        Set<String> scrollIds = new HashSet<>();
         try {
             while (true) {
+                String scrollId = results.getScrollId();
+                scrollIds.add(scrollId);
                 if (results.getHits().getTotal() == 0) {
                     break;
                 }
@@ -112,7 +116,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
                 results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
             }
         } finally {
-            getClient().deleteScrollContextQuietly(scrollId);
+            scrollIds.forEach(getClient()::deleteScrollContextQuietly);
         }
         return services;
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
index 1be8036666..ba8c21fac8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
@@ -151,16 +151,21 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
         SearchBuilder search = Search.builder().query(query).size(5000); //max 
span size for 1 scroll
         final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
         SearchResponse response = getClient().search(index, search.build(), 
params);
-        String scrollId = response.getScrollId();
 
         Map<String, List<Span>> groupedByTraceId = new LinkedHashMap<String, 
List<Span>>();
+        final Set<String> scrollIds = new HashSet<>();
         try {
-            while (response.getHits().getHits().size() != 0) {
+            while (true) {
+                String scrollId = response.getScrollId();
+                scrollIds.add(scrollId);
+                if (response.getHits().getHits().size() == 0) {
+                    break;
+                }
                 buildTraces(response, groupedByTraceId);
                 response = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
             }
         } finally {
-            getClient().deleteScrollContextQuietly(scrollId);
+            scrollIds.forEach(getClient()::deleteScrollContextQuietly);
         }
         return new ArrayList<>(groupedByTraceId.values());
     }

Reply via email to