This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new cb165eb2ca Scroll all results of Endpoint in ElasticSearch storage and refactor scrolling logics to reduce boilerplate codes (#10986)
cb165eb2ca is described below

commit cb165eb2ca8656ae2e3c5c0b71ff90b7658d80e7
Author: kezhenxu94 <[email protected]>
AuthorDate: Sat Jun 24 23:01:56 2023 +0800

    Scroll all results of Endpoint in ElasticSearch storage and refactor scrolling logics to reduce boilerplate codes (#10986)
---
 docs/en/changes/changes.md                         |   1 +
 .../elasticsearch/ElasticSearchScroller.java       |  79 +++++
 .../cache/NetworkAddressAliasEsDAO.java            |  57 ++--
 .../query/EBPFProfilingDataEsDAO.java              |  57 +---
 .../elasticsearch/query/MetadataQueryEsDAO.java    | 324 +++++++++++----------
 .../query/SpanAttachedEventEsDAO.java              |  58 ++--
 .../query/zipkin/ZipkinQueryEsDAO.java             |  91 +++---
 7 files changed, 332 insertions(+), 335 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index a5dc7e8e88..40c047bded 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -16,6 +16,7 @@
 * Fix AI Pipeline uri caching NullPointer and IllegalArgument Exceptions.
 * Fix `NPE` in metrics query when the metric is not exist.
 * Remove E2E tests for Istio < 1.15, ElasticSearch < 7.16.3, they might still work but are not supported as planed.
+* Scroll all results in ElasticSearch storage and refactor scrolling logics, including Service, Instance, Endpoint, Process, etc.
 
 #### UI
 * Fix metric name `browser_app_error_rate` in `Browser-Root` dashboard.
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchScroller.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchScroller.java
new file mode 100644
index 0000000000..55eba02fd2
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchScroller.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import lombok.Builder;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.function.Function;
+
+@Builder
+@RequiredArgsConstructor
+public class ElasticSearchScroller<T> {
+    public static final Duration SCROLL_CONTEXT_RETENTION = Duration.ofSeconds(30);
+
+    final ElasticSearchClient client;
+    final Search search;
+    final String index;
+    @Builder.Default
+    final int queryMaxSize = 0;
+    @Builder.Default
+    final SearchParams params = new SearchParams();
+    final Function<SearchHit, T> resultConverter;
+
+    public List<T> scroll() {
+        final var results = new ArrayList<T>();
+        final var scrollIds = new HashSet<String>();
+
+        params.scroll(SCROLL_CONTEXT_RETENTION);
+
+        var response = client.search(index, search, params);
+
+        try {
+            while (true) {
+                final var scrollId = response.getScrollId();
+                scrollIds.add(scrollId);
+                if (response.getHits().getTotal() == 0) {
+                    break;
+                }
+                for (final var searchHit : response.getHits()) {
+                    results.add(resultConverter.apply(searchHit));
+                }
+                if (search.getSize() != null && response.getHits().getTotal() < search.getSize()) {
+                    break;
+                }
+                if (queryMaxSize > 0 && results.size() >= queryMaxSize) {
+                    break;
+                }
+                response = client.scroll(SCROLL_CONTEXT_RETENTION, scrollId);
+            }
+        } finally {
+            scrollIds.forEach(client::deleteScrollContextQuietly);
+        }
+
+        return results;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
index 733b25895d..cb9ada4e1f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
@@ -18,25 +18,22 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
-import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
-import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
-import 
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
 import 
org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 
+import java.util.Collections;
+import java.util.List;
+
 @Slf4j
 public class NetworkAddressAliasEsDAO extends EsDAO implements 
INetworkAddressAliasDAO {
     protected final int resultWindowMaxSize;
@@ -51,7 +48,6 @@ public class NetworkAddressAliasEsDAO extends EsDAO 
implements INetworkAddressAl
 
     @Override
     public List<NetworkAddressAlias> loadLastUpdate(long timeBucketInMinute) {
-        List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>();
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(NetworkAddressAlias.INDEX_NAME);
         try {
@@ -63,41 +59,22 @@ public class NetworkAddressAliasEsDAO extends EsDAO 
implements INetworkAddressAl
             query.must(Query.range(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET)
                              .gte(timeBucketInMinute));
 
-            final Search search = 
Search.builder().query(query).size(batchSize).build();
+            final var search = 
Search.builder().query(query).size(batchSize).build();
+            final var builder = new NetworkAddressAlias.Builder();
 
-            final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
-            final NetworkAddressAlias.Builder builder = new 
NetworkAddressAlias.Builder();
-
-            SearchResponse results =
-                getClient().search(index, search, params);
-            final Set<String> scrollIds = new HashSet<>();
-            try {
-                while (true) {
-                    final String scrollId = results.getScrollId();
-                    scrollIds.add(scrollId);
-                    if (results.getHits().getTotal() == 0) {
-                        break;
-                    }
-                    for (SearchHit searchHit : results.getHits()) {
-                        networkAddressAliases.add(
-                            builder.storage2Entity(
-                                new 
ElasticSearchConverter.ToEntity(NetworkAddressAlias.INDEX_NAME, 
searchHit.getSource())));
-                    }
-                    if (results.getHits().getTotal() < batchSize) {
-                        break;
-                    }
-                    if (networkAddressAliases.size() >= resultWindowMaxSize) {
-                        break;
-                    }
-                    results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
-                }
-            } finally {
-                scrollIds.forEach(getClient()::deleteScrollContextQuietly);
-            }
+            final var scroller = ElasticSearchScroller
+                .<NetworkAddressAlias>builder()
+                .client(getClient())
+                .search(search)
+                .index(index)
+                .queryMaxSize(resultWindowMaxSize)
+                .resultConverter(searchHit -> builder.storage2Entity(
+                    new 
ElasticSearchConverter.ToEntity(NetworkAddressAlias.INDEX_NAME, 
searchHit.getSource())))
+                .build();
+            return scroller.scroll();
         } catch (Throwable t) {
             log.error(t.getMessage(), t);
+            return Collections.emptyList();
         }
-
-        return networkAddressAliases;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
index 76a5947c1c..0b8f9b0795 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
@@ -22,23 +22,16 @@ import 
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuil
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
-import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
-import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
-import 
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
 import 
org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 public class EBPFProfilingDataEsDAO extends EsDAO implements 
IEBPFProfilingDataDAO {
     private final int scrollingBatchSize;
@@ -49,7 +42,7 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements 
IEBPFProfilingDataD
     }
 
     @Override
-    public List<EBPFProfilingDataRecord> queryData(List<String> 
scheduleIdList, long beginTime, long endTime) throws IOException {
+    public List<EBPFProfilingDataRecord> queryData(List<String> 
scheduleIdList, long beginTime, long endTime) {
         final String index =
                 
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingDataRecord.INDEX_NAME);
         final BoolQueryBuilder query = Query.bool();
@@ -60,39 +53,17 @@ public class EBPFProfilingDataEsDAO extends EsDAO 
implements IEBPFProfilingDataD
         query.must(Query.terms(EBPFProfilingDataRecord.SCHEDULE_ID, 
scheduleIdList));
         
query.must(Query.range(EBPFProfilingDataRecord.UPLOAD_TIME).gte(beginTime).lt(endTime));
 
-        final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
-        final List<EBPFProfilingDataRecord> records = new ArrayList<>();
-
-        SearchResponse results = getClient().search(index, search.build(), 
params);
-        final Set<String> scrollIds = new HashSet<>();
-        try {
-            while (true) {
-                final String scrollId = results.getScrollId();
-                scrollIds.add(scrollId);
-                if (results.getHits().getTotal() == 0) {
-                    break;
-                }
-                final List<EBPFProfilingDataRecord> batch = 
buildDataList(results);
-                records.addAll(batch);
-                // The last iterate, there is no more data
-                if (batch.size() < scrollingBatchSize) {
-                    break;
-                }
-                results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
-            }
-        } finally {
-            scrollIds.forEach(getClient()::deleteScrollContextQuietly);
-        }
-        return records;
-    }
-
-    private List<EBPFProfilingDataRecord> buildDataList(SearchResponse 
response) {
-        List<EBPFProfilingDataRecord> records = new ArrayList<>();
-        for (SearchHit hit : response.getHits()) {
-            final Map<String, Object> sourceAsMap = hit.getSource();
-            final EBPFProfilingDataRecord.Builder builder = new 
EBPFProfilingDataRecord.Builder();
-            records.add(builder.storage2Entity(new 
ElasticSearchConverter.ToEntity(EBPFProfilingDataRecord.INDEX_NAME, 
sourceAsMap)));
-        }
-        return records;
+        final var scroller = ElasticSearchScroller
+            .<EBPFProfilingDataRecord>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .resultConverter(hit -> {
+                final var sourceAsMap = hit.getSource();
+                final var builder = new EBPFProfilingDataRecord.Builder();
+                return builder.storage2Entity(new 
ElasticSearchConverter.ToEntity(EBPFProfilingDataRecord.INDEX_NAME, 
sourceAsMap));
+            })
+            .build();
+        return scroller.scroll();
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index a594d18e08..9f2ab1694f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -22,21 +22,12 @@ import com.google.common.base.Strings;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 import org.apache.commons.lang3.StringUtils;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
-import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
 import 
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
@@ -57,12 +48,20 @@ import 
org.apache.skywalking.oap.server.core.query.type.Service;
 import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
 import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import static 
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE;
 
 public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
@@ -74,6 +73,48 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
     private boolean aliasNameInit = false;
     private final int layerSize;
 
+    protected final Function<SearchHit, Service> searchHitServiceFunction = 
hit -> {
+        final var sourceAsMap = hit.getSource();
+        final var builder = new ServiceTraffic.Builder();
+        final var serviceTraffic = builder.storage2Entity(new 
ElasticSearchConverter.ToEntity(ServiceTraffic.INDEX_NAME, sourceAsMap));
+        final var serviceName = serviceTraffic.getName();
+        final var service = new Service();
+        service.setId(serviceTraffic.getServiceId());
+        service.setName(serviceName);
+        service.setShortName(serviceTraffic.getShortName());
+        service.setGroup(serviceTraffic.getGroup());
+        service.getLayers().add(serviceTraffic.getLayer().name());
+        return service;
+    };
+
+    protected final Function<SearchHit, ServiceInstance> 
searchHitServiceInstanceFunction = hit -> {
+        final var sourceAsMap = hit.getSource();
+
+        final var instanceTraffic =
+            new InstanceTraffic.Builder().storage2Entity(new 
ElasticSearchConverter.ToEntity(InstanceTraffic.INDEX_NAME, sourceAsMap));
+
+        final var serviceInstance = new ServiceInstance();
+        serviceInstance.setId(instanceTraffic.id().build());
+        serviceInstance.setName(instanceTraffic.getName());
+        serviceInstance.setInstanceUUID(serviceInstance.getId());
+
+        final var properties = instanceTraffic.getProperties();
+        if (properties != null) {
+            for (final var property : properties.entrySet()) {
+                final var key = property.getKey();
+                final var value = property.getValue().getAsString();
+                if (key.equals(LANGUAGE)) {
+                    serviceInstance.setLanguage(Language.value(value));
+                } else {
+                    serviceInstance.getAttributes().add(new Attribute(key, 
value));
+                }
+            }
+        } else {
+            serviceInstance.setLanguage(Language.UNKNOWN);
+        }
+        return serviceInstance;
+    };
+
     public MetadataQueryEsDAO(
         ElasticSearchClient client,
         StorageModuleElasticsearchConfig config) {
@@ -84,7 +125,7 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
     }
 
     @Override
-    public List<Service> listServices(final String layer, final String group) 
throws IOException {
+    public List<Service> listServices(final String layer, final String group) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME);
 
@@ -101,38 +142,20 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
         if 
(IndexController.LogicIndicesRegister.isMergedTable(ServiceTraffic.INDEX_NAME)) 
{
             
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, 
ServiceTraffic.INDEX_NAME));
         }
-        final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
-        final List<Service> services = new ArrayList<>();
-
-        SearchResponse results = getClient().search(index, search.build(), 
params);
-        Set<String> scrollIds = new HashSet<>();
-        try {
-            while (true) {
-                String scrollId = results.getScrollId();
-                scrollIds.add(scrollId);
-                if (results.getHits().getTotal() == 0) {
-                    break;
-                }
-                final List<Service> batch = buildServices(results);
-                services.addAll(batch);
-                // The last iterate, there is no more data
-                if (batch.size() < batchSize) {
-                    break;
-                }
-                // We've got enough data
-                if (services.size() >= queryMaxSize) {
-                    break;
-                }
-                results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
-            }
-        } finally {
-            scrollIds.forEach(getClient()::deleteScrollContextQuietly);
-        }
-        return services;
+
+        final var scroller = ElasticSearchScroller
+            .<Service>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .queryMaxSize(queryMaxSize)
+            .resultConverter(searchHitServiceFunction)
+            .build();
+        return scroller.scroll();
     }
 
     @Override
-    public List<Service> getServices(final String serviceId) throws 
IOException {
+    public List<Service> getServices(final String serviceId) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME);
         final BoolQueryBuilder query =
@@ -149,7 +172,7 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
 
     @Override
     public List<ServiceInstance> listInstances(Duration duration,
-                                               String serviceId) throws 
IOException {
+                                               String serviceId) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME);
 
@@ -164,24 +187,19 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
         final int batchSize = Math.min(queryMaxSize, scrollingBatchSize);
         final SearchBuilder search = 
Search.builder().query(query).size(batchSize);
 
-        final List<ServiceInstance> instances = new ArrayList<>();
-        SearchResponse response = getClient().search(index, search.build());
-        while (response.getHits().getTotal() > 0) {
-            final List<ServiceInstance> batch = buildInstances(response);
-            instances.addAll(batch);
-            if (batch.size() < batchSize) {
-                break;
-            }
-            if (batch.size() >= queryMaxSize) {
-                break;
-            }
-            response = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
response.getScrollId());
-        }
-        return instances;
+        final var scroller = ElasticSearchScroller
+            .<ServiceInstance>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .queryMaxSize(queryMaxSize)
+            .resultConverter(searchHitServiceInstanceFunction)
+            .build();
+        return scroller.scroll();
     }
 
     @Override
-    public ServiceInstance getInstance(final String instanceId) throws 
IOException {
+    public ServiceInstance getInstance(final String instanceId) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME);
         String id = instanceId;
@@ -199,7 +217,7 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
     }
 
     @Override
-    public List<ServiceInstance> getInstances(List<String> instanceIds) throws 
IOException {
+    public List<ServiceInstance> getInstances(List<String> instanceIds) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME);
         final BoolQueryBuilder query = Query.bool();
@@ -216,8 +234,7 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
     }
 
     @Override
-    public List<Endpoint> findEndpoint(String keyword, String serviceId, int 
limit)
-        throws IOException {
+    public List<Endpoint> findEndpoint(String keyword, String serviceId, int 
limit) {
         initColumnName();
         final String index = 
IndexController.LogicIndicesRegister.getPhysicalTableName(
             EndpointTraffic.INDEX_NAME);
@@ -235,28 +252,32 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
             
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, 
EndpointTraffic.INDEX_NAME));
         }
 
-        final SearchBuilder search = Search.builder().query(query).size(limit);
+        final var search = Search.builder().query(query).size(limit);
 
-        final SearchResponse response = getClient().search(index, 
search.build());
+        final var scroller = ElasticSearchScroller
+            .<Endpoint>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .queryMaxSize(queryMaxSize)
+            .resultConverter(searchHit -> {
+                final var sourceAsMap = searchHit.getSource();
 
-        List<Endpoint> endpoints = new ArrayList<>();
-        for (SearchHit searchHit : response.getHits()) {
-            Map<String, Object> sourceAsMap = searchHit.getSource();
+                final var endpointTraffic =
+                    new EndpointTraffic.Builder().storage2Entity(new 
ElasticSearchConverter.ToEntity(EndpointTraffic.INDEX_NAME, sourceAsMap));
 
-            final EndpointTraffic endpointTraffic =
-                new EndpointTraffic.Builder().storage2Entity(new 
ElasticSearchConverter.ToEntity(EndpointTraffic.INDEX_NAME, sourceAsMap));
+                final var endpoint = new Endpoint();
+                endpoint.setId(endpointTraffic.id().build());
+                endpoint.setName(endpointTraffic.getName());
+                return endpoint;
+            })
+            .build();
 
-            Endpoint endpoint = new Endpoint();
-            endpoint.setId(endpointTraffic.id().build());
-            endpoint.setName(endpointTraffic.getName());
-            endpoints.add(endpoint);
-        }
-
-        return endpoints;
+        return scroller.scroll();
     }
 
     @Override
-    public List<Process> listProcesses(String serviceId, 
ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long 
lastPingEndTimeBucket) throws IOException {
+    public List<Process> listProcesses(String serviceId, 
ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long 
lastPingEndTimeBucket) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
 
@@ -266,13 +287,20 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
         }
         final SearchBuilder search = 
Search.builder().query(query).size(queryMaxSize);
         appendProcessWhereQuery(query, serviceId, null, null, supportStatus, 
lastPingStartTimeBucket, lastPingEndTimeBucket, false);
-        final SearchResponse results = getClient().search(index, 
search.build());
 
-        return buildProcesses(results);
+        final var scroller = ElasticSearchScroller
+            .<Process>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .queryMaxSize(queryMaxSize)
+            .resultConverter(this::buildProcess)
+            .build();
+        return scroller.scroll();
     }
 
     @Override
-    public List<Process> listProcesses(String serviceInstanceId, Duration 
duration, boolean includeVirtual) throws IOException {
+    public List<Process> listProcesses(String serviceInstanceId, Duration 
duration, boolean includeVirtual) {
         long lastPingStartTimeBucket = duration.getStartTimeBucket();
         long lastPingEndTimeBucket = duration.getEndTimeBucket();
         final String index =
@@ -284,13 +312,20 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
         }
         final SearchBuilder search = 
Search.builder().query(query).size(queryMaxSize);
         appendProcessWhereQuery(query, null, serviceInstanceId, null, null, 
lastPingStartTimeBucket, lastPingEndTimeBucket, includeVirtual);
-        final SearchResponse results = getClient().search(index, 
search.build());
 
-        return buildProcesses(results);
+        final var scroller = ElasticSearchScroller
+            .<Process>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .queryMaxSize(queryMaxSize)
+            .resultConverter(this::buildProcess)
+            .build();
+        return scroller.scroll();
     }
 
     @Override
-    public List<Process> listProcesses(String agentId) throws IOException {
+    public List<Process> listProcesses(String agentId) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
 
@@ -300,13 +335,20 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
         }
         final SearchBuilder search = 
Search.builder().query(query).size(queryMaxSize);
         appendProcessWhereQuery(query, null, null, agentId, null, 0, 0, false);
-        final SearchResponse results = getClient().search(index, 
search.build());
 
-        return buildProcesses(results);
+        final var scroller = ElasticSearchScroller
+            .<Process>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .queryMaxSize(queryMaxSize)
+            .resultConverter(this::buildProcess)
+            .build();
+        return scroller.scroll();
     }
 
     @Override
-    public long getProcessCount(String serviceId, ProfilingSupportStatus 
profilingSupportStatus, long lastPingStartTimeBucket, long 
lastPingEndTimeBucket) throws IOException {
+    public long getProcessCount(String serviceId, ProfilingSupportStatus 
profilingSupportStatus, long lastPingStartTimeBucket, long 
lastPingEndTimeBucket) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
 
@@ -323,7 +365,7 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
     }
 
     @Override
-    public long getProcessCount(String instanceId) throws IOException {
+    public long getProcessCount(String instanceId) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
 
@@ -365,7 +407,7 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
     }
 
     @Override
-    public Process getProcess(String processId) throws IOException {
+    public Process getProcess(String processId) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
         final BoolQueryBuilder query = Query.bool()
@@ -375,25 +417,18 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
         }
         final SearchBuilder search = 
Search.builder().query(query).size(queryMaxSize);
 
-        final SearchResponse response = getClient().search(index, 
search.build());
-        final List<Process> processes = buildProcesses(response);
-        return processes.isEmpty() ? null : processes.get(0);
+        final var response = getClient().search(index, search.build());
+        final var iterator = response.getHits().iterator();
+        if (iterator.hasNext()) {
+            return buildProcess(iterator.next());
+        }
+        return null;
     }
 
     private List<Service> buildServices(SearchResponse response) {
         List<Service> services = new ArrayList<>();
         for (SearchHit hit : response.getHits()) {
-            final Map<String, Object> sourceAsMap = hit.getSource();
-            final ServiceTraffic.Builder builder = new 
ServiceTraffic.Builder();
-            final ServiceTraffic serviceTraffic = builder.storage2Entity(new 
ElasticSearchConverter.ToEntity(ServiceTraffic.INDEX_NAME, sourceAsMap));
-            String serviceName = serviceTraffic.getName();
-            Service service = new Service();
-            service.setId(serviceTraffic.getServiceId());
-            service.setName(serviceName);
-            service.setShortName(serviceTraffic.getShortName());
-            service.setGroup(serviceTraffic.getGroup());
-            service.getLayers().add(serviceTraffic.getLayer().name());
-            services.add(service);
+            services.add(searchHitServiceFunction.apply(hit));
         }
         return services;
     }
@@ -401,72 +436,45 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
     private List<ServiceInstance> buildInstances(SearchResponse response) {
         List<ServiceInstance> serviceInstances = new ArrayList<>();
         for (SearchHit searchHit : response.getHits()) {
-            Map<String, Object> sourceAsMap = searchHit.getSource();
-
-            final InstanceTraffic instanceTraffic =
-                new InstanceTraffic.Builder().storage2Entity(new 
ElasticSearchConverter.ToEntity(InstanceTraffic.INDEX_NAME, sourceAsMap));
-
-            ServiceInstance serviceInstance = new ServiceInstance();
-            serviceInstance.setId(instanceTraffic.id().build());
-            serviceInstance.setName(instanceTraffic.getName());
-            serviceInstance.setInstanceUUID(serviceInstance.getId());
-
-            JsonObject properties = instanceTraffic.getProperties();
-            if (properties != null) {
-                for (Map.Entry<String, JsonElement> property : 
properties.entrySet()) {
-                    String key = property.getKey();
-                    String value = property.getValue().getAsString();
-                    if (key.equals(LANGUAGE)) {
-                        serviceInstance.setLanguage(Language.value(value));
-                    } else {
-                        serviceInstance.getAttributes().add(new Attribute(key, 
value));
-                    }
-                }
-            } else {
-                serviceInstance.setLanguage(Language.UNKNOWN);
-            }
-            serviceInstances.add(serviceInstance);
+            
serviceInstances.add(searchHitServiceInstanceFunction.apply(searchHit));
         }
         return serviceInstances;
     }
 
-    private List<Process> buildProcesses(SearchResponse response) {
-        List<Process> processes = new ArrayList<>();
-        for (SearchHit searchHit : response.getHits()) {
-            Map<String, Object> sourceAsMap = searchHit.getSource();
-
-            final ProcessTraffic processTraffic =
-                new ProcessTraffic.Builder().storage2Entity(new 
ElasticSearchConverter.ToEntity(ProcessTraffic.INDEX_NAME, sourceAsMap));
-
-            Process process = new Process();
-            process.setId(processTraffic.id().build());
-            process.setName(processTraffic.getName());
-            final String serviceId = processTraffic.getServiceId();
-            process.setServiceId(serviceId);
-            
process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());
-            final String instanceId = processTraffic.getInstanceId();
-            process.setInstanceId(instanceId);
-            
process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());
-            process.setAgentId(processTraffic.getAgentId());
-            
process.setDetectType(ProcessDetectType.valueOf(processTraffic.getDetectType()).name());
-            
process.setProfilingSupportStatus(ProfilingSupportStatus.valueOf(processTraffic.getProfilingSupportStatus()).name());
-
-            JsonObject properties = processTraffic.getProperties();
-            if (properties != null) {
-                for (Map.Entry<String, JsonElement> property : 
properties.entrySet()) {
-                    String key = property.getKey();
-                    String value = property.getValue().getAsString();
-                    process.getAttributes().add(new Attribute(key, value));
-                }
+    private Process buildProcess(final SearchHit searchHit) {
+        Map<String, Object> sourceAsMap = searchHit.getSource();
+
+        final ProcessTraffic processTraffic =
+            new ProcessTraffic.Builder().storage2Entity(new 
ElasticSearchConverter.ToEntity(ProcessTraffic.INDEX_NAME, sourceAsMap));
+
+        Process process = new Process();
+        process.setId(processTraffic.id().build());
+        process.setName(processTraffic.getName());
+        final String serviceId = processTraffic.getServiceId();
+        process.setServiceId(serviceId);
+        
process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());
+        final String instanceId = processTraffic.getInstanceId();
+        process.setInstanceId(instanceId);
+        
process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());
+        process.setAgentId(processTraffic.getAgentId());
+        
process.setDetectType(ProcessDetectType.valueOf(processTraffic.getDetectType()).name());
+        
process.setProfilingSupportStatus(ProfilingSupportStatus.valueOf(processTraffic.getProfilingSupportStatus())
+                                                                .name());
+
+        JsonObject properties = processTraffic.getProperties();
+        if (properties != null) {
+            for (Map.Entry<String, JsonElement> property : 
properties.entrySet()) {
+                String key = property.getKey();
+                String value = property.getValue().getAsString();
+                process.getAttributes().add(new Attribute(key, value));
             }
-            final String labelsJson = processTraffic.getLabelsJson();
-            if (StringUtils.isNotEmpty(labelsJson)) {
-                final List<String> labels = 
GSON.<List<String>>fromJson(labelsJson, ArrayList.class);
-                process.getLabels().addAll(labels);
-            }
-            processes.add(process);
         }
-        return processes;
+        final String labelsJson = processTraffic.getLabelsJson();
+        if (StringUtils.isNotEmpty(labelsJson)) {
+            final List<String> labels = 
GSON.<List<String>>fromJson(labelsJson, ArrayList.class);
+            process.getLabels().addAll(labels);
+        }
+        return process;
     }
 
     /**
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
index c2494c0a04..f2a3e1e262 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
@@ -22,29 +22,31 @@ import 
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuil
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
-import 
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
-import 
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
 import 
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.function.Function;
 
 public class SpanAttachedEventEsDAO extends EsDAO implements 
ISpanAttachedEventQueryDAO {
     private final int scrollingBatchSize;
 
+    protected Function<SearchHit, SpanAttachedEventRecord> 
searchHitSpanAttachedEventRecordFunction = hit -> {
+        final var sourceAsMap = hit.getSource();
+        final var builder = new SpanAttachedEventRecord.Builder();
+        return builder.storage2Entity(new 
ElasticSearchConverter.ToEntity(SpanAttachedEventRecord.INDEX_NAME, 
sourceAsMap));
+    };
+
     public SpanAttachedEventEsDAO(ElasticSearchClient client, 
StorageModuleElasticsearchConfig config) {
         super(client);
         this.scrollingBatchSize = config.getProfileDataQueryBatchSize();
@@ -64,39 +66,13 @@ public class SpanAttachedEventEsDAO extends EsDAO 
implements ISpanAttachedEventQ
         search.sort(SpanAttachedEventRecord.START_TIME_SECOND, Sort.Order.ASC);
         search.sort(SpanAttachedEventRecord.START_TIME_NANOS, Sort.Order.ASC);
 
-        final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
-        final List<SpanAttachedEventRecord> records = new ArrayList<>();
-
-        SearchResponse results = getClient().search(index, search.build(), 
params);
-        final Set<String> scrollIds = new HashSet<>();
-        try {
-            while (true) {
-                final String scrollId = results.getScrollId();
-                scrollIds.add(scrollId);
-                if (results.getHits().getTotal() == 0) {
-                    break;
-                }
-                final List<SpanAttachedEventRecord> batch = 
buildDataList(results);
-                records.addAll(batch);
-                // The last iterate, there is no more data
-                if (batch.size() < scrollingBatchSize) {
-                    break;
-                }
-                results = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
-            }
-        } finally {
-            scrollIds.forEach(getClient()::deleteScrollContextQuietly);
-        }
-        return records;
-    }
-
-    private List<SpanAttachedEventRecord> buildDataList(SearchResponse 
response) {
-        final ArrayList<SpanAttachedEventRecord> records = new ArrayList<>();
-        for (SearchHit hit : response.getHits()) {
-            final Map<String, Object> sourceAsMap = hit.getSource();
-            final SpanAttachedEventRecord.Builder builder = new 
SpanAttachedEventRecord.Builder();
-            records.add(builder.storage2Entity(new 
ElasticSearchConverter.ToEntity(SpanAttachedEventRecord.INDEX_NAME, 
sourceAsMap)));
-        }
-        return records;
+        final var scroller = ElasticSearchScroller
+            .<SpanAttachedEventRecord>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .resultConverter(searchHitSpanAttachedEventRecordFunction)
+            .build();
+        return scroller.scroll();
     }
-}
\ No newline at end of file
+}
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
index 8a69573abe..c9c88a4b57 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
@@ -18,13 +18,6 @@
 
 package 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.zipkin;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import 
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
@@ -44,6 +37,7 @@ import 
org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceSpanTraffic;
 import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceTraffic;
 import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
@@ -54,6 +48,14 @@ import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRa
 import zipkin2.Span;
 import zipkin2.storage.QueryRequest;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
 public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
     private final static int NAME_QUERY_MAX_SIZE = 10000;
     private final static int SCROLLING_BATCH_SIZE = 5000;
@@ -71,30 +73,20 @@ public class ZipkinQueryEsDAO extends EsDAO implements 
IZipkinQueryDAO {
             
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, 
ZipkinServiceTraffic.INDEX_NAME));
         }
         final SearchBuilder search = 
Search.builder().query(query).size(SCROLLING_BATCH_SIZE);
-        final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
-        final List<String> services = new ArrayList<>();
 
-        SearchResponse response = getClient().search(index, search.build(), 
params);
-        final Set<String> scrollIds = new HashSet<>();
-        try {
-            while (response.getHits().getHits().size() != 0) {
-                String scrollId = response.getScrollId();
-                scrollIds.add(scrollId);
-                for (SearchHit searchHit : response.getHits()) {
-                    Map<String, Object> sourceAsMap = searchHit.getSource();
-                    ZipkinServiceTraffic record = new 
ZipkinServiceTraffic.Builder().storage2Entity(
-                        new 
ElasticSearchConverter.ToEntity(ZipkinServiceTraffic.INDEX_NAME, sourceAsMap));
-                    services.add(record.getServiceName());
-                }
-                if (services.size() < SCROLLING_BATCH_SIZE) {
-                    break;
-                }
-                response = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
-            }
-        } finally {
-            scrollIds.forEach(getClient()::deleteScrollContextQuietly);
-        }
-        return services;
+        final var scroller = ElasticSearchScroller
+            .<String>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .resultConverter(hit -> {
+                final var sourceAsMap = hit.getSource();
+                final var record = new 
ZipkinServiceTraffic.Builder().storage2Entity(
+                    new 
ElasticSearchConverter.ToEntity(ZipkinServiceTraffic.INDEX_NAME, sourceAsMap));
+                return record.getServiceName();
+            })
+            .build();
+        return scroller.scroll();
     }
 
     @Override
@@ -141,30 +133,23 @@ public class ZipkinQueryEsDAO extends EsDAO implements 
IZipkinQueryDAO {
         String index = 
IndexController.LogicIndicesRegister.getPhysicalTableName(ZipkinSpanRecord.INDEX_NAME);
         BoolQueryBuilder query = 
Query.bool().must(Query.term(ZipkinSpanRecord.TRACE_ID, traceId));
         SearchBuilder search = 
Search.builder().query(query).size(SCROLLING_BATCH_SIZE);
-        final SearchParams params = new 
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+        final var params = new SearchParams();
         RoutingUtils.addRoutingValueToSearchParam(params, traceId);
-        SearchResponse response = getClient().search(index, search.build(), 
params);
-        final Set<String> scrollIds = new HashSet<>();
-        List<Span> trace = new ArrayList<>();
-        try {
-            while (response.getHits().getHits().size() != 0) {
-                String scrollId = response.getScrollId();
-                scrollIds.add(scrollId);
-                for (SearchHit searchHit : response.getHits()) {
-                    Map<String, Object> sourceAsMap = searchHit.getSource();
-                    ZipkinSpanRecord record = new 
ZipkinSpanRecord.Builder().storage2Entity(
-                        new 
ElasticSearchConverter.ToEntity(ZipkinSpanRecord.INDEX_NAME, sourceAsMap));
-                    trace.add(ZipkinSpanRecord.buildSpanFromRecord(record));
-                }
-                if (response.getHits().getHits().size() < 
SCROLLING_BATCH_SIZE) {
-                    break;
-                }
-                response = getClient().scroll(SCROLL_CONTEXT_RETENTION, 
scrollId);
-            }
-        } finally {
-            scrollIds.forEach(getClient()::deleteScrollContextQuietly);
-        }
-        return trace;
+
+        final var scroller = ElasticSearchScroller
+            .<Span>builder()
+            .client(getClient())
+            .search(search.build())
+            .index(index)
+            .params(params)
+            .resultConverter(searchHit -> {
+                final var sourceAsMap = searchHit.getSource();
+                final var record = new 
ZipkinSpanRecord.Builder().storage2Entity(
+                    new 
ElasticSearchConverter.ToEntity(ZipkinSpanRecord.INDEX_NAME, sourceAsMap));
+                return ZipkinSpanRecord.buildSpanFromRecord(record);
+            })
+            .build();
+        return scroller.scroll();
     }
 
     @Override

Reply via email to