This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch storage-elasticsearch-health in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit f45a1276584a2a539e5e2fedc00af2d8a15022cb Author: Gao Hongtao <[email protected]> AuthorDate: Tue Jul 14 10:48:58 2020 +0800 Update Elasticsearch client version to latest Signed-off-by: Gao Hongtao <[email protected]> --- oap-server/pom.xml | 2 +- .../client/elasticsearch/ElasticSearchClient.java | 62 +++++++++++----------- .../storage-elasticsearch7-plugin/pom.xml | 2 +- 3 files changed, 32 insertions(+), 34 deletions(-) diff --git a/oap-server/pom.xml b/oap-server/pom.xml index ee05cb4..70b66e1 100755 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -64,7 +64,7 @@ <h2.version>1.4.196</h2.version> <commons-dbcp.version>1.4</commons-dbcp.version> <commons-io.version>2.6</commons-io.version> - <elasticsearch.version>6.3.2</elasticsearch.version> + <elasticsearch.version>6.8.10</elasticsearch.version> <joda-time.version>2.10.5</joda-time.version> <kubernetes.version>8.0.0</kubernetes.version> <hikaricp.version>3.1.0</hikaricp.version> diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 5d2a90f..03193ea 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -60,11 +60,7 @@ import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; @@ -76,10 +72,16 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; @@ -131,7 +133,7 @@ public class ElasticSearchClient implements Client { } } client = createClient(hosts); - client.ping(); + client.ping(RequestOptions.DEFAULT); } protected RestHighLevelClient createClient( @@ -189,7 +191,7 @@ public class ElasticSearchClient implements Client { indexName = formatIndexName(indexName); CreateIndexRequest request = new CreateIndexRequest(indexName); - CreateIndexResponse response = client.indices().create(request); + CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } @@ -200,15 +202,15 @@ public class ElasticSearchClient implements Client { CreateIndexRequest request = new CreateIndexRequest(indexName); Gson gson = new Gson(); request.settings(gson.toJson(settings), XContentType.JSON); - request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON); - CreateIndexResponse response = client.indices().create(request); + request.mapping(mapping); + CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } public List<String> retrievalIndexByAliases(String aliases) throws IOException { aliases = formatIndexName(aliases); - Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/_alias/" + aliases); + Response response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases)); if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) { Gson gson = new Gson(); InputStreamReader reader = new InputStreamReader(response.getEntity().getContent()); @@ -245,23 +247,19 @@ public class ElasticSearchClient implements Client { indexName = formatIndexName(indexName); } DeleteIndexRequest request = new DeleteIndexRequest(indexName); - DeleteIndexResponse response; - response = client.indices().delete(request); + AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT); log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } public boolean isExistsIndex(String indexName) throws IOException { - indexName = formatIndexName(indexName); - GetIndexRequest request = new GetIndexRequest(); - request.indices(indexName); - return client.indices().exists(request); + return client.indices().exists(new GetIndexRequest(formatIndexName(indexName)), RequestOptions.DEFAULT); } public boolean isExistsTemplate(String indexName) throws IOException { indexName = formatIndexName(indexName); - Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName); + Response response = client.getLowLevelClient().performRequest(new Request(HttpHead.METHOD_NAME, "/_template/" + indexName)); int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == HttpStatus.SC_OK) { @@ -291,9 +289,9 @@ public class ElasticSearchClient implements Client { HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON); - Response response = client.getLowLevelClient() - .performRequest( - HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity); + Request request = new Request(HttpPut.METHOD_NAME, "/_template/" + indexName); + request.setEntity(entity); + Response response = client.getLowLevelClient().performRequest(request); return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK; } @@ -301,7 +299,7 @@ public class ElasticSearchClient implements Client { indexName = formatIndexName(indexName); Response response = client.getLowLevelClient() - .performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName); + .performRequest(new Request(HttpDelete.METHOD_NAME, "/_template/" + indexName)); return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK; } @@ -310,13 +308,13 @@ public class ElasticSearchClient implements Client { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.types(TYPE); searchRequest.source(searchSourceBuilder); - return client.search(searchRequest); + return client.search(searchRequest, RequestOptions.DEFAULT); } public GetResponse get(String indexName, String id) throws IOException { indexName = formatIndexName(indexName); GetRequest request = new GetRequest(indexName, TYPE, id); - return client.get(request); + return client.get(request, RequestOptions.DEFAULT); } public SearchResponse ids(String indexName, String[] ids) throws IOException { @@ -325,13 +323,13 @@ public class ElasticSearchClient implements Client { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.types(TYPE); searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length); - return client.search(searchRequest); + return client.search(searchRequest, RequestOptions.DEFAULT); } public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException { IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.index(request); + client.index(request, RequestOptions.DEFAULT); } public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException { @@ -339,14 +337,14 @@ public class ElasticSearchClient implements Client { indexName, id, source); request.version(version); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.update(request); + client.update(request, RequestOptions.DEFAULT); } public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException { org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate( indexName, id, source); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.update(request); + client.update(request, RequestOptions.DEFAULT); } public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) { @@ -361,12 +359,12 @@ public class ElasticSearchClient implements Client { public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException { indexName = formatIndexName(indexName); - Map<String, String> params = Collections.singletonMap("conflicts", "proceed"); String jsonString = "{" + " \"query\": {" + " \"range\": {" + " \"" + timeBucketColumnName + "\": {" + " \"lte\": " + endTimeBucket + " }" + " }" + " }" + "}"; HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON); - Response response = client.getLowLevelClient() - .performRequest( - HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity); + Request request = new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query"); + request.setEntity(entity); + request.addParameter("conflicts", "proceed"); + Response response = client.getLowLevelClient().performRequest(request); log.debug("delete indexName: {}, jsonString : {}", indexName, jsonString); return response.getStatusLine().getStatusCode(); } @@ -377,7 +375,7 @@ public class ElasticSearchClient implements Client { request.waitForActiveShards(ActiveShardCount.ONE); try { int size = request.requests().size(); - BulkResponse responses = client.bulk(request); + BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT); log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size); } catch (IOException e) { log.error(e.getMessage(), e); @@ -387,7 +385,7 @@ public class ElasticSearchClient implements Client { public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) { BulkProcessor.Listener listener = createBulkListener(); - return BulkProcessor.builder(client::bulkAsync, listener) + return BulkProcessor.builder((request, actionListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, actionListener), listener) .setBulkActions(bulkActions) .setFlushInterval(TimeValue.timeValueSeconds(flushInterval)) .setConcurrentRequests(concurrentRequests) diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml index 109392b..3129019 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml @@ -29,7 +29,7 @@ <artifactId>storage-elasticsearch7-plugin</artifactId> <properties> - <elasticsearch.version>7.0.0</elasticsearch.version> + <elasticsearch.version>7.8.0</elasticsearch.version> </properties> <dependencyManagement>
