This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new cb165eb2ca Scroll all results of Endpoint in ElasticSearch storage and
refactor scrolling logics to reduce boilerplate codes (#10986)
cb165eb2ca is described below
commit cb165eb2ca8656ae2e3c5c0b71ff90b7658d80e7
Author: kezhenxu94 <[email protected]>
AuthorDate: Sat Jun 24 23:01:56 2023 +0800
Scroll all results of Endpoint in ElasticSearch storage and refactor
scrolling logics to reduce boilerplate codes (#10986)
---
docs/en/changes/changes.md | 1 +
.../elasticsearch/ElasticSearchScroller.java | 79 +++++
.../cache/NetworkAddressAliasEsDAO.java | 57 ++--
.../query/EBPFProfilingDataEsDAO.java | 57 +---
.../elasticsearch/query/MetadataQueryEsDAO.java | 324 +++++++++++----------
.../query/SpanAttachedEventEsDAO.java | 58 ++--
.../query/zipkin/ZipkinQueryEsDAO.java | 91 +++---
7 files changed, 332 insertions(+), 335 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index a5dc7e8e88..40c047bded 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -16,6 +16,7 @@
* Fix AI Pipeline uri caching NullPointer and IllegalArgument Exceptions.
* Fix `NPE` in metrics query when the metric does not exist.
* Remove E2E tests for Istio < 1.15, ElasticSearch < 7.16.3, they might still
work but are not supported as planned.
+* Scroll all results in ElasticSearch storage and refactor scrolling logic,
including Service, Instance, Endpoint, Process, etc.
#### UI
* Fix metric name `browser_app_error_rate` in `Browser-Root` dashboard.
diff --git
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchScroller.java
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchScroller.java
new file mode 100644
index 0000000000..55eba02fd2
--- /dev/null
+++
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchScroller.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import lombok.Builder;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.function.Function;
+
+@Builder
+@RequiredArgsConstructor
+public class ElasticSearchScroller<T> {
+ public static final Duration SCROLL_CONTEXT_RETENTION =
Duration.ofSeconds(30);
+
+ final ElasticSearchClient client;
+ final Search search;
+ final String index;
+ @Builder.Default
+ final int queryMaxSize = 0;
+ @Builder.Default
+ final SearchParams params = new SearchParams();
+ final Function<SearchHit, T> resultConverter;
+
+ public List<T> scroll() {
+ final var results = new ArrayList<T>();
+ final var scrollIds = new HashSet<String>();
+
+ params.scroll(SCROLL_CONTEXT_RETENTION);
+
+ var response = client.search(index, search, params);
+
+ try {
+ while (true) {
+ final var scrollId = response.getScrollId();
+ scrollIds.add(scrollId);
+ if (response.getHits().getTotal() == 0) {
+ break;
+ }
+ for (final var searchHit : response.getHits()) {
+ results.add(resultConverter.apply(searchHit));
+ }
+ if (search.getSize() != null && response.getHits().getTotal()
< search.getSize()) {
+ break;
+ }
+ if (queryMaxSize > 0 && results.size() >= queryMaxSize) {
+ break;
+ }
+ response = client.scroll(SCROLL_CONTEXT_RETENTION, scrollId);
+ }
+ } finally {
+ scrollIds.forEach(client::deleteScrollContextQuietly);
+ }
+
+ return results;
+ }
+}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
index 733b25895d..cb9ada4e1f 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
@@ -18,25 +18,22 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
-import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
-import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
-import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import
org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
import
org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
+import java.util.Collections;
+import java.util.List;
+
@Slf4j
public class NetworkAddressAliasEsDAO extends EsDAO implements
INetworkAddressAliasDAO {
protected final int resultWindowMaxSize;
@@ -51,7 +48,6 @@ public class NetworkAddressAliasEsDAO extends EsDAO
implements INetworkAddressAl
@Override
public List<NetworkAddressAlias> loadLastUpdate(long timeBucketInMinute) {
- List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>();
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(NetworkAddressAlias.INDEX_NAME);
try {
@@ -63,41 +59,22 @@ public class NetworkAddressAliasEsDAO extends EsDAO
implements INetworkAddressAl
query.must(Query.range(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET)
.gte(timeBucketInMinute));
- final Search search =
Search.builder().query(query).size(batchSize).build();
+ final var search =
Search.builder().query(query).size(batchSize).build();
+ final var builder = new NetworkAddressAlias.Builder();
- final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
- final NetworkAddressAlias.Builder builder = new
NetworkAddressAlias.Builder();
-
- SearchResponse results =
- getClient().search(index, search, params);
- final Set<String> scrollIds = new HashSet<>();
- try {
- while (true) {
- final String scrollId = results.getScrollId();
- scrollIds.add(scrollId);
- if (results.getHits().getTotal() == 0) {
- break;
- }
- for (SearchHit searchHit : results.getHits()) {
- networkAddressAliases.add(
- builder.storage2Entity(
- new
ElasticSearchConverter.ToEntity(NetworkAddressAlias.INDEX_NAME,
searchHit.getSource())));
- }
- if (results.getHits().getTotal() < batchSize) {
- break;
- }
- if (networkAddressAliases.size() >= resultWindowMaxSize) {
- break;
- }
- results = getClient().scroll(SCROLL_CONTEXT_RETENTION,
scrollId);
- }
- } finally {
- scrollIds.forEach(getClient()::deleteScrollContextQuietly);
- }
+ final var scroller = ElasticSearchScroller
+ .<NetworkAddressAlias>builder()
+ .client(getClient())
+ .search(search)
+ .index(index)
+ .queryMaxSize(resultWindowMaxSize)
+ .resultConverter(searchHit -> builder.storage2Entity(
+ new
ElasticSearchConverter.ToEntity(NetworkAddressAlias.INDEX_NAME,
searchHit.getSource())))
+ .build();
+ return scroller.scroll();
} catch (Throwable t) {
log.error(t.getMessage(), t);
+ return Collections.emptyList();
}
-
- return networkAddressAliases;
}
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
index 76a5947c1c..0b8f9b0795 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
@@ -22,23 +22,16 @@ import
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuil
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
-import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
-import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
-import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
import
org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
public class EBPFProfilingDataEsDAO extends EsDAO implements
IEBPFProfilingDataDAO {
private final int scrollingBatchSize;
@@ -49,7 +42,7 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements
IEBPFProfilingDataD
}
@Override
- public List<EBPFProfilingDataRecord> queryData(List<String>
scheduleIdList, long beginTime, long endTime) throws IOException {
+ public List<EBPFProfilingDataRecord> queryData(List<String>
scheduleIdList, long beginTime, long endTime) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingDataRecord.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
@@ -60,39 +53,17 @@ public class EBPFProfilingDataEsDAO extends EsDAO
implements IEBPFProfilingDataD
query.must(Query.terms(EBPFProfilingDataRecord.SCHEDULE_ID,
scheduleIdList));
query.must(Query.range(EBPFProfilingDataRecord.UPLOAD_TIME).gte(beginTime).lt(endTime));
- final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
- final List<EBPFProfilingDataRecord> records = new ArrayList<>();
-
- SearchResponse results = getClient().search(index, search.build(),
params);
- final Set<String> scrollIds = new HashSet<>();
- try {
- while (true) {
- final String scrollId = results.getScrollId();
- scrollIds.add(scrollId);
- if (results.getHits().getTotal() == 0) {
- break;
- }
- final List<EBPFProfilingDataRecord> batch =
buildDataList(results);
- records.addAll(batch);
- // The last iterate, there is no more data
- if (batch.size() < scrollingBatchSize) {
- break;
- }
- results = getClient().scroll(SCROLL_CONTEXT_RETENTION,
scrollId);
- }
- } finally {
- scrollIds.forEach(getClient()::deleteScrollContextQuietly);
- }
- return records;
- }
-
- private List<EBPFProfilingDataRecord> buildDataList(SearchResponse
response) {
- List<EBPFProfilingDataRecord> records = new ArrayList<>();
- for (SearchHit hit : response.getHits()) {
- final Map<String, Object> sourceAsMap = hit.getSource();
- final EBPFProfilingDataRecord.Builder builder = new
EBPFProfilingDataRecord.Builder();
- records.add(builder.storage2Entity(new
ElasticSearchConverter.ToEntity(EBPFProfilingDataRecord.INDEX_NAME,
sourceAsMap)));
- }
- return records;
+ final var scroller = ElasticSearchScroller
+ .<EBPFProfilingDataRecord>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .resultConverter(hit -> {
+ final var sourceAsMap = hit.getSource();
+ final var builder = new EBPFProfilingDataRecord.Builder();
+ return builder.storage2Entity(new
ElasticSearchConverter.ToEntity(EBPFProfilingDataRecord.INDEX_NAME,
sourceAsMap));
+ })
+ .build();
+ return scroller.scroll();
}
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index a594d18e08..9f2ab1694f 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -22,21 +22,12 @@ import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
import org.apache.commons.lang3.StringUtils;
import
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import
org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
-import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
@@ -57,12 +48,20 @@ import
org.apache.skywalking.oap.server.core.query.type.Service;
import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
import static
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE;
public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
@@ -74,6 +73,48 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
private boolean aliasNameInit = false;
private final int layerSize;
+    /**
+     * Converts a search hit into a {@link Service}: the hit source is decoded as a
+     * {@code ServiceTraffic} entity, whose id, name, short name and group are copied
+     * over and whose layer name is added to the service's layer list.
+     */
+    protected final Function<SearchHit, Service> searchHitServiceFunction = hit -> {
+        final var traffic = new ServiceTraffic.Builder().storage2Entity(
+            new ElasticSearchConverter.ToEntity(ServiceTraffic.INDEX_NAME, hit.getSource()));
+
+        final var result = new Service();
+        result.setId(traffic.getServiceId());
+        result.setName(traffic.getName());
+        result.setShortName(traffic.getShortName());
+        result.setGroup(traffic.getGroup());
+        result.getLayers().add(traffic.getLayer().name());
+        return result;
+    };
+
+    /**
+     * Converts a search hit into a {@link ServiceInstance}: the hit source is decoded
+     * as an {@code InstanceTraffic} entity, which supplies the id (also used as the
+     * instance UUID) and the name. The {@code LANGUAGE} property, if present, sets the
+     * language; every other property becomes an attribute. When the entity has no
+     * properties at all, the language is set to {@code Language.UNKNOWN}.
+     */
+    protected final Function<SearchHit, ServiceInstance> searchHitServiceInstanceFunction = hit -> {
+        final var traffic = new InstanceTraffic.Builder().storage2Entity(
+            new ElasticSearchConverter.ToEntity(InstanceTraffic.INDEX_NAME, hit.getSource()));
+
+        final var instance = new ServiceInstance();
+        instance.setId(traffic.id().build());
+        instance.setName(traffic.getName());
+        instance.setInstanceUUID(instance.getId());
+
+        final var props = traffic.getProperties();
+        if (props == null) {
+            // No properties recorded for this instance.
+            instance.setLanguage(Language.UNKNOWN);
+            return instance;
+        }
+
+        for (final var entry : props.entrySet()) {
+            final var name = entry.getKey();
+            final var text = entry.getValue().getAsString();
+            if (name.equals(LANGUAGE)) {
+                instance.setLanguage(Language.value(text));
+                continue;
+            }
+            instance.getAttributes().add(new Attribute(name, text));
+        }
+        return instance;
+    };
+
public MetadataQueryEsDAO(
ElasticSearchClient client,
StorageModuleElasticsearchConfig config) {
@@ -84,7 +125,7 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
@Override
- public List<Service> listServices(final String layer, final String group)
throws IOException {
+ public List<Service> listServices(final String layer, final String group) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME);
@@ -101,38 +142,20 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
if
(IndexController.LogicIndicesRegister.isMergedTable(ServiceTraffic.INDEX_NAME))
{
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
ServiceTraffic.INDEX_NAME));
}
- final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
- final List<Service> services = new ArrayList<>();
-
- SearchResponse results = getClient().search(index, search.build(),
params);
- Set<String> scrollIds = new HashSet<>();
- try {
- while (true) {
- String scrollId = results.getScrollId();
- scrollIds.add(scrollId);
- if (results.getHits().getTotal() == 0) {
- break;
- }
- final List<Service> batch = buildServices(results);
- services.addAll(batch);
- // The last iterate, there is no more data
- if (batch.size() < batchSize) {
- break;
- }
- // We've got enough data
- if (services.size() >= queryMaxSize) {
- break;
- }
- results = getClient().scroll(SCROLL_CONTEXT_RETENTION,
scrollId);
- }
- } finally {
- scrollIds.forEach(getClient()::deleteScrollContextQuietly);
- }
- return services;
+
+ final var scroller = ElasticSearchScroller
+ .<Service>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .queryMaxSize(queryMaxSize)
+ .resultConverter(searchHitServiceFunction)
+ .build();
+ return scroller.scroll();
}
@Override
- public List<Service> getServices(final String serviceId) throws
IOException {
+ public List<Service> getServices(final String serviceId) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME);
final BoolQueryBuilder query =
@@ -149,7 +172,7 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
@Override
public List<ServiceInstance> listInstances(Duration duration,
- String serviceId) throws
IOException {
+ String serviceId) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME);
@@ -164,24 +187,19 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
final int batchSize = Math.min(queryMaxSize, scrollingBatchSize);
final SearchBuilder search =
Search.builder().query(query).size(batchSize);
- final List<ServiceInstance> instances = new ArrayList<>();
- SearchResponse response = getClient().search(index, search.build());
- while (response.getHits().getTotal() > 0) {
- final List<ServiceInstance> batch = buildInstances(response);
- instances.addAll(batch);
- if (batch.size() < batchSize) {
- break;
- }
- if (batch.size() >= queryMaxSize) {
- break;
- }
- response = getClient().scroll(SCROLL_CONTEXT_RETENTION,
response.getScrollId());
- }
- return instances;
+ final var scroller = ElasticSearchScroller
+ .<ServiceInstance>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .queryMaxSize(queryMaxSize)
+ .resultConverter(searchHitServiceInstanceFunction)
+ .build();
+ return scroller.scroll();
}
@Override
- public ServiceInstance getInstance(final String instanceId) throws
IOException {
+ public ServiceInstance getInstance(final String instanceId) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME);
String id = instanceId;
@@ -199,7 +217,7 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
@Override
- public List<ServiceInstance> getInstances(List<String> instanceIds) throws
IOException {
+ public List<ServiceInstance> getInstances(List<String> instanceIds) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
@@ -216,8 +234,7 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
@Override
- public List<Endpoint> findEndpoint(String keyword, String serviceId, int
limit)
- throws IOException {
+ public List<Endpoint> findEndpoint(String keyword, String serviceId, int
limit) {
initColumnName();
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(
EndpointTraffic.INDEX_NAME);
@@ -235,28 +252,32 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
EndpointTraffic.INDEX_NAME));
}
- final SearchBuilder search = Search.builder().query(query).size(limit);
+ final var search = Search.builder().query(query).size(limit);
- final SearchResponse response = getClient().search(index,
search.build());
+ final var scroller = ElasticSearchScroller
+ .<Endpoint>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .queryMaxSize(queryMaxSize)
+ .resultConverter(searchHit -> {
+ final var sourceAsMap = searchHit.getSource();
- List<Endpoint> endpoints = new ArrayList<>();
- for (SearchHit searchHit : response.getHits()) {
- Map<String, Object> sourceAsMap = searchHit.getSource();
+ final var endpointTraffic =
+ new EndpointTraffic.Builder().storage2Entity(new
ElasticSearchConverter.ToEntity(EndpointTraffic.INDEX_NAME, sourceAsMap));
- final EndpointTraffic endpointTraffic =
- new EndpointTraffic.Builder().storage2Entity(new
ElasticSearchConverter.ToEntity(EndpointTraffic.INDEX_NAME, sourceAsMap));
+ final var endpoint = new Endpoint();
+ endpoint.setId(endpointTraffic.id().build());
+ endpoint.setName(endpointTraffic.getName());
+ return endpoint;
+ })
+ .build();
- Endpoint endpoint = new Endpoint();
- endpoint.setId(endpointTraffic.id().build());
- endpoint.setName(endpointTraffic.getName());
- endpoints.add(endpoint);
- }
-
- return endpoints;
+ return scroller.scroll();
}
@Override
- public List<Process> listProcesses(String serviceId,
ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long
lastPingEndTimeBucket) throws IOException {
+ public List<Process> listProcesses(String serviceId,
ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long
lastPingEndTimeBucket) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
@@ -266,13 +287,20 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
final SearchBuilder search =
Search.builder().query(query).size(queryMaxSize);
appendProcessWhereQuery(query, serviceId, null, null, supportStatus,
lastPingStartTimeBucket, lastPingEndTimeBucket, false);
- final SearchResponse results = getClient().search(index,
search.build());
- return buildProcesses(results);
+ final var scroller = ElasticSearchScroller
+ .<Process>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .queryMaxSize(queryMaxSize)
+ .resultConverter(this::buildProcess)
+ .build();
+ return scroller.scroll();
}
@Override
- public List<Process> listProcesses(String serviceInstanceId, Duration
duration, boolean includeVirtual) throws IOException {
+ public List<Process> listProcesses(String serviceInstanceId, Duration
duration, boolean includeVirtual) {
long lastPingStartTimeBucket = duration.getStartTimeBucket();
long lastPingEndTimeBucket = duration.getEndTimeBucket();
final String index =
@@ -284,13 +312,20 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
final SearchBuilder search =
Search.builder().query(query).size(queryMaxSize);
appendProcessWhereQuery(query, null, serviceInstanceId, null, null,
lastPingStartTimeBucket, lastPingEndTimeBucket, includeVirtual);
- final SearchResponse results = getClient().search(index,
search.build());
- return buildProcesses(results);
+ final var scroller = ElasticSearchScroller
+ .<Process>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .queryMaxSize(queryMaxSize)
+ .resultConverter(this::buildProcess)
+ .build();
+ return scroller.scroll();
}
@Override
- public List<Process> listProcesses(String agentId) throws IOException {
+ public List<Process> listProcesses(String agentId) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
@@ -300,13 +335,20 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
final SearchBuilder search =
Search.builder().query(query).size(queryMaxSize);
appendProcessWhereQuery(query, null, null, agentId, null, 0, 0, false);
- final SearchResponse results = getClient().search(index,
search.build());
- return buildProcesses(results);
+ final var scroller = ElasticSearchScroller
+ .<Process>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .queryMaxSize(queryMaxSize)
+ .resultConverter(this::buildProcess)
+ .build();
+ return scroller.scroll();
}
@Override
- public long getProcessCount(String serviceId, ProfilingSupportStatus
profilingSupportStatus, long lastPingStartTimeBucket, long
lastPingEndTimeBucket) throws IOException {
+ public long getProcessCount(String serviceId, ProfilingSupportStatus
profilingSupportStatus, long lastPingStartTimeBucket, long
lastPingEndTimeBucket) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
@@ -323,7 +365,7 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
@Override
- public long getProcessCount(String instanceId) throws IOException {
+ public long getProcessCount(String instanceId) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
@@ -365,7 +407,7 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
@Override
- public Process getProcess(String processId) throws IOException {
+ public Process getProcess(String processId) {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
final BoolQueryBuilder query = Query.bool()
@@ -375,25 +417,18 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
}
final SearchBuilder search =
Search.builder().query(query).size(queryMaxSize);
- final SearchResponse response = getClient().search(index,
search.build());
- final List<Process> processes = buildProcesses(response);
- return processes.isEmpty() ? null : processes.get(0);
+ final var response = getClient().search(index, search.build());
+ final var iterator = response.getHits().iterator();
+ if (iterator.hasNext()) {
+ return buildProcess(iterator.next());
+ }
+ return null;
}
private List<Service> buildServices(SearchResponse response) {
List<Service> services = new ArrayList<>();
for (SearchHit hit : response.getHits()) {
- final Map<String, Object> sourceAsMap = hit.getSource();
- final ServiceTraffic.Builder builder = new
ServiceTraffic.Builder();
- final ServiceTraffic serviceTraffic = builder.storage2Entity(new
ElasticSearchConverter.ToEntity(ServiceTraffic.INDEX_NAME, sourceAsMap));
- String serviceName = serviceTraffic.getName();
- Service service = new Service();
- service.setId(serviceTraffic.getServiceId());
- service.setName(serviceName);
- service.setShortName(serviceTraffic.getShortName());
- service.setGroup(serviceTraffic.getGroup());
- service.getLayers().add(serviceTraffic.getLayer().name());
- services.add(service);
+ services.add(searchHitServiceFunction.apply(hit));
}
return services;
}
@@ -401,72 +436,45 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
private List<ServiceInstance> buildInstances(SearchResponse response) {
List<ServiceInstance> serviceInstances = new ArrayList<>();
for (SearchHit searchHit : response.getHits()) {
- Map<String, Object> sourceAsMap = searchHit.getSource();
-
- final InstanceTraffic instanceTraffic =
- new InstanceTraffic.Builder().storage2Entity(new
ElasticSearchConverter.ToEntity(InstanceTraffic.INDEX_NAME, sourceAsMap));
-
- ServiceInstance serviceInstance = new ServiceInstance();
- serviceInstance.setId(instanceTraffic.id().build());
- serviceInstance.setName(instanceTraffic.getName());
- serviceInstance.setInstanceUUID(serviceInstance.getId());
-
- JsonObject properties = instanceTraffic.getProperties();
- if (properties != null) {
- for (Map.Entry<String, JsonElement> property :
properties.entrySet()) {
- String key = property.getKey();
- String value = property.getValue().getAsString();
- if (key.equals(LANGUAGE)) {
- serviceInstance.setLanguage(Language.value(value));
- } else {
- serviceInstance.getAttributes().add(new Attribute(key,
value));
- }
- }
- } else {
- serviceInstance.setLanguage(Language.UNKNOWN);
- }
- serviceInstances.add(serviceInstance);
+
serviceInstances.add(searchHitServiceInstanceFunction.apply(searchHit));
}
return serviceInstances;
}
- private List<Process> buildProcesses(SearchResponse response) {
- List<Process> processes = new ArrayList<>();
- for (SearchHit searchHit : response.getHits()) {
- Map<String, Object> sourceAsMap = searchHit.getSource();
-
- final ProcessTraffic processTraffic =
- new ProcessTraffic.Builder().storage2Entity(new
ElasticSearchConverter.ToEntity(ProcessTraffic.INDEX_NAME, sourceAsMap));
-
- Process process = new Process();
- process.setId(processTraffic.id().build());
- process.setName(processTraffic.getName());
- final String serviceId = processTraffic.getServiceId();
- process.setServiceId(serviceId);
-
process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());
- final String instanceId = processTraffic.getInstanceId();
- process.setInstanceId(instanceId);
-
process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());
- process.setAgentId(processTraffic.getAgentId());
-
process.setDetectType(ProcessDetectType.valueOf(processTraffic.getDetectType()).name());
-
process.setProfilingSupportStatus(ProfilingSupportStatus.valueOf(processTraffic.getProfilingSupportStatus()).name());
-
- JsonObject properties = processTraffic.getProperties();
- if (properties != null) {
- for (Map.Entry<String, JsonElement> property :
properties.entrySet()) {
- String key = property.getKey();
- String value = property.getValue().getAsString();
- process.getAttributes().add(new Attribute(key, value));
- }
+ private Process buildProcess(final SearchHit searchHit) {
+ Map<String, Object> sourceAsMap = searchHit.getSource();
+
+ final ProcessTraffic processTraffic =
+ new ProcessTraffic.Builder().storage2Entity(new
ElasticSearchConverter.ToEntity(ProcessTraffic.INDEX_NAME, sourceAsMap));
+
+ Process process = new Process();
+ process.setId(processTraffic.id().build());
+ process.setName(processTraffic.getName());
+ final String serviceId = processTraffic.getServiceId();
+ process.setServiceId(serviceId);
+
process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());
+ final String instanceId = processTraffic.getInstanceId();
+ process.setInstanceId(instanceId);
+
process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());
+ process.setAgentId(processTraffic.getAgentId());
+
process.setDetectType(ProcessDetectType.valueOf(processTraffic.getDetectType()).name());
+
process.setProfilingSupportStatus(ProfilingSupportStatus.valueOf(processTraffic.getProfilingSupportStatus())
+ .name());
+
+ JsonObject properties = processTraffic.getProperties();
+ if (properties != null) {
+ for (Map.Entry<String, JsonElement> property :
properties.entrySet()) {
+ String key = property.getKey();
+ String value = property.getValue().getAsString();
+ process.getAttributes().add(new Attribute(key, value));
}
- final String labelsJson = processTraffic.getLabelsJson();
- if (StringUtils.isNotEmpty(labelsJson)) {
- final List<String> labels =
GSON.<List<String>>fromJson(labelsJson, ArrayList.class);
- process.getLabels().addAll(labels);
- }
- processes.add(process);
}
- return processes;
+ final String labelsJson = processTraffic.getLabelsJson();
+ if (StringUtils.isNotEmpty(labelsJson)) {
+ final List<String> labels =
GSON.<List<String>>fromJson(labelsJson, ArrayList.class);
+ process.getLabels().addAll(labels);
+ }
+ return process;
}
/**
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
index c2494c0a04..f2a3e1e262 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
@@ -22,29 +22,31 @@ import
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuil
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
-import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
-import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.function.Function;
public class SpanAttachedEventEsDAO extends EsDAO implements
ISpanAttachedEventQueryDAO {
private final int scrollingBatchSize;
+ protected Function<SearchHit, SpanAttachedEventRecord>
searchHitSpanAttachedEventRecordFunction = hit -> {
+ final var sourceAsMap = hit.getSource();
+ final var builder = new SpanAttachedEventRecord.Builder();
+ return builder.storage2Entity(new
ElasticSearchConverter.ToEntity(SpanAttachedEventRecord.INDEX_NAME,
sourceAsMap));
+ };
+
public SpanAttachedEventEsDAO(ElasticSearchClient client,
StorageModuleElasticsearchConfig config) {
super(client);
this.scrollingBatchSize = config.getProfileDataQueryBatchSize();
@@ -64,39 +66,13 @@ public class SpanAttachedEventEsDAO extends EsDAO
implements ISpanAttachedEventQ
search.sort(SpanAttachedEventRecord.START_TIME_SECOND, Sort.Order.ASC);
search.sort(SpanAttachedEventRecord.START_TIME_NANOS, Sort.Order.ASC);
- final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
- final List<SpanAttachedEventRecord> records = new ArrayList<>();
-
- SearchResponse results = getClient().search(index, search.build(),
params);
- final Set<String> scrollIds = new HashSet<>();
- try {
- while (true) {
- final String scrollId = results.getScrollId();
- scrollIds.add(scrollId);
- if (results.getHits().getTotal() == 0) {
- break;
- }
- final List<SpanAttachedEventRecord> batch =
buildDataList(results);
- records.addAll(batch);
- // The last iterate, there is no more data
- if (batch.size() < scrollingBatchSize) {
- break;
- }
- results = getClient().scroll(SCROLL_CONTEXT_RETENTION,
scrollId);
- }
- } finally {
- scrollIds.forEach(getClient()::deleteScrollContextQuietly);
- }
- return records;
- }
-
- private List<SpanAttachedEventRecord> buildDataList(SearchResponse
response) {
- final ArrayList<SpanAttachedEventRecord> records = new ArrayList<>();
- for (SearchHit hit : response.getHits()) {
- final Map<String, Object> sourceAsMap = hit.getSource();
- final SpanAttachedEventRecord.Builder builder = new
SpanAttachedEventRecord.Builder();
- records.add(builder.storage2Entity(new
ElasticSearchConverter.ToEntity(SpanAttachedEventRecord.INDEX_NAME,
sourceAsMap)));
- }
- return records;
+ final var scroller = ElasticSearchScroller
+ .<SpanAttachedEventRecord>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .resultConverter(searchHitSpanAttachedEventRecordFunction)
+ .build();
+ return scroller.scroll();
}
-}
\ No newline at end of file
+}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
index 8a69573abe..c9c88a4b57 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
@@ -18,13 +18,6 @@
package
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.zipkin;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
import
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
@@ -44,6 +37,7 @@ import
org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceSpanTraffic;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceTraffic;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
@@ -54,6 +48,14 @@ import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRa
import zipkin2.Span;
import zipkin2.storage.QueryRequest;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
private final static int NAME_QUERY_MAX_SIZE = 10000;
private final static int SCROLLING_BATCH_SIZE = 5000;
@@ -71,30 +73,20 @@ public class ZipkinQueryEsDAO extends EsDAO implements
IZipkinQueryDAO {
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
ZipkinServiceTraffic.INDEX_NAME));
}
final SearchBuilder search =
Search.builder().query(query).size(SCROLLING_BATCH_SIZE);
- final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
- final List<String> services = new ArrayList<>();
- SearchResponse response = getClient().search(index, search.build(),
params);
- final Set<String> scrollIds = new HashSet<>();
- try {
- while (response.getHits().getHits().size() != 0) {
- String scrollId = response.getScrollId();
- scrollIds.add(scrollId);
- for (SearchHit searchHit : response.getHits()) {
- Map<String, Object> sourceAsMap = searchHit.getSource();
- ZipkinServiceTraffic record = new
ZipkinServiceTraffic.Builder().storage2Entity(
- new
ElasticSearchConverter.ToEntity(ZipkinServiceTraffic.INDEX_NAME, sourceAsMap));
- services.add(record.getServiceName());
- }
- if (services.size() < SCROLLING_BATCH_SIZE) {
- break;
- }
- response = getClient().scroll(SCROLL_CONTEXT_RETENTION,
scrollId);
- }
- } finally {
- scrollIds.forEach(getClient()::deleteScrollContextQuietly);
- }
- return services;
+ final var scroller = ElasticSearchScroller
+ .<String>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .resultConverter(hit -> {
+ final var sourceAsMap = hit.getSource();
+ final var record = new
ZipkinServiceTraffic.Builder().storage2Entity(
+ new
ElasticSearchConverter.ToEntity(ZipkinServiceTraffic.INDEX_NAME, sourceAsMap));
+ return record.getServiceName();
+ })
+ .build();
+ return scroller.scroll();
}
@Override
@@ -141,30 +133,23 @@ public class ZipkinQueryEsDAO extends EsDAO implements
IZipkinQueryDAO {
String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ZipkinSpanRecord.INDEX_NAME);
BoolQueryBuilder query =
Query.bool().must(Query.term(ZipkinSpanRecord.TRACE_ID, traceId));
SearchBuilder search =
Search.builder().query(query).size(SCROLLING_BATCH_SIZE);
- final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+ final var params = new SearchParams();
RoutingUtils.addRoutingValueToSearchParam(params, traceId);
- SearchResponse response = getClient().search(index, search.build(),
params);
- final Set<String> scrollIds = new HashSet<>();
- List<Span> trace = new ArrayList<>();
- try {
- while (response.getHits().getHits().size() != 0) {
- String scrollId = response.getScrollId();
- scrollIds.add(scrollId);
- for (SearchHit searchHit : response.getHits()) {
- Map<String, Object> sourceAsMap = searchHit.getSource();
- ZipkinSpanRecord record = new
ZipkinSpanRecord.Builder().storage2Entity(
- new
ElasticSearchConverter.ToEntity(ZipkinSpanRecord.INDEX_NAME, sourceAsMap));
- trace.add(ZipkinSpanRecord.buildSpanFromRecord(record));
- }
- if (response.getHits().getHits().size() <
SCROLLING_BATCH_SIZE) {
- break;
- }
- response = getClient().scroll(SCROLL_CONTEXT_RETENTION,
scrollId);
- }
- } finally {
- scrollIds.forEach(getClient()::deleteScrollContextQuietly);
- }
- return trace;
+
+ final var scroller = ElasticSearchScroller
+ .<Span>builder()
+ .client(getClient())
+ .search(search.build())
+ .index(index)
+ .params(params)
+ .resultConverter(searchHit -> {
+ final var sourceAsMap = searchHit.getSource();
+ final var record = new
ZipkinSpanRecord.Builder().storage2Entity(
+ new
ElasticSearchConverter.ToEntity(ZipkinSpanRecord.INDEX_NAME, sourceAsMap));
+ return ZipkinSpanRecord.buildSpanFromRecord(record);
+ })
+ .build();
+ return scroller.scroll();
}
@Override