This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch feature/elasticsearch-client in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit e469ebc1651f3a311a6dc6e72a1d0277bbc82104 Author: kezhenxu94 <kezhenx...@apache.org> AuthorDate: Sat Aug 21 10:29:31 2021 +0800 Rebuilt ElasticSearch client on top of their REST API --- .../storage/type/StorageDataComplexObject.java | 22 ++ oap-server/server-library/library-client/pom.xml | 6 + .../client/elasticsearch/ElasticSearchClient.java | 295 ++++++--------------- .../elasticsearch/ElasticSearchInsertRequest.java | 26 +- .../elasticsearch/ElasticSearchUpdateRequest.java | 27 +- .../elasticsearch/ITElasticSearchClient.java | 40 +-- .../{ => library-elasticsearch-client}/pom.xml | 22 +- .../library/elasticsearch/ElasticSearchClient.java | 123 +++++++++ .../elasticsearch/ElasticSearchClientBuilder.java | 150 +++++++++++ .../elasticsearch/ElasticSearchVersion.java | 80 ++++++ .../library/elasticsearch/client/AliasClient.java | 64 +++++ .../elasticsearch/client/DocumentClient.java | 119 +++++++++ .../library/elasticsearch/client/IndexClient.java | 110 ++++++++ .../elasticsearch/client/TemplateClient.java | 107 ++++++++ .../requests/factory/AliasFactory.java} | 22 +- .../requests/factory/DocumentFactory.java | 54 ++++ .../requests/factory/IndexFactory.java} | 34 ++- .../requests/factory/RequestFactory.java | 72 +++++ .../requests/factory/TemplateFactory.java} | 30 ++- .../requests/factory/v6/V6AliasFactory.java} | 21 +- .../requests/factory/v6/V6DocumentFactory.java | 104 ++++++++ .../requests/factory/v6/V6IndexFactory.java | 91 +++++++ .../requests/factory/v6/V6RequestFactory.java | 48 ++++ .../requests/factory/v6/V6TemplateFactory.java | 79 ++++++ .../library/elasticsearch/response/Document.java} | 30 +-- .../library/elasticsearch/response/Documents.java} | 29 +- .../library/elasticsearch/response/NodeInfo.java} | 27 +- .../library/elasticsearch/util/JsonSerializer.java | 46 ++++ .../elasticsearch/ElasticSearchClientTest.java} | 29 +- oap-server/server-library/pom.xml | 3 +- .../storage/plugin/elasticsearch/base/EsDAO.java | 1 + 
.../elasticsearch/base/HistoryDeleteEsDAO.java | 5 +- .../plugin/elasticsearch/base/ManagementEsDAO.java | 14 +- .../plugin/elasticsearch/base/MetricsEsDAO.java | 22 +- .../plugin/elasticsearch/base/NoneStreamEsDAO.java | 6 +- .../plugin/elasticsearch/base/RecordEsDAO.java | 6 +- .../elasticsearch/query/MetricsQueryEsDAO.java | 42 +-- .../query/UITemplateManagementEsDAO.java | 23 +- .../client/ElasticSearch7Client.java | 40 +-- 39 files changed, 1592 insertions(+), 477 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java index ebad19e..855fb15 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java @@ -18,9 +18,16 @@ package org.apache.skywalking.oap.server.core.storage.type; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import java.io.IOException; + /** * StorageDataComplexObject implementation supports String-Object interconversion. */ +@JsonSerialize(using = StorageDataComplexObject.Serializer.class) public interface StorageDataComplexObject<T> { /** * @return string representing this object. @@ -36,4 +43,19 @@ public interface StorageDataComplexObject<T> { * Initialize the object based on the given source. 
*/ void copyFrom(T source); + + class Serializer extends StdSerializer<StorageDataComplexObject<?>> { + protected Serializer(final Class<StorageDataComplexObject<?>> t) { + super(t); + } + + @Override + public void serialize( + final StorageDataComplexObject value, + final JsonGenerator gen, + final SerializerProvider provider) + throws IOException { + gen.writeString(value.toStorageData()); + } + } } diff --git a/oap-server/server-library/library-client/pom.xml b/oap-server/server-library/library-client/pom.xml index d5c6b82..43c4e56 100755 --- a/oap-server/server-library/library-client/pom.xml +++ b/oap-server/server-library/library-client/pom.xml @@ -40,6 +40,12 @@ </dependency> <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>library-elasticsearch-client</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-core</artifactId> </dependency> diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 38e3af6..e9d5cc6 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -19,13 +19,9 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; import com.google.common.base.Splitter; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.reflect.TypeToken; +import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.lang.reflect.Type; import 
java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyManagementException; @@ -33,8 +29,8 @@ import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -48,55 +44,34 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; -import org.apache.http.HttpStatus; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpHead; import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContexts; import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.library.elasticsearch.response.Document; +import org.apache.skywalking.library.elasticsearch.response.Documents; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker; import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable; -import org.apache.skywalking.oap.server.library.client.request.InsertRequest; -import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; import org.apache.skywalking.oap.server.library.util.HealthChecker; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import 
org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; /** @@ -122,6 +97,8 @@ public class ElasticSearchClient implements Client, HealthCheckable { private final int connectTimeout; private final int socketTimeout; + org.apache.skywalking.library.elasticsearch.ElasticSearchClient esClient; + public ElasticSearchClient(String clusterNodes, String protocol, String trustStorePath, @@ -140,6 +117,15 @@ public class ElasticSearchClient implements Client, 
HealthCheckable { this.trustStorePass = trustStorePass; this.connectTimeout = connectTimeout; this.socketTimeout = socketTimeout; + this.esClient = org.apache.skywalking.library.elasticsearch.ElasticSearchClient.builder() + .endpoints(clusterNodes.split(",")) + .protocol(protocol) + .trustStorePath(trustStorePath) + .trustStorePass(trustStorePass) + .username(user) + .password(password) + .connectTimeout(connectTimeout) + .build(); } @Override @@ -156,6 +142,7 @@ public class ElasticSearchClient implements Client, HealthCheckable { } client = createClient(hosts); client.ping(); + esClient.connect(); } finally { connectLock.unlock(); } @@ -201,6 +188,7 @@ public class ElasticSearchClient implements Client, HealthCheckable { @Override public void shutdown() throws IOException { client.close(); + esClient.close(); } public static List<HttpHost> parseClusterNodes(String protocol, String nodes) { @@ -220,22 +208,13 @@ public class ElasticSearchClient implements Client, HealthCheckable { public boolean createIndex(String indexName) throws IOException { indexName = formatIndexName(indexName); - CreateIndexRequest request = new CreateIndexRequest(indexName); - CreateIndexResponse response = client.indices().create(request); - log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); - return response.isAcknowledged(); + return esClient.index().create(indexName); } public boolean updateIndexMapping(String indexName, Map<String, Object> mapping) throws IOException { indexName = formatIndexName(indexName); - Map<String, Object> properties = (Map<String, Object>) mapping.get(ElasticSearchClient.TYPE); - PutMappingRequest putMappingRequest = new PutMappingRequest(indexName); - Gson gson = new Gson(); - putMappingRequest.type(ElasticSearchClient.TYPE); - putMappingRequest.source(gson.toJson(properties), XContentType.JSON); - PutMappingResponse response = client.indices().putMapping(putMappingRequest); - log.debug("put {} index mapping 
finished, isAcknowledged: {}", indexName, response.isAcknowledged()); - return response.isAcknowledged(); + + return esClient.index().putMapping(indexName, TYPE, mapping); } public Map<String, Object> getIndex(String indexName) throws IOException { @@ -244,69 +223,22 @@ public class ElasticSearchClient implements Client, HealthCheckable { } indexName = formatIndexName(indexName); try { - Response response = client.getLowLevelClient() - .performRequest(HttpGet.METHOD_NAME, "/" + indexName); - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode != HttpStatus.SC_OK) { - healthChecker.health(); - throw new IOException( - "The response status code of template exists request should be 200, but it is " + statusCode); + final Map<String, Object> indices = esClient.index().get(indexName); + if (indices.containsKey(indexName)) { + // noinspection unchecked + return (Map<String, Object>) indices.get(indexName); } - Type type = new TypeToken<HashMap<String, Object>>() { - }.getType(); - Map<String, Object> templates = new Gson().<HashMap<String, Object>>fromJson( - new InputStreamReader(response.getEntity().getContent()), - type - ); - return (Map<String, Object>) Optional.ofNullable(templates.get(indexName)).orElse(new HashMap<>()); - } catch (ResponseException e) { - if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) { - return new HashMap<>(); - } - healthChecker.unHealth(e); - throw e; - } catch (IOException t) { + return Collections.emptyMap(); + } catch (Exception t) { healthChecker.unHealth(t); throw t; } } - public boolean createIndex(String indexName, Map<String, Object> settings, - Map<String, Object> mapping) throws IOException { - indexName = formatIndexName(indexName); - CreateIndexRequest request = new CreateIndexRequest(indexName); - Gson gson = new Gson(); - request.settings(gson.toJson(settings), XContentType.JSON); - request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON); - CreateIndexResponse 
response = client.indices().create(request); - log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); - return response.isAcknowledged(); - } - - public List<String> retrievalIndexByAliases(String aliases) throws IOException { + public Collection<String> retrievalIndexByAliases(String aliases) throws IOException { aliases = formatIndexName(aliases); - Response response; - try { - response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/_alias/" + aliases); - healthChecker.health(); - } catch (Throwable t) { - healthChecker.unHealth(t); - throw t; - } - if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) { - Gson gson = new Gson(); - InputStreamReader reader; - try { - reader = new InputStreamReader(response.getEntity().getContent()); - } catch (Throwable t) { - healthChecker.unHealth(t); - throw t; - } - JsonObject responseJson = gson.fromJson(reader, JsonObject.class); - log.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson); - return new ArrayList<>(responseJson.keySet()); - } - return Collections.emptyList(); + + return esClient.alias().indices(aliases).keySet(); } /** @@ -334,98 +266,48 @@ public class ElasticSearchClient implements Client, HealthCheckable { if (formatIndexName) { indexName = formatIndexName(indexName); } - DeleteIndexRequest request = new DeleteIndexRequest(indexName); - DeleteIndexResponse response; - response = client.indices().delete(request); - log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); - return response.isAcknowledged(); + return esClient.index().delete(indexName); } public boolean isExistsIndex(String indexName) throws IOException { indexName = formatIndexName(indexName); - GetIndexRequest request = new GetIndexRequest(); - request.indices(indexName); - return client.indices().exists(request); + + return esClient.index().exists(indexName); } public Map<String, Object> 
getTemplate(String name) throws IOException { name = formatIndexName(name); + try { - Response response = client.getLowLevelClient() - .performRequest(HttpGet.METHOD_NAME, "/_template/" + name); - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode != HttpStatus.SC_OK) { - healthChecker.health(); - throw new IOException( - "The response status code of template exists request should be 200, but it is " + statusCode); - } - Type type = new TypeToken<HashMap<String, Object>>() { - }.getType(); - Map<String, Object> templates = new Gson().<HashMap<String, Object>>fromJson( - new InputStreamReader(response.getEntity().getContent()), - type - ); + Map<String, Object> templates = esClient.templates().get(name); if (templates.containsKey(name)) { + // noinspection unchecked return (Map<String, Object>) templates.get(name); } - return new HashMap<>(); - } catch (ResponseException e) { - if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) { - return new HashMap<>(); - } + return Collections.emptyMap(); + } catch (Exception e) { healthChecker.unHealth(e); throw e; - } catch (IOException t) { - healthChecker.unHealth(t); - throw t; } } public boolean isExistsTemplate(String indexName) throws IOException { indexName = formatIndexName(indexName); - Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName); - - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode == HttpStatus.SC_OK) { - return true; - } else if (statusCode == HttpStatus.SC_NOT_FOUND) { - return false; - } else { - throw new IOException( - "The response status code of template exists request should be 200 or 404, but it is " + statusCode); - } + return esClient.templates().exists(indexName); } public boolean createOrUpdateTemplate(String indexName, Map<String, Object> settings, Map<String, Object> mapping, int order) throws IOException { indexName = formatIndexName(indexName); - String[] 
patterns = new String[] {indexName + "-*"}; - - Map<String, Object> aliases = new HashMap<>(); - aliases.put(indexName, new JsonObject()); - - Map<String, Object> template = new HashMap<>(); - template.put("index_patterns", patterns); - template.put("aliases", aliases); - template.put("settings", settings); - template.put("mappings", mapping); - template.put("order", order); - - HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON); - - Response response = client.getLowLevelClient() - .performRequest( - HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity); - return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK; + return esClient.templates().createOrUpdate(indexName, settings, mapping, order); } public boolean deleteTemplate(String indexName) throws IOException { indexName = formatIndexName(indexName); - Response response = client.getLowLevelClient() - .performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName); - return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK; + + return esClient.templates().delete(indexName); } public SearchResponse search(IndexNameMaker indexNameMaker, @@ -472,40 +354,40 @@ public class ElasticSearchClient implements Client, HealthCheckable { } } - public GetResponse get(String indexName, String id) throws IOException { + public Optional<Document> get(String indexName, String id) throws IOException { indexName = formatIndexName(indexName); - GetRequest request = new GetRequest(indexName, TYPE, id); - try { - GetResponse response = client.get(request); - healthChecker.health(); - return response; - } catch (Throwable t) { - healthChecker.unHealth(t); - throw t; - } + + return esClient.documents().get(indexName, TYPE, id); } - public SearchResponse ids(String indexName, String[] ids) throws IOException { + public boolean existDoc(String indexName, String id) throws IOException { indexName = formatIndexName(indexName); - SearchRequest 
searchRequest = new SearchRequest(indexName); - searchRequest.types(TYPE); - searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length); - try { - SearchResponse response = client.search(searchRequest); - healthChecker.health(); - return response; - } catch (Throwable t) { - healthChecker.unHealth(t); - throw t; - } + return esClient.documents().exists(indexName, TYPE, id); + } + + public Optional<Documents> ids(String indexName, Iterable<String> ids) { + indexName = formatIndexName(indexName); + + return esClient.documents().mget(indexName, TYPE, ids); } - public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException { - IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + public Optional<Documents> ids(String indexName, String[] ids) { + return ids(indexName, Arrays.asList(ids)); + } + + public void forceInsert(String indexName, String id, Map<String, Object> source) + throws IOException { + ElasticSearchInsertRequest request = prepareInsert(indexName, id, source); + Map<String, Object> params = ImmutableMap.of("refresh", "true"); try { - client.index(request); + esClient.documents().index( + request.getIndex(), + request.getType(), + request.getId(), + request.getSource(), + params + ); healthChecker.health(); } catch (Throwable t) { healthChecker.unHealth(t); @@ -513,12 +395,18 @@ public class ElasticSearchClient implements Client, HealthCheckable { } } - public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException { - org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate( - indexName, id, source); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + public void forceUpdate(String indexName, String id, Map<String, Object> source) + throws IOException { + ElasticSearchUpdateRequest request = 
prepareUpdate(indexName, id, source); + Map<String, Object> params = ImmutableMap.of("refresh", "true"); try { - client.update(request); + esClient.documents().update( + request.getIndex(), + request.getType(), + request.getId(), + request.getSource(), + params + ); healthChecker.health(); } catch (Throwable t) { healthChecker.unHealth(t); @@ -526,14 +414,15 @@ public class ElasticSearchClient implements Client, HealthCheckable { } } - public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) { + public ElasticSearchInsertRequest prepareInsert(String indexName, String id, Map<String, Object> source) { indexName = formatIndexName(indexName); - return new ElasticSearchInsertRequest(indexName, TYPE, id).source(source); + return new ElasticSearchInsertRequest(indexName, TYPE, id, source); } - public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) { + public ElasticSearchUpdateRequest prepareUpdate(String indexName, String id, + Map<String, Object> source) { indexName = formatIndexName(indexName); - return new ElasticSearchUpdateRequest(indexName, TYPE, id).doc(source); + return new ElasticSearchUpdateRequest(indexName, TYPE, id, source); } public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException { @@ -548,24 +437,6 @@ public class ElasticSearchClient implements Client, HealthCheckable { return response.getStatusLine().getStatusCode(); } - /** - * @since 8.7.0 SkyWalking don't use sync bulk anymore. This method is just kept for unexpected case in the future. 
- */ - @Deprecated - public void synchronousBulk(BulkRequest request) { - request.timeout(TimeValue.timeValueMinutes(2)); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - request.waitForActiveShards(ActiveShardCount.ONE); - try { - int size = request.requests().size(); - BulkResponse responses = client.bulk(request); - log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size); - healthChecker.health(); - } catch (Throwable t) { - healthChecker.unHealth(t); - } - } - public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) { BulkProcessor.Listener listener = createBulkListener(); diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchInsertRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchInsertRequest.java index 9c0655c..2dd4851 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchInsertRequest.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchInsertRequest.java @@ -17,19 +17,25 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; +import java.util.Map; +import lombok.Getter; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.common.xcontent.XContentBuilder; -public class ElasticSearchInsertRequest extends IndexRequest implements InsertRequest { +@Getter +public class ElasticSearchInsertRequest implements InsertRequest { + private final String index; - public ElasticSearchInsertRequest(String index, String type, String id) { - super(index, type, id); - } + private final String 
type; + + private final String id; + + private final Map<String, Object> source; - @Override - public ElasticSearchInsertRequest source(XContentBuilder sourceBuilder) { - super.source(sourceBuilder); - return this; + public ElasticSearchInsertRequest(String index, String type, String id, + Map<String, Object> source) { + this.index = index; + this.type = type; + this.id = id; + this.source = source; } } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java index 2663856..c1c9ef3 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java @@ -17,18 +17,25 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.xcontent.XContentBuilder; +import java.util.Map; +import lombok.Getter; +import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; -public class ElasticSearchUpdateRequest extends UpdateRequest implements org.apache.skywalking.oap.server.library.client.request.UpdateRequest { +@Getter +public class ElasticSearchUpdateRequest implements UpdateRequest { + private final String index; - public ElasticSearchUpdateRequest(String index, String type, String id) { - super(index, type, id); - } + private final String type; + + private final String id; + + private final Map<String, Object> source; - @Override - public ElasticSearchUpdateRequest doc(XContentBuilder source) { - super.doc(source); - return this; + public 
ElasticSearchUpdateRequest(String index, String type, String id, + Map<String, Object> source) { + this.index = index; + this.type = type; + this.id = id; + this.source = source; } } diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java index ae9a9a2..4f7f4fc 100644 --- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java @@ -18,26 +18,27 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; +import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; import com.google.gson.JsonObject; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.http.client.methods.HttpGet; import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.library.elasticsearch.response.Document; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.QueryBuilders; import 
org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.After; @@ -95,7 +96,7 @@ public class ITElasticSearchClient { properties.add("column1", column); String indexName = "test_index_operate"; - client.createIndex(indexName, settings, doc); + // client.createIndex(indexName, settings, doc); // TODO Assert.assertTrue(client.isExistsIndex(indexName)); JsonObject index = getIndex(indexName); @@ -127,26 +128,25 @@ public class ITElasticSearchClient { public void documentOperate() throws IOException { String id = String.valueOf(System.currentTimeMillis()); - XContentBuilder builder = XContentFactory.jsonBuilder() - .startObject() - .field("user", "kimchy") - .field("post_date", "2009-11-15T14:12:12") - .field("message", "trying out Elasticsearch") - .endObject(); + Map<String, Object> builder = ImmutableMap.<String, Object>builder() + .put("user", "kimchy") + .put("post_date", "2009-11-15T14:12:12") + .put("message", "trying out Elasticsearch") + .build(); String indexName = "test_document_operate"; client.forceInsert(indexName, id, builder); - GetResponse response = client.get(indexName, id); - Assert.assertEquals("kimchy", response.getSource().get("user")); - Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message")); + Optional<Document> response = client.get(indexName, id); + Assert.assertEquals("kimchy", response.get().getSource().get("user")); + Assert.assertEquals("trying out Elasticsearch", response.get().getSource().get("message")); - builder = XContentFactory.jsonBuilder().startObject().field("user", "pengys").endObject(); + builder = ImmutableMap.<String, Object>builder().put("user", "pengys").build(); client.forceUpdate(indexName, id, builder); response = client.get(indexName, id); - Assert.assertEquals("pengys", response.getSource().get("user")); - Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message")); + Assert.assertEquals("pengys", response.get().getSource().get("user")); + 
Assert.assertEquals("trying out Elasticsearch", response.get().getSource().get("message")); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.termQuery("user", "pengys")); @@ -180,7 +180,7 @@ public class ITElasticSearchClient { Assert.assertTrue(client.isExistsTemplate(indexName)); - XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("name", "pengys").endObject(); + Map<String, Object> builder = ImmutableMap.of("name", "pengys"); client.forceInsert(indexName + "-2019", "testid", builder); JsonObject index = getIndex(indexName + "-2019"); LOGGER.info(index.toString()); @@ -235,12 +235,12 @@ public class ITElasticSearchClient { client.createOrUpdateTemplate(indexName, new HashMap<>(), mapping, 0); - XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("name", "pengys").endObject(); + Map<String, Object> builder = ImmutableMap.of("name", "pengys"); client.forceInsert(timeSeriesIndexName, "testid", builder); - List<String> indexes = client.retrievalIndexByAliases(indexName); + Collection<String> indexes = client.retrievalIndexByAliases(indexName); Assert.assertEquals(1, indexes.size()); - String index = indexes.get(0); + String index = indexes.iterator().next(); Assert.assertTrue(client.deleteByIndexName(index)); Assert.assertFalse(client.isExistsIndex(timeSeriesIndexName)); client.deleteTemplate(indexName); diff --git a/oap-server/server-library/pom.xml b/oap-server/server-library/library-elasticsearch-client/pom.xml similarity index 64% copy from oap-server/server-library/pom.xml copy to oap-server/server-library/library-elasticsearch-client/pom.xml index acfcd44..41ea10f 100644 --- a/oap-server/server-library/pom.xml +++ b/oap-server/server-library/library-elasticsearch-client/pom.xml @@ -17,27 +17,23 @@ ~ --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>oap-server</artifactId> + <artifactId>server-library</artifactId> <groupId>org.apache.skywalking</groupId> <version>8.8.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>server-library</artifactId> - <packaging>pom</packaging> - <modules> - <module>library-module</module> - <module>library-server</module> - <module>library-util</module> - <module>library-client</module> - </modules> + <artifactId>library-elasticsearch-client</artifactId> <dependencies> <dependency> - <groupId>org.apache.skywalking</groupId> - <artifactId>apm-util</artifactId> + <groupId>com.linecorp.armeria</groupId> + <artifactId>armeria</artifactId> + <version>1.10.0</version> </dependency> </dependencies> -</project> \ No newline at end of file +</project> diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClient.java new file mode 100644 index 0000000..b73d642 --- /dev/null +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClient.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch;
+
+import com.linecorp.armeria.client.ClientFactory;
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.client.WebClientBuilder;
+import com.linecorp.armeria.client.endpoint.EndpointGroup;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import com.linecorp.armeria.common.SessionProtocol;
+import com.linecorp.armeria.common.auth.BasicToken;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.library.elasticsearch.client.AliasClient;
+import org.apache.skywalking.library.elasticsearch.client.DocumentClient;
+import org.apache.skywalking.library.elasticsearch.client.IndexClient;
+import org.apache.skywalking.library.elasticsearch.client.TemplateClient;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.response.NodeInfo;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+/**
+ * REST based ElasticSearch client built on top of an Armeria {@link WebClient}.
+ *
+ * <p>The template, document, index and alias sub-clients all share one
+ * {@link CompletableFuture} of {@link RequestFactory}. That future is completed by
+ * {@link #connect()} once the server version is known, so the sub-clients cannot
+ * finish any call before {@link #connect()} has been invoked.
+ */
+@Slf4j
+public final class ElasticSearchClient implements AutoCloseable {
+    private final WebClient client;
+
+    private final ClientFactory clientFactory;
+
+    // Kept only so that close() can release it together with the client factory.
+    private final EndpointGroup endpointGroup;
+
+    private final CompletableFuture<RequestFactory> requestFactory;
+
+    private final TemplateClient templateClient;
+
+    private final IndexClient indexClient;
+
+    private final DocumentClient documentClient;
+
+    private final AliasClient aliasClient;
+
+    /**
+     * Creates the client; use {@link #builder()} from outside this package.
+     *
+     * @param protocol      the protocol used to talk to the endpoints
+     * @param username      basic auth user name, ignored when blank
+     * @param password      basic auth password, ignored when blank
+     * @param endpointGroup the endpoints to send requests to; closed by {@link #close()}
+     * @param clientFactory the factory backing the web client; closed by {@link #close()}
+     */
+    ElasticSearchClient(SessionProtocol protocol,
+                        String username, String password,
+                        EndpointGroup endpointGroup,
+                        ClientFactory clientFactory) {
+        this.clientFactory = clientFactory;
+        this.endpointGroup = endpointGroup;
+        this.requestFactory = new CompletableFuture<>();
+
+        final WebClientBuilder builder = WebClient.builder(protocol, endpointGroup);
+        // Basic auth is only configured when both credentials are present.
+        if (StringUtil.isNotBlank(username) && StringUtil.isNotBlank(password)) {
+            builder.auth(BasicToken.of(username, password));
+        }
+        builder.factory(clientFactory);
+        client = builder.build();
+
+        templateClient = new TemplateClient(requestFactory, client);
+        documentClient = new DocumentClient(requestFactory, client);
+        indexClient = new IndexClient(requestFactory, client);
+        aliasClient = new AliasClient(requestFactory, client);
+    }
+
+    /**
+     * Returns a new builder for {@link ElasticSearchClient}.
+     */
+    public static ElasticSearchClientBuilder builder() {
+        return new ElasticSearchClientBuilder();
+    }
+
+    /**
+     * Requests {@code /} from the server, reads the version number from the response and
+     * completes the shared request factory with the factory matching that version.
+     *
+     * <p>This method blocks until the request has finished. On any failure (non-200 status,
+     * unreadable body, unparsable version) the error is logged and the request factory future
+     * is completed exceptionally; the method itself still returns normally.
+     */
+    public void connect() {
+        client.get("/").aggregate().thenAcceptAsync(response -> {
+            final HttpStatus status = response.status();
+            if (status != HttpStatus.OK) {
+                throw new RuntimeException("Failed to connect to ElasticSearch server: " + status);
+            }
+            try (final HttpData content = response.content();
+                 final InputStream is = content.toInputStream()) {
+                final NodeInfo node = JsonSerializer.MAPPER.readValue(is, NodeInfo.class);
+                final String v = node.getVersion().getNumber();
+                final ElasticSearchVersion version = ElasticSearchVersion.from(v);
+                requestFactory.complete(RequestFactory.of(version));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }).exceptionally(throwable -> {
+            log.error("Failed to determine ElasticSearch version", throwable);
+            requestFactory.completeExceptionally(throwable);
+            return null;
+        }).join();
+    }
+
+    /**
+     * Returns the client for index template operations.
+     */
+    public TemplateClient templates() {
+        return templateClient;
+    }
+
+    /**
+     * Returns the client for document operations.
+     */
+    public DocumentClient documents() {
+        return documentClient;
+    }
+
+    /**
+     * Returns the client for index operations.
+     */
+    public IndexClient index() {
+        return indexClient;
+    }
+
+    /**
+     * Returns the client for alias operations.
+     */
+    public AliasClient alias() {
+        return aliasClient;
+    }
+
+    /**
+     * Releases the endpoint group and the client factory owned by this client.
+     */
+    @Override
+    public void close() {
+        try {
+            // The builder hands in a health checked endpoint group; closing only the client
+            // factory would leave that group (and whatever it runs) open.
+            endpointGroup.close();
+        } finally {
+            clientFactory.close();
+        }
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientBuilder.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientBuilder.java
new file mode 100644
index 0000000..a9500c2
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch;
+
+import com.google.common.collect.ImmutableList;
+import com.linecorp.armeria.client.ClientFactory;
+import com.linecorp.armeria.client.ClientFactoryBuilder;
+import com.linecorp.armeria.client.Endpoint;
+import com.linecorp.armeria.client.endpoint.EndpointGroup;
+import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
+import com.linecorp.armeria.common.SessionProtocol;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.net.ssl.TrustManagerFactory;
+import lombok.SneakyThrows;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
+
+/**
+ * Builder of {@link ElasticSearchClient}.
+ *
+ * <p>Defaults: plain HTTP, no credentials, no trust store, a health check retry interval of
+ * 30 seconds and a connect timeout of 500 milliseconds.
+ */
+public final class ElasticSearchClientBuilder {
+    private SessionProtocol protocol = SessionProtocol.HTTP;
+
+    private String username;
+
+    private String password;
+
+    private Duration healthCheckRetryInterval = Duration.ofSeconds(30);
+
+    private final ImmutableList.Builder<String> endpoints = ImmutableList.builder();
+
+    private String trustStorePath;
+
+    private String trustStorePass;
+
+    private Duration connectTimeout = Duration.ofMillis(500);
+
+    /**
+     * Sets the protocol, given in the textual form accepted by {@link SessionProtocol#of(String)}.
+     *
+     * @throws IllegalArgumentException if {@code protocol} is blank
+     */
+    public ElasticSearchClientBuilder protocol(String protocol) {
+        checkArgument(isNotBlank(protocol), "protocol cannot be blank");
+        this.protocol = SessionProtocol.of(protocol);
+        return this;
+    }
+
+    /**
+     * Sets the basic auth user name.
+     */
+    public ElasticSearchClientBuilder username(String username) {
+        this.username = requireNonNull(username, "username");
+        return this;
+    }
+
+    /**
+     * Sets the basic auth password.
+     */
+    public ElasticSearchClientBuilder password(String password) {
+        this.password = requireNonNull(password, "password");
+        return this;
+    }
+
+    /**
+     * Adds endpoints, each in the form {@code host} or {@code host:port}.
+     */
+    public ElasticSearchClientBuilder endpoints(Iterable<String> endpoints) {
+        requireNonNull(endpoints, "endpoints");
+        this.endpoints.addAll(endpoints);
+        return this;
+    }
+
+    /**
+     * Adds endpoints, each in the form {@code host} or {@code host:port}.
+     */
+    public ElasticSearchClientBuilder endpoints(String... endpoints) {
+        return endpoints(Arrays.asList(endpoints));
+    }
+
+    /**
+     * Sets the retry interval handed to the health checked endpoint group.
+     */
+    public ElasticSearchClientBuilder healthCheckRetryInterval(Duration healthCheckRetryInterval) {
+        requireNonNull(healthCheckRetryInterval, "healthCheckRetryInterval");
+        this.healthCheckRetryInterval = healthCheckRetryInterval;
+        return this;
+    }
+
+    /**
+     * Sets the path of the JKS trust store used to verify the server certificate.
+     */
+    public ElasticSearchClientBuilder trustStorePath(String trustStorePath) {
+        requireNonNull(trustStorePath, "trustStorePath");
+        this.trustStorePath = trustStorePath;
+        return this;
+    }
+
+    /**
+     * Sets the password of the trust store.
+     */
+    public ElasticSearchClientBuilder trustStorePass(String trustStorePass) {
+        requireNonNull(trustStorePass, "trustStorePass");
+        this.trustStorePass = trustStorePass;
+        return this;
+    }
+
+    /**
+     * Sets the connect timeout in milliseconds.
+     *
+     * @throws IllegalArgumentException if {@code connectTimeout} is not positive
+     */
+    public ElasticSearchClientBuilder connectTimeout(int connectTimeout) {
+        checkArgument(connectTimeout > 0, "connectTimeout must be positive");
+        this.connectTimeout = Duration.ofMillis(connectTimeout);
+        return this;
+    }
+
+    /**
+     * Builds the client: parses the endpoints, wraps them into a health checked endpoint group
+     * probing {@code _cluster/health} and, when a trust store path is set, trusts the
+     * certificates of that store.
+     *
+     * @return a new, not yet connected {@link ElasticSearchClient}
+     */
+    @SneakyThrows
+    public ElasticSearchClient build() {
+        final List<Endpoint> endpoints =
+            this.endpoints.build().stream().filter(StringUtil::isNotBlank).map(it -> {
+                final String[] parts = it.split(":", 2);
+                if (parts.length == 2) {
+                    return Endpoint.of(parts[0], Integer.parseInt(parts[1]));
+                }
+                return Endpoint.of(parts[0]);
+            }).collect(Collectors.toList());
+        final HealthCheckedEndpointGroup endpointGroup =
+            HealthCheckedEndpointGroup.builder(EndpointGroup.of(endpoints), "_cluster/health")
+                                      .protocol(protocol)
+                                      .useGet(true)
+                                      .retryInterval(healthCheckRetryInterval)
+                                      .build();
+        final ClientFactoryBuilder factoryBuilder = ClientFactory.builder();
+        factoryBuilder.connectTimeout(connectTimeout);
+
+        if (StringUtil.isNotBlank(trustStorePath)) {
+            final TrustManagerFactory trustManagerFactory =
+                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            final KeyStore truststore = KeyStore.getInstance("jks");
+            // The password is optional: KeyStore#load accepts null, whereas calling
+            // toCharArray() on an unset password would fail with a NullPointerException.
+            final char[] storePass = trustStorePass == null ? null : trustStorePass.toCharArray();
+            try (final InputStream is = Files.newInputStream(Paths.get(trustStorePath))) {
+                truststore.load(is, storePass);
+            }
+            trustManagerFactory.init(truststore);
+
+            factoryBuilder.tlsCustomizer(
+                sslContextBuilder -> sslContextBuilder.trustManager(trustManagerFactory));
+        }
+
+        return new ElasticSearchClient(
+            protocol,
+            username,
+            password,
+            endpointGroup,
+            factoryBuilder.build()
+        );
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchVersion.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchVersion.java
new file mode 100644
index 0000000..777dc5c
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchVersion.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch;
+
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * A {@code major.minor} ElasticSearch version, ordered by major and then by minor number.
+ */
+@RequiredArgsConstructor
+public final class ElasticSearchVersion implements Comparable<ElasticSearchVersion> {
+    // Leading "major.minor" of a version string; whatever follows is ignored.
+    private static final Pattern REGEX = Pattern.compile("(\\d+)\\.(\\d+).*");
+
+    public static final ElasticSearchVersion UNKNOWN = new ElasticSearchVersion(-1, -1);
+
+    public static final ElasticSearchVersion V6_0 = new ElasticSearchVersion(6, 0);
+
+    public static final ElasticSearchVersion V7_0 = new ElasticSearchVersion(7, 0);
+
+    public static final ElasticSearchVersion V8_0 = new ElasticSearchVersion(8, 0);
+
+    private final int major;
+
+    private final int minor;
+
+    /**
+     * Parses the leading {@code major.minor} part of {@code version}.
+     *
+     * @throws IllegalArgumentException if {@code version} does not start with two
+     *                                  dot-separated numbers
+     */
+    public static ElasticSearchVersion from(String version) {
+        final Matcher parsed = REGEX.matcher(version);
+        if (parsed.matches()) {
+            return new ElasticSearchVersion(
+                Integer.parseInt(parsed.group(1)),
+                Integer.parseInt(parsed.group(2)));
+        }
+        throw new IllegalArgumentException("Failed to parse version: " + version);
+    }
+
+    @Override
+    public int compareTo(final ElasticSearchVersion o) {
+        final int byMajor = Integer.compare(major, o.major);
+        return byMajor != 0 ? byMajor : Integer.compare(minor, o.minor);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        // The class is final, so an instanceof test matches exactly the same objects
+        // as comparing the runtime classes.
+        if (!(o instanceof ElasticSearchVersion)) {
+            return false;
+        }
+        final ElasticSearchVersion other = (ElasticSearchVersion) o;
+        return major == other.major && minor == other.minor;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(major, minor);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append(major).append(".").append(minor).toString();
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/AliasClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/AliasClient.java
new file mode 100644
index 0000000..8b125f0
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/AliasClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.client;
+
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+/**
+ * Client for alias related requests.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class AliasClient {
+    private final CompletableFuture<RequestFactory> requestFactory;
+
+    private final WebClient client;
+
+    /**
+     * Sends the alias request built for {@code name} and returns the parsed response body.
+     * Blocks until the response has arrived.
+     *
+     * @param name the alias name
+     * @return the parsed response body, or an empty map when the request fails, the status is
+     *         not 200 or the body cannot be read (the failure is logged)
+     */
+    @SneakyThrows
+    public Map<String, Object> indices(String name) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.alias().indices(name))
+                        .aggregate().thenApply(response -> {
+                    final HttpStatus status = response.status();
+                    if (status != HttpStatus.OK) {
+                        throw new RuntimeException("Failed to get alias indices: " + status);
+                    }
+
+                    try (final HttpData content = response.content();
+                         final InputStream is = content.toInputStream()) {
+                        return JsonSerializer.parse(is);
+                    } catch (IOException e) {
+                        // Raised while reading or closing the body, not only on close.
+                        log.error("Failed to read response body", e);
+                        return Collections.<String, Object>emptyMap();
+                    }
+
+                }).exceptionally(e -> {
+                    log.error("Failed to get alias indices", e);
+                    return Collections.emptyMap();
+                })).get();
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
new file mode 100644
index 0000000..8f12521
--- /dev/null
+++
b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.client;
+
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.Documents;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+/**
+ * Client for document related requests. Every method blocks until the response has arrived.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class DocumentClient {
+    private final CompletableFuture<RequestFactory> requestFactory;
+
+    private final WebClient client;
+
+    /**
+     * Checks whether a document exists.
+     *
+     * @return {@code true} only when the response status is 200; {@code false} for any other
+     *         status and when the request fails (the failure is logged)
+     */
+    @SneakyThrows
+    public boolean exists(String index, String type, String id) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.document().exist(index, type, id))
+                        .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+                        .exceptionally(e -> {
+                            log.error("Failed to check whether document exists", e);
+                            return false;
+                        })).get();
+    }
+
+    /**
+     * Gets a single document.
+     *
+     * @return the parsed document, or empty when the status is not 200 or the body cannot be
+     *         read (the latter is logged)
+     */
+    @SneakyThrows
+    public Optional<Document> get(String index, String type, String id) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.document().get(index, type, id))
+                        .aggregate().thenApply(response -> {
+                    if (response.status() != HttpStatus.OK) {
+                        return Optional.<Document>empty();
+                    }
+
+                    try (final HttpData content = response.content();
+                         final InputStream is = content.toInputStream()) {
+                        return Optional.of(JsonSerializer.parse(is, Document.class));
+                    } catch (IOException e) {
+                        // Raised while reading or closing the body, not only on close.
+                        log.error("Failed to read response body", e);
+                        return Optional.<Document>empty();
+                    }
+                })).get();
+    }
+
+    /**
+     * Gets multiple documents by their ids.
+     *
+     * @return the parsed documents, or empty when the status is not 200 or the body cannot be
+     *         read (the latter is logged)
+     */
+    @SneakyThrows
+    public Optional<Documents> mget(String index, String type, Iterable<String> ids) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.document().mget(index, type, ids))
+                        .aggregate().thenApply(response -> {
+                    if (response.status() != HttpStatus.OK) {
+                        return Optional.<Documents>empty();
+                    }
+
+                    try (final HttpData content = response.content();
+                         final InputStream is = content.toInputStream()) {
+                        return Optional.of(JsonSerializer.parse(is, Documents.class));
+                    } catch (IOException e) {
+                        // Raised while reading or closing the body, not only on close.
+                        log.error("Failed to read response body", e);
+                        return Optional.<Documents>empty();
+                    }
+                })).get();
+    }
+
+    /**
+     * Indexes {@code doc} under {@code id}.
+     *
+     * @throws java.util.concurrent.CompletionException if the request fails or the status is
+     *                                                  neither 200 nor 201
+     */
+    @SneakyThrows
+    public void index(String index, String type, String id,
+                      Map<String, Object> doc,
+                      Map<String, Object> params) {
+        requestFactory.thenCompose(
+            rf -> client.execute(rf.document().index(index, type, id, doc, params))
+                        .aggregate().thenAccept(response -> {
+                    final HttpStatus status = response.status();
+                    if (status != HttpStatus.CREATED && status != HttpStatus.OK) {
+                        throw new RuntimeException("Failed to index doc: " + status);
+                    }
+                })).join();
+    }
+
+    /**
+     * Updates the document {@code id} with {@code doc}.
+     *
+     * @throws java.util.concurrent.CompletionException if the request fails or the status is
+     *                                                  not 200
+     */
+    @SneakyThrows
+    public void update(String index, String type, String id,
+                       Map<String, Object> doc,
+                       Map<String, Object> params) {
+        requestFactory.thenCompose(
+            rf -> client.execute(rf.document().update(index, type, id, doc, params))
+                        .aggregate().thenAccept(response -> {
+                    final HttpStatus status = response.status();
+                    if (status != HttpStatus.OK) {
+                        throw new RuntimeException("Failed to update doc: " + status);
+                    }
+                })).join();
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/IndexClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/IndexClient.java
new file mode 100644
index 0000000..c7ed047
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/IndexClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.client;
+
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+/**
+ * Client for index related requests. Every method blocks until the response has arrived and
+ * reports a failed request by logging it and returning {@code false} or an empty map.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class IndexClient {
+    private final CompletableFuture<RequestFactory> requestFactory;
+
+    private final WebClient client;
+
+    /**
+     * Checks whether the index exists.
+     *
+     * @return {@code true} only when the response status is 200
+     */
+    @SneakyThrows
+    public boolean exists(String name) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.index().exists(name))
+                        .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+                        .exceptionally(e -> {
+                            log.error("Failed to check whether index exists", e);
+                            return false;
+                        })).get();
+    }
+
+    /**
+     * Gets the index and returns the parsed response body.
+     *
+     * @return the parsed response body, or an empty map when the request fails, the status is
+     *         not 200 or the body cannot be read
+     */
+    @SneakyThrows
+    public Map<String, Object> get(String name) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.index().get(name))
+                        .aggregate().thenApply(response -> {
+                    final HttpStatus status = response.status();
+                    if (status != HttpStatus.OK) {
+                        throw new RuntimeException("Failed to get index: " + status);
+                    }
+
+                    try (final HttpData content = response.content();
+                         final InputStream is = content.toInputStream()) {
+                        return JsonSerializer.parse(is);
+                    } catch (IOException e) {
+                        // Raised while reading or closing the body, not only on close.
+                        log.error("Failed to read response body", e);
+                        return Collections.<String, Object>emptyMap();
+                    }
+                }).exceptionally(e -> {
+                    log.error("Failed to get index", e);
+                    return Collections.emptyMap();
+                })).get();
+    }
+
+    /**
+     * Creates the index.
+     *
+     * @return {@code true} only when the response status is 200
+     */
+    @SneakyThrows
+    public boolean create(String name) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.index().create(name))
+                        .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+                        .exceptionally(e -> {
+                            log.error("Failed to create index", e);
+                            return false;
+                        })).get();
+    }
+
+    /**
+     * Deletes the index.
+     *
+     * @return {@code true} only when the response status is 200
+     */
+    @SneakyThrows
+    public boolean delete(String name) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.index().delete(name))
+                        .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+                        .exceptionally(e -> {
+                            log.error("Failed to delete index", e);
+                            return false;
+                        })).get();
+    }
+
+    /**
+     * Updates the mapping of the index.
+     *
+     * @return {@code true} only when the response status is 200
+     */
+    @SneakyThrows
+    public boolean putMapping(String name, String type,
+                              Map<String, Object> mapping) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.index().putMapping(name, type, mapping))
+                        .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+                        .exceptionally(e -> {
+                            log.error("Failed to update index mapping", e);
+                            return false;
+                        })).get();
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/TemplateClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/TemplateClient.java
new file mode 100644
index 0000000..4f5879a
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/TemplateClient.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.client;
+
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+/**
+ * Client for index template related requests. Every method blocks until the response has
+ * arrived and reports a failed request by logging it and returning {@code false} or an empty
+ * map.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class TemplateClient {
+    private final CompletableFuture<RequestFactory> requestFactory;
+
+    private final WebClient client;
+
+    /**
+     * Checks whether the template exists.
+     *
+     * @return {@code true} when the status is 200; {@code false} when it is 404, and also
+     *         when the request fails or any other status comes back (both are logged)
+     */
+    @SneakyThrows
+    public boolean exists(String name) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.template().exists(name))
+                        .aggregate().thenApply(response -> {
+                    final HttpStatus status = response.status();
+                    if (status == HttpStatus.OK) {
+                        return true;
+                    } else if (status == HttpStatus.NOT_FOUND) {
+                        return false;
+                    }
+                    throw new RuntimeException(
+                        "Response status code of template exists request should be 200 or 404,"
+                            + " but it was: " + status.code());
+                }).exceptionally(e -> {
+                    log.error("Failed to check whether template exists", e);
+                    return false;
+                })).get();
+    }
+
+    /**
+     * Gets the template and returns the parsed response body.
+     *
+     * @return the parsed response body, or an empty map when the request fails, the status is
+     *         not 200 or the body cannot be read
+     */
+    @SneakyThrows
+    public Map<String, Object> get(String name) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.template().get(name))
+                        .aggregate().thenApply(response -> {
+                    final HttpStatus status = response.status();
+                    if (status != HttpStatus.OK) {
+                        throw new RuntimeException("Failed to get template: " + status);
+                    }
+
+                    try (final HttpData content = response.content();
+                         final InputStream is = content.toInputStream()) {
+                        return JsonSerializer.parse(is);
+                    } catch (IOException e) {
+                        // Raised while reading or closing the body, not only on close.
+                        log.error("Failed to read response body", e);
+                        return Collections.<String, Object>emptyMap();
+                    }
+                }).exceptionally(e -> {
+                    log.error("Failed to get template", e);
+                    return Collections.emptyMap();
+                })).get();
+    }
+
+    /**
+     * Deletes the template.
+     *
+     * @return {@code true} only when the response status is 200
+     */
+    @SneakyThrows
+    public boolean delete(String name) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.template().delete(name))
+                        .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+                        .exceptionally(e -> {
+                            log.error("Failed to delete template", e);
+                            return false;
+                        })).get();
+    }
+
+    /**
+     * Creates the template, or updates it when it already exists.
+     *
+     * @return {@code true} only when the response status is 200
+     */
+    @SneakyThrows
+    public boolean createOrUpdate(String name, Map<String, Object> settings,
+                                  Map<String, Object> mapping, int order) {
+        return requestFactory.thenCompose(
+            rf -> client.execute(rf.template().createOrUpdate(name, settings, mapping, order))
+                        .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+                        .exceptionally(e -> {
+                            log.error("Failed to create or update template", e);
+                            return false;
+                        })).get();
+    }
+
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/AliasFactory.java
similarity index 62%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/AliasFactory.java
index ebad19e..6ccf6b8 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/AliasFactory.java @@ -13,27 +13,15 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.skywalking.oap.server.core.storage.type; +package org.apache.skywalking.library.elasticsearch.requests.factory; -/** - * StorageDataComplexObject implementation supports String-Object interconversion. - */ -public interface StorageDataComplexObject<T> { - /** - * @return string representing this object. - */ - String toStorageData(); - - /** - * Initialize this object based on the given string data. - */ - void toObject(String data); +import com.linecorp.armeria.common.HttpRequest; +public interface AliasFactory { /** - * Initialize the object based on the given source. + * Returns a request to list all indices behind the {@code alias}. */ - void copyFrom(T source); + HttpRequest indices(String alias); } diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java new file mode 100644 index 0000000..715edf1 --- /dev/null +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.library.elasticsearch.requests.factory; + +import com.linecorp.armeria.common.HttpRequest; +import java.util.Map; + +public interface DocumentFactory { + /** + * Returns a request to check whether the document exists in the {@code index} or not. + */ + HttpRequest exist(String index, String type, String id); + + /** + * Returns a request to get a document of {@code id} in {@code index}. + */ + HttpRequest get(String index, String type, String id); + + /** + * Returns a request to get multiple documents of {@code ids} in {@code index}. + */ + HttpRequest mget(String index, String type, Iterable<String> ids); + + /** + * Returns a request to index a document of {@code id} in {@code index}, with content {@code + * doc}. + */ + HttpRequest index(String index, String type, String id, + Map<String, Object> doc, + Map<String, Object> params); + + /** + * Returns a request to update a document of {@code id} in {@code index}, with content {@code + * doc}. 
+ */ + HttpRequest update(String index, String type, String id, + Map<String, Object> doc, + Map<String, Object> params); +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/IndexFactory.java similarity index 50% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/IndexFactory.java index ebad19e..35e9567 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/IndexFactory.java @@ -13,27 +13,37 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.skywalking.oap.server.core.storage.type; +package org.apache.skywalking.library.elasticsearch.requests.factory; + +import com.linecorp.armeria.common.HttpRequest; +import java.util.Map; + +public interface IndexFactory { + /** + * Returns a request to check whether the {@code index} exists or not. + */ + HttpRequest exists(String index); + + /** + * Returns a request to get an index of name {@code index}. + */ + HttpRequest get(String index); -/** - * StorageDataComplexObject implementation supports String-Object interconversion. - */ -public interface StorageDataComplexObject<T> { /** - * @return string representing this object. + * Returns a request to create an index of name {@code index}. 
*/ - String toStorageData(); + HttpRequest create(String index); /** - * Initialize this object based on the given string data. + * Returns a request to delete an index of name {@code index}. */ - void toObject(String data); + HttpRequest delete(String index); /** - * Initialize the object based on the given source. + * Returns a request to update the {@code mapping} of an index of name {@code index}. */ - void copyFrom(T source); + HttpRequest putMapping(String index, String type, + Map<String, Object> mapping); } diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/RequestFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/RequestFactory.java new file mode 100644 index 0000000..c00812a --- /dev/null +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/RequestFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.skywalking.library.elasticsearch.requests.factory; + +import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion; +import org.apache.skywalking.library.elasticsearch.requests.factory.v6.V6RequestFactory; + +import static java.util.Objects.requireNonNull; +import static org.apache.skywalking.library.elasticsearch.ElasticSearchVersion.V6_0; +import static org.apache.skywalking.library.elasticsearch.ElasticSearchVersion.V7_0; +import static org.apache.skywalking.library.elasticsearch.ElasticSearchVersion.V8_0; + +public interface RequestFactory { + /** + * Returns a {@link RequestFactory} that is responsible to compose correct requests according to + * the syntax of specific {@link ElasticSearchVersion}. + */ + static RequestFactory of(ElasticSearchVersion version) { + requireNonNull(version, "version"); + if (version.compareTo(V6_0) >= 0 && version.compareTo(V7_0) < 0) { + return V6RequestFactory.INSTANCE; + } + if (version.compareTo(V7_0) >= 0 && version.compareTo(V8_0) < 0) { + return V6RequestFactory.INSTANCE; + } + + throw new UnsupportedOperationException("Version " + version + " is not supported."); + } + + /** + * Returns a {@link TemplateFactory} that is dedicated to compose template-related requests. + * + * @see TemplateFactory + */ + TemplateFactory template(); + + /** + * Returns a {@link IndexFactory} that is dedicated to compose index-related requests. + * + * @see IndexFactory + */ + IndexFactory index(); + + /** + * Returns a {@link AliasFactory} that is dedicated to compose alias-related requests. + * + * @see AliasFactory + */ + AliasFactory alias(); + + /** + * Returns a {@link DocumentFactory} that is dedicated to compose document-related requests. 
+ * + * @see DocumentFactory + */ + DocumentFactory document(); +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/TemplateFactory.java similarity index 52% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/TemplateFactory.java index ebad19e..9726f1f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/TemplateFactory.java @@ -13,27 +13,33 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.skywalking.oap.server.core.storage.type; +package org.apache.skywalking.library.elasticsearch.requests.factory; + +import com.linecorp.armeria.common.HttpRequest; +import java.util.Map; + +public interface TemplateFactory { + /** + * Returns a request to check whether the template exists or not. + */ + HttpRequest exists(String name); -/** - * StorageDataComplexObject implementation supports String-Object interconversion. - */ -public interface StorageDataComplexObject<T> { /** - * @return string representing this object. + * Returns a request to get a template of {@code name}. */ - String toStorageData(); + HttpRequest get(String name); /** - * Initialize this object based on the given string data. + * Returns a request to delete a template of {@code name}. 
*/ - void toObject(String data); + HttpRequest delete(String name); /** - * Initialize the object based on the given source. + * Returns a request to create or update a template of {@code name} with the given {@code + * settings}, {@code mapping} and {@code order}. */ - void copyFrom(T source); + HttpRequest createOrUpdate(String name, Map<String, Object> settings, + Map<String, Object> mapping, int order); } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6AliasFactory.java similarity index 59% copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6AliasFactory.java index 2663856..663d27d 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6AliasFactory.java @@ -15,20 +15,19 @@ * limitations under the License. 
*/ -package org.apache.skywalking.oap.server.library.client.elasticsearch; +package org.apache.skywalking.library.elasticsearch.requests.factory.v6; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.xcontent.XContentBuilder; +import com.linecorp.armeria.common.HttpRequest; +import org.apache.skywalking.library.elasticsearch.requests.factory.AliasFactory; -public class ElasticSearchUpdateRequest extends UpdateRequest implements org.apache.skywalking.oap.server.library.client.request.UpdateRequest { - - public ElasticSearchUpdateRequest(String index, String type, String id) { - super(index, type, id); - } +class V6AliasFactory implements AliasFactory { + static final V6AliasFactory INSTANCE = new V6AliasFactory(); @Override - public ElasticSearchUpdateRequest doc(XContentBuilder source) { - super.doc(source); - return this; + public HttpRequest indices(final String alias) { + return HttpRequest.builder() + .get("/_alias/{name}") + .pathParam("name", alias) + .build(); } } diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java new file mode 100644 index 0000000..17bba61 --- /dev/null +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.library.elasticsearch.requests.factory.v6; + +import com.google.common.collect.ImmutableMap; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpRequestBuilder; +import com.linecorp.armeria.common.MediaType; +import java.util.Map; +import lombok.SneakyThrows; +import org.apache.skywalking.library.elasticsearch.requests.factory.DocumentFactory; +import org.apache.skywalking.library.elasticsearch.util.JsonSerializer; + +class V6DocumentFactory implements DocumentFactory { + static final V6DocumentFactory INSTANCE = new V6DocumentFactory(); + + @Override + public HttpRequest exist(final String index, final String type, final String id) { + return HttpRequest.builder() + .head("/{index}/{type}/{id}") + .pathParam("index", index.replaceAll(" ", "%20")) + .pathParam("type", type) + .pathParam("id", id) + .build(); + } + + @Override + public HttpRequest get(String index, String type, String id) { + return HttpRequest.builder() + .get("/{index}/{type}/{id}") + .pathParam("index", index.replaceAll(" ", "%20")) + .pathParam("type", type) + .pathParam("id", id.replaceAll(" ", "%20")) + .build(); + } + + @SneakyThrows + @Override + public HttpRequest mget(String index, String type, Iterable<String> ids) { + final Map<String, Iterable<String>> m = ImmutableMap.of("ids", ids); + final byte[] content = JsonSerializer.MAPPER.writeValueAsBytes(m); + return HttpRequest.builder() + .get("/{index}/{type}/_mget") + .pathParam("index", index.replaceAll(" ", "%20")) + .pathParam("type", type) + 
.content(MediaType.JSON, content) + .build(); + } + + @SneakyThrows + @Override + public HttpRequest index(String index, String type, String id, + Map<String, Object> doc, + Map<String, Object> params) { + final HttpRequestBuilder builder = HttpRequest.builder(); + if (params != null) { + params.forEach(builder::queryParam); + } + final byte[] content = JsonSerializer.MAPPER.writeValueAsBytes(doc); + + builder.put("/{index}/{type}/{id}") + .pathParam("index", index.replaceAll(" ", "%20")) + .pathParam("type", type) + .pathParam("id", id.replaceAll(" ", "%20")) + .content(MediaType.JSON, content); + + return builder.build(); + } + + @SneakyThrows + @Override + public HttpRequest update(String index, String type, String id, + Map<String, Object> doc, + Map<String, Object> params) { + final HttpRequestBuilder builder = HttpRequest.builder(); + if (params != null) { + params.forEach(builder::queryParam); + } + final byte[] content = JsonSerializer.MAPPER.writeValueAsBytes(doc); + + builder.put("/{index}/{type}/{id}/_update") + .pathParam("index", index.replaceAll(" ", "%20")) + .pathParam("type", type) + .pathParam("id", id.replaceAll(" ", "%20")) + .content(MediaType.JSON, content); + + return builder.build(); + } +} diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6IndexFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6IndexFactory.java new file mode 100644 index 0000000..a356dc9 --- /dev/null +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6IndexFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.library.elasticsearch.requests.factory.v6; + +import com.google.common.base.Strings; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.MediaType; +import java.util.Map; +import lombok.SneakyThrows; +import org.apache.skywalking.library.elasticsearch.requests.factory.IndexFactory; +import org.apache.skywalking.library.elasticsearch.util.JsonSerializer; + +import static com.google.common.base.Preconditions.checkArgument; + +class V6IndexFactory implements IndexFactory { + static final IndexFactory INSTANCE = new V6IndexFactory(); + + @Override + public HttpRequest exists(String index) { + checkArgument(!Strings.isNullOrEmpty(index), "index cannot be null or empty"); + + return HttpRequest.builder() + .head("/{index}") + .pathParam("index", index) + .build(); + } + + @Override + public HttpRequest get(final String index) { + checkArgument(!Strings.isNullOrEmpty(index), "index cannot be null or empty"); + + return HttpRequest.builder() + .get("/{index}") + .pathParam("index", index) + .build(); + } + + @SneakyThrows + @Override + public HttpRequest create(String index) { + checkArgument(!Strings.isNullOrEmpty(index), "index cannot be null or empty"); + + return HttpRequest.builder() + .put("/{index}") + 
.pathParam("index", index) + .build(); + } + + @Override + public HttpRequest delete(String index) { + checkArgument(!Strings.isNullOrEmpty(index), "index cannot be null or empty"); + + return HttpRequest.builder() + .delete("/{index}") + .pathParam("index", index) + .build(); + } + + @SneakyThrows + @Override + @SuppressWarnings("unchecked") + public HttpRequest putMapping(String index, String type, + Map<String, Object> mapping) { + checkArgument(!Strings.isNullOrEmpty(index), "index cannot be null or empty"); + checkArgument(!Strings.isNullOrEmpty(type), "type cannot be null or empty"); + + final Map<String, Object> properties = (Map<String, Object>) mapping.get(type); + final byte[] content = JsonSerializer.MAPPER.writeValueAsBytes(properties); + return HttpRequest.builder() + .put("/{index}/_mapping/{type}") + .pathParam("index", index) + .pathParam("type", type) + .content(MediaType.JSON, content) + .build(); + } +} diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6RequestFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6RequestFactory.java new file mode 100644 index 0000000..2356ae9 --- /dev/null +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6RequestFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.library.elasticsearch.requests.factory.v6; + +import org.apache.skywalking.library.elasticsearch.requests.factory.AliasFactory; +import org.apache.skywalking.library.elasticsearch.requests.factory.DocumentFactory; +import org.apache.skywalking.library.elasticsearch.requests.factory.IndexFactory; +import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory; +import org.apache.skywalking.library.elasticsearch.requests.factory.TemplateFactory; + +public final class V6RequestFactory implements RequestFactory { + public static final V6RequestFactory INSTANCE = new V6RequestFactory(); + + @Override + public TemplateFactory template() { + return V6TemplateFactory.INSTANCE; + } + + @Override + public IndexFactory index() { + return V6IndexFactory.INSTANCE; + } + + @Override + public AliasFactory alias() { + return V6AliasFactory.INSTANCE; + } + + @Override + public DocumentFactory document() { + return V6DocumentFactory.INSTANCE; + } +} diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6TemplateFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6TemplateFactory.java new file mode 100644 index 0000000..2777867 --- /dev/null +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6TemplateFactory.java @@ -0,0 +1,79 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.library.elasticsearch.requests.factory.v6; + +import com.google.common.collect.ImmutableMap; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.MediaType; +import java.util.Collections; +import java.util.Map; +import lombok.SneakyThrows; +import org.apache.skywalking.library.elasticsearch.requests.factory.TemplateFactory; +import org.apache.skywalking.library.elasticsearch.util.JsonSerializer; + +class V6TemplateFactory implements TemplateFactory { + static final TemplateFactory INSTANCE = new V6TemplateFactory(); + + @Override + public HttpRequest exists(String name) { + return HttpRequest.builder() + .get("/_template/{name}") + .pathParam("name", name) + .build(); + } + + @Override + public HttpRequest get(final String name) { + return HttpRequest.builder() + .get("/_template/{name}") + .pathParam("name", name) + .build(); + } + + @Override + public HttpRequest delete(final String name) { + return HttpRequest.builder() + .delete("/_template/{name}") + .pathParam("name", name) + .build(); + } + + @SneakyThrows + @Override + public HttpRequest createOrUpdate(String name, Map<String, Object> settings, + 
Map<String, Object> mapping, int order) { + final String[] patterns = new String[] {name + "-*"}; + final Map<String, Object> aliases = ImmutableMap.of(name, Collections.emptyMap()); + final Map<String, Object> template = + ImmutableMap.<String, Object>builder() + .put("index_patterns", patterns) + .put("aliases", aliases) + .put("settings", settings) + .put("mappings", mapping) + .put("order", order) + .build(); + + final byte[] content = JsonSerializer.MAPPER.writeValueAsBytes(template); + + return HttpRequest.builder() + .put("/_template/{name}") + .pathParam("name", name) + .content(MediaType.JSON, content) + .build(); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Document.java similarity index 61% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Document.java index ebad19e..89e0565 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Document.java @@ -13,27 +13,21 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.skywalking.oap.server.core.storage.type; +package org.apache.skywalking.library.elasticsearch.response; -/** - * StorageDataComplexObject implementation supports String-Object interconversion. 
- */ -public interface StorageDataComplexObject<T> { - /** - * @return string representing this object. - */ - String toStorageData(); +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import lombok.Data; + +@Data +public final class Document { + private boolean found; - /** - * Initialize this object based on the given string data. - */ - void toObject(String data); + @JsonProperty("_id") + private String id; - /** - * Initialize the object based on the given source. - */ - void copyFrom(T source); + @JsonProperty("_source") + private Map<String, Object> source; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Documents.java similarity index 61% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Documents.java index ebad19e..6084cab 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Documents.java @@ -13,27 +13,20 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.skywalking.oap.server.core.storage.type; +package org.apache.skywalking.library.elasticsearch.response; -/** - * StorageDataComplexObject implementation supports String-Object interconversion. - */ -public interface StorageDataComplexObject<T> { - /** - * @return string representing this object. 
- */ - String toStorageData(); +import java.util.Iterator; +import java.util.List; +import lombok.Data; - /** - * Initialize this object based on the given string data. - */ - void toObject(String data); +@Data +public final class Documents implements Iterable<Document> { + private List<Document> docs; - /** - * Initialize the object based on the given source. - */ - void copyFrom(T source); + @Override + public Iterator<Document> iterator() { + return docs.stream().filter(Document::isFound).iterator(); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/NodeInfo.java similarity index 61% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/NodeInfo.java index ebad19e..3e1ea2b 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/NodeInfo.java @@ -13,27 +13,18 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.skywalking.oap.server.core.storage.type; +package org.apache.skywalking.library.elasticsearch.response; -/** - * StorageDataComplexObject implementation supports String-Object interconversion. - */ -public interface StorageDataComplexObject<T> { - /** - * @return string representing this object. 
- */ - String toStorageData(); +import lombok.Data; - /** - * Initialize this object based on the given string data. - */ - void toObject(String data); +@Data +public final class NodeInfo { + @Data + public static class Version { + private String number; + } - /** - * Initialize the object based on the given source. - */ - void copyFrom(T source); + private Version version; } diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/util/JsonSerializer.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/util/JsonSerializer.java new file mode 100644 index 0000000..3ecbd07 --- /dev/null +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/util/JsonSerializer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.skywalking.library.elasticsearch.util; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.InputStream; +import java.util.Map; +import lombok.SneakyThrows; + +public class JsonSerializer { + public static final ObjectMapper MAPPER = new ObjectMapper() + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private static final TypeReference<Map<String, Object>> MAP_TYPE = + new TypeReference<Map<String, Object>>() { + }; + + @SneakyThrows + public static <T> T parse(InputStream is, Class<T> type) { + return MAPPER.readValue(is, type); + } + + @SneakyThrows + public static Map<String, Object> parse(InputStream is) { + return MAPPER.readValue(is, MAP_TYPE); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientTest.java similarity index 61% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java copy to oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientTest.java index ebad19e..f195ec5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java +++ b/oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientTest.java @@ -13,27 +13,20 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.skywalking.oap.server.core.storage.type; +package org.apache.skywalking.library.elasticsearch; -/** - * StorageDataComplexObject implementation supports String-Object interconversion. - */ -public interface StorageDataComplexObject<T> { - /** - * @return string representing this object. - */ - String toStorageData(); +import org.junit.Test; - /** - * Initialize this object based on the given string data. - */ - void toObject(String data); +public class ElasticSearchClientTest { - /** - * Initialize the object based on the given source. - */ - void copyFrom(T source); + @Test + public void testPing() { + final ElasticSearchClient client = + ElasticSearchClient.builder() + .endpoints("localhost:9200") + .build(); + client.connect(); + } } diff --git a/oap-server/server-library/pom.xml b/oap-server/server-library/pom.xml index acfcd44..fa37d28 100644 --- a/oap-server/server-library/pom.xml +++ b/oap-server/server-library/pom.xml @@ -32,6 +32,7 @@ <module>library-server</module> <module>library-util</module> <module>library-client</module> + <module>library-elasticsearch-client</module> </modules> <dependencies> @@ -40,4 +41,4 @@ <artifactId>apm-util</artifactId> </dependency> </dependencies> -</project> \ No newline at end of file +</project> diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java index f586b6f..e798720 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java +++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java @@ -33,6 +33,7 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> { super(client); } + // TODO: remove protected XContentBuilder map2builder(Map<String, Object> objectMap) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); for (Map.Entry<String, Object> entries: objectMap.entrySet()) { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java index af73fc9..5f687a1 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.DownSampling; @@ -53,7 +54,7 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO { } deadline = Long.parseLong(new DateTime().plusDays(-ttl).toString("yyyyMMdd")); String tableName = IndexController.INSTANCE.getTableName(model); - List<String> indexes = client.retrievalIndexByAliases(tableName); + Collection<String> indexes = client.retrievalIndexByAliases(tableName); List<String> prepareDeleteIndexes = new ArrayList<>(); List<String> leftIndices = 
new ArrayList<>(); @@ -95,7 +96,7 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO { } } String tableName = IndexController.INSTANCE.getTableName(model); - List<String> indexes; + Collection<String> indexes; try { indexes = client.retrievalIndexByAliases(tableName); } catch (IOException e) { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java index 2c8676c..abc67c6 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; +import java.util.Map; import org.apache.skywalking.oap.server.core.analysis.management.ManagementData; import org.apache.skywalking.oap.server.core.storage.IManagementDAO; import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder; @@ -40,12 +41,13 @@ public class ManagementEsDAO extends EsDAO implements IManagementDAO { public void insert(Model model, ManagementData managementData) throws IOException { String tableName = IndexController.INSTANCE.getTableName(model); String docId = IndexController.INSTANCE.generateDocId(model, managementData.id()); - final GetResponse response = getClient().get(tableName, docId); - if (response.isExists()) { + final boolean exist = getClient().existDoc(tableName, docId); + if (exist) { return; } - XContentBuilder builder = map2builder( - 
IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(managementData))); - getClient().forceInsert(tableName, docId, builder); + Map<String, Object> source = + IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage( + managementData)); + getClient().forceInsert(tableName, docId, source); } -} \ No newline at end of file +} diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java index 38d379d..e8ec5a3 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java @@ -23,8 +23,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.library.elasticsearch.response.Document; +import org.apache.skywalking.library.elasticsearch.response.Documents; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; @@ -90,15 +93,12 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO { String[] ids = metricList.stream() .map(item -> IndexController.INSTANCE.generateDocId(model, item.id())) .toArray(String[]::new); - try { - SearchResponse response = getClient().ids(tableName, ids); - for (int i = 0; i < response.getHits().getHits().length; 
i++) { - Metrics source = storageBuilder.storage2Entity(response.getHits().getAt(i).getSourceAsMap()); + getClient().ids(tableName, ids).ifPresent(documents -> { + for (final Document doc : documents) { + Metrics source = storageBuilder.storage2Entity(doc.getSource()); result.add(source); } - } catch (IOException e) { - log.error("multiGet id=" + Arrays.toString(ids) + " from " + tableName + " fails.", e); - } + }); }); return result; @@ -106,8 +106,8 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO { @Override public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException { - XContentBuilder builder = map2builder( - IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics))); + Map<String, Object> builder = + IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics)); String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket()); String id = IndexController.INSTANCE.generateDocId(model, metrics.id()); return getClient().prepareInsert(modelName, id, builder); @@ -115,8 +115,8 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO { @Override public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException { - XContentBuilder builder = map2builder( - IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics))); + Map<String, Object> builder = + IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics)); String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket()); String id = IndexController.INSTANCE.generateDocId(model, metrics.id()); return getClient().prepareUpdate(modelName, id, builder); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java index 3d89243..bab0404 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; +import java.util.Map; import org.apache.skywalking.oap.server.core.analysis.config.NoneStream; import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO; import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder; @@ -40,8 +41,9 @@ public class NoneStreamEsDAO extends EsDAO implements INoneStreamDAO { @Override public void insert(Model model, NoneStream noneStream) throws IOException { - XContentBuilder builder = map2builder( - IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(noneStream))); + Map<String, Object> builder = + IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage( + noneStream)); String modelName = TimeSeriesUtils.writeIndexName(model, noneStream.getTimeBucket()); String id = IndexController.INSTANCE.generateDocId(model, noneStream.id()); getClient().forceInsert(modelName, id, builder); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java index f23bc61..26f35c7 100644 --- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java @@ -19,13 +19,13 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; +import java.util.Map; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; -import org.elasticsearch.common.xcontent.XContentBuilder; public class RecordEsDAO extends EsDAO implements IRecordDAO { private final StorageHashMapBuilder<Record> storageBuilder; @@ -38,8 +38,8 @@ public class RecordEsDAO extends EsDAO implements IRecordDAO { @Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { - XContentBuilder builder = map2builder( - IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record))); + Map<String, Object> builder = + IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record)); String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket()); String id = IndexController.INSTANCE.generateDocId(model, record.id()); return getClient().prepareInsert(modelName, id, builder); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java index 33d963f..187b9fd 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java @@ -20,9 +20,13 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import org.apache.skywalking.library.elasticsearch.response.Document; +import org.apache.skywalking.library.elasticsearch.response.Documents; import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; import org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; @@ -43,7 +47,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -111,12 +114,15 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { } ids.add(id); }); + MetricsValues metricsValues = new MetricsValues(); - SearchResponse response = getClient() - .ids(tableName, ids.toArray(new String[0])); - Map<String, Map<String, Object>> idMap = 
toMap(response); + Optional<Documents> response = getClient().ids(tableName, ids); + if (!response.isPresent()) { + return metricsValues; + } + + Map<String, Map<String, Object>> idMap = toMap(response.get()); - MetricsValues metricsValues = new MetricsValues(); // Label is null, because in readMetricsValues, no label parameter. IntValues intValues = metricsValues.getValues(); for (String id : ids) { @@ -156,11 +162,13 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { ids.add(id); }); - SearchResponse response = getClient().ids(tableName, ids.toArray(new String[0])); + Optional<Documents> response = getClient().ids(tableName, ids); + if (!response.isPresent()) { + return Collections.emptyList(); + } Map<String, DataTable> idMap = new HashMap<>(); - SearchHit[] hits = response.getHits().getHits(); - for (SearchHit hit : hits) { - idMap.put(hit.getId(), new DataTable((String) hit.getSourceAsMap().getOrDefault(valueColumnName, ""))); + for (final Document document : response.get()) { + idMap.put(document.getId(), new DataTable((String) document.getSource().getOrDefault(valueColumnName, ""))); } return Util.composeLabelValue(condition, labels, ids, idMap); } @@ -181,11 +189,14 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { ids.add(id); }); - SearchResponse response = getClient().ids(tableName, ids.toArray(new String[0])); - Map<String, Map<String, Object>> idMap = toMap(response); - HeatMap heatMap = new HeatMap(); + Optional<Documents> response = getClient().ids(tableName, ids); + if (!response.isPresent()) { + return heatMap; + } + Map<String, Map<String, Object>> idMap = toMap(response.get()); + final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName()); for (String id : ids) { Map<String, Object> source = idMap.get(id); @@ -254,11 +265,10 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { sourceBuilder.size(0); } - private Map<String, Map<String, 
Object>> toMap(SearchResponse response) { + private Map<String, Map<String, Object>> toMap(Documents documents) { Map<String, Map<String, Object>> result = new HashMap<>(); - SearchHit[] hits = response.getHits().getHits(); - for (SearchHit hit : hits) { - result.put(hit.getId(), hit.getSourceAsMap()); + for (final Document document : documents) { + result.put(document.getId(), document.getSource()); } return result; } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java index 809320c..9de38b8 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.library.elasticsearch.response.Document; import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate; import org.apache.skywalking.oap.server.core.query.input.DashboardSetting; import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration; @@ -32,7 +34,6 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -83,12 +84,12 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage final UITemplate.Builder builder = new UITemplate.Builder(); final UITemplate uiTemplate = setting.toEntity(); - final GetResponse response = getClient().get(UITemplate.INDEX_NAME, uiTemplate.id()); - if (response.isExists()) { + final boolean exist = getClient().existDoc(UITemplate.INDEX_NAME, uiTemplate.id()); + if (exist) { return TemplateChangeStatus.builder().status(false).message("Template exists").build(); } - XContentBuilder xContentBuilder = map2builder(builder.entity2Storage(uiTemplate)); + Map<String, Object> xContentBuilder = builder.entity2Storage(uiTemplate); getClient().forceInsert(UITemplate.INDEX_NAME, uiTemplate.id(), xContentBuilder); return TemplateChangeStatus.builder().status(true).build(); } catch (IOException e) { @@ -103,12 +104,12 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage final UITemplate.Builder builder = new UITemplate.Builder(); final UITemplate uiTemplate = setting.toEntity(); - final GetResponse response = getClient().get(UITemplate.INDEX_NAME, uiTemplate.id()); - if (!response.isExists()) { + final boolean exist = getClient().existDoc(UITemplate.INDEX_NAME, uiTemplate.id()); + if (!exist) { return TemplateChangeStatus.builder().status(false).message("Can't find the template").build(); } - XContentBuilder xContentBuilder = map2builder(builder.entity2Storage(uiTemplate)); + Map<String, Object> xContentBuilder = builder.entity2Storage(uiTemplate); getClient().forceUpdate(UITemplate.INDEX_NAME, uiTemplate.id(), xContentBuilder); return TemplateChangeStatus.builder().status(true).build(); } catch 
(IOException e) { @@ -119,13 +120,13 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage @Override public TemplateChangeStatus disableTemplate(final String name) throws IOException { - final GetResponse response = getClient().get(UITemplate.INDEX_NAME, name); - if (response.isExists()) { + final Optional<Document> response = getClient().get(UITemplate.INDEX_NAME, name); + if (response.isPresent()) { final UITemplate.Builder builder = new UITemplate.Builder(); - final UITemplate uiTemplate = builder.storage2Entity(response.getSourceAsMap()); + final UITemplate uiTemplate = builder.storage2Entity(response.get().getSource()); uiTemplate.setDisabled(BooleanUtils.TRUE); - XContentBuilder xContentBuilder = map2builder(builder.entity2Storage(uiTemplate)); + Map<String, Object> xContentBuilder = builder.entity2Storage(uiTemplate); getClient().forceUpdate(UITemplate.INDEX_NAME, uiTemplate.id(), xContentBuilder); return TemplateChangeStatus.builder().status(true).build(); } else { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java index f8b68ca..596d240 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java @@ -38,6 +38,8 @@ import org.apache.http.HttpHost; import org.apache.http.HttpStatus; import org.apache.http.client.methods.HttpGet; import org.apache.skywalking.apm.util.StringUtil; +import 
org.apache.skywalking.library.elasticsearch.response.Document; +import org.apache.skywalking.library.elasticsearch.response.Documents; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; @@ -49,14 +51,11 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -124,18 +123,6 @@ public class ElasticSearch7Client extends ElasticSearchClient { return response.isAcknowledged(); } - @Override - public boolean createIndex(String indexName, Map<String, Object> settings, - Map<String, Object> mapping) throws IOException { - indexName = formatIndexName(indexName); - CreateIndexRequest request = new CreateIndexRequest(indexName); - request.settings(settings); - request.mapping(mapping); - CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); - log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); - return response.isAcknowledged(); - } - /** * {@inheritDoc} */ @@ -310,7 +297,7 @@ public class 
ElasticSearch7Client extends ElasticSearchClient { } @Override - public GetResponse get(String indexName, String id) throws IOException { + public Optional<Document> get(String indexName, String id) throws IOException { indexName = formatIndexName(indexName); GetRequest request = new GetRequest(indexName, id); try { @@ -324,7 +311,7 @@ public class ElasticSearch7Client extends ElasticSearchClient { } @Override - public SearchResponse ids(String indexName, String[] ids) throws IOException { + public Optional<Documents> ids(String indexName, String[] ids) throws IOException { indexName = formatIndexName(indexName); SearchRequest searchRequest = new SearchRequest(indexName); @@ -393,25 +380,6 @@ public class ElasticSearch7Client extends ElasticSearchClient { return HttpStatus.SC_OK; } - /** - * @since 8.7.0 SkyWalking don't use sync bulk anymore. This method is just kept for unexpected case in the future. - */ - @Deprecated - @Override - public void synchronousBulk(BulkRequest request) { - request.timeout(TimeValue.timeValueMinutes(2)); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - request.waitForActiveShards(ActiveShardCount.ONE); - try { - int size = request.requests().size(); - BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT); - log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size); - healthChecker.health(); - } catch (Throwable t) { - healthChecker.unHealth(t); - } - } - @Override public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) { BulkProcessor.Listener listener = createBulkListener();