This is an automated email from the ASF dual-hosted git repository.

wankai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push: new 9ccb04e6cf ElasticSearch: scroll id should be updated when scrolling as it may change (#9199) 9ccb04e6cf is described below commit 9ccb04e6cfcdcd2911a446b23add670ca24dd5d9 Author: kezhenxu94 <kezhenx...@apache.org> AuthorDate: Fri Jun 10 18:13:59 2022 +0800 ElasticSearch: scroll id should be updated when scrolling as it may change (#9199) --- docs/en/changes/changes.md | 2 ++ .../plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java | 8 ++++++-- .../plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java | 8 ++++++-- .../plugin/elasticsearch/query/MetadataQueryEsDAO.java | 8 ++++++-- .../plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java | 11 ++++++++--- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 385bd80319..75ea5efa44 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -4,6 +4,8 @@ #### OAP Server +* ElasticSearch: scroll id should be updated when scrolling as it may change + #### Documentation * Fix invalid links in release docs diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java index 95cd462962..e81ef85479 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java @@ -19,7 +19,9 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache; import 
java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.library.elasticsearch.requests.search.Query; import org.apache.skywalking.library.elasticsearch.requests.search.Search; @@ -62,9 +64,11 @@ public class NetworkAddressAliasEsDAO extends EsDAO implements INetworkAddressAl SearchResponse results = getClient().search(NetworkAddressAlias.INDEX_NAME, search, params); - final String scrollId = results.getScrollId(); + final Set<String> scrollIds = new HashSet<>(); try { while (true) { + final String scrollId = results.getScrollId(); + scrollIds.add(scrollId); if (results.getHits().getTotal() == 0) { break; } @@ -82,7 +86,7 @@ public class NetworkAddressAliasEsDAO extends EsDAO implements INetworkAddressAl results = getClient().scroll(SCROLL_CONTEXT_RETENTION, scrollId); } } finally { - getClient().deleteScrollContextQuietly(scrollId); + scrollIds.forEach(getClient()::deleteScrollContextQuietly); } } catch (Throwable t) { log.error(t.getMessage(), t); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java index c638d082b2..26d747f1a6 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java @@ -35,8 +35,10 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexC import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; 
import java.util.List; import java.util.Map; +import java.util.Set; public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataDAO { private final int scrollingBatchSize; @@ -59,9 +61,11 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataD final List<EBPFProfilingDataRecord> records = new ArrayList<>(); SearchResponse results = getClient().search(index, search.build(), params); - final String scrollId = results.getScrollId(); + final Set<String> scrollIds = new HashSet<>(); try { while (true) { + final String scrollId = results.getScrollId(); + scrollIds.add(scrollId); if (results.getHits().getTotal() == 0) { break; } @@ -74,7 +78,7 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataD results = getClient().scroll(SCROLL_CONTEXT_RETENTION, scrollId); } } finally { - getClient().deleteScrollContextQuietly(scrollId); + scrollIds.forEach(getClient()::deleteScrollContextQuietly); } return records; } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java index 567e265585..69a98a854d 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java @@ -24,8 +24,10 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import 
org.apache.commons.lang3.StringUtils; import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder; import org.apache.skywalking.library.elasticsearch.requests.search.Query; @@ -93,9 +95,11 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { final List<Service> services = new ArrayList<>(); SearchResponse results = getClient().search(index, search.build(), params); - String scrollId = results.getScrollId(); + Set<String> scrollIds = new HashSet<>(); try { while (true) { + String scrollId = results.getScrollId(); + scrollIds.add(scrollId); if (results.getHits().getTotal() == 0) { break; } @@ -112,7 +116,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { results = getClient().scroll(SCROLL_CONTEXT_RETENTION, scrollId); } } finally { - getClient().deleteScrollContextQuietly(scrollId); + scrollIds.forEach(getClient()::deleteScrollContextQuietly); } return services; } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java index 1be8036666..ba8c21fac8 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java @@ -151,16 +151,21 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO { SearchBuilder search = Search.builder().query(query).size(5000); //max span size for 1 scroll final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION); SearchResponse 
response = getClient().search(index, search.build(), params); - String scrollId = response.getScrollId(); Map<String, List<Span>> groupedByTraceId = new LinkedHashMap<String, List<Span>>(); + final Set<String> scrollIds = new HashSet<>(); try { - while (response.getHits().getHits().size() != 0) { + while (true) { + String scrollId = response.getScrollId(); + scrollIds.add(scrollId); + if (response.getHits().getHits().size() == 0) { + break; + } buildTraces(response, groupedByTraceId); response = getClient().scroll(SCROLL_CONTEXT_RETENTION, scrollId); } } finally { - getClient().deleteScrollContextQuietly(scrollId); + scrollIds.forEach(getClient()::deleteScrollContextQuietly); } return new ArrayList<>(groupedByTraceId.values()); }