This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-elasticsearch-health
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 8e16d6178c385d4dc123332c9d977336e7869994 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Jul 15 06:15:15 2020 +0800 Add HealthChecker helper Signed-off-by: Gao Hongtao <[email protected]> --- .../src/main/resources/application.yml | 1 + .../client/elasticsearch/ElasticSearchClient.java | 115 ++++++++++++++++----- .../library/client/healthcheck/HealthChecker.java | 49 +++++++++ .../library/client/healthcheck/HealthListener.java | 30 ++++++ .../client/jdbc/hikaricp/JDBCHikariCPClient.java | 24 ++--- .../StorageModuleElasticsearchConfig.java | 2 + .../StorageModuleElasticsearchProvider.java | 11 ++ .../storage/plugin/jdbc/h2/H2StorageProvider.java | 8 +- 8 files changed, 195 insertions(+), 45 deletions(-) diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index 0e89003..4293c6f 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -104,6 +104,7 @@ storage: segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200} profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200} advanced: ${SW_STORAGE_ES_ADVANCED:""} + enableHealthCheck: ${SW_STORAGE_ES_ENABLE_HEALTH_CHECK:false} elasticsearch7: nameSpace: ${SW_NAMESPACE:""} clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200} diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 03193ea..059d15f 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLContext; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -58,6 +59,8 @@ import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContexts; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.client.healthcheck.HealthChecker; +import org.apache.skywalking.oap.server.library.client.healthcheck.HealthListener; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -105,6 +108,8 @@ public class ElasticSearchClient implements Client { private volatile String password; private final List<IndexNameConverter> indexNameConverters; protected volatile RestHighLevelClient client; + private HealthChecker healthChecker = HealthChecker.DEFAULT_CHECKER; + private final ReentrantLock connectLock = new ReentrantLock(); public ElasticSearchClient(String clusterNodes, String protocol, @@ -124,16 +129,25 @@ public class ElasticSearchClient implements Client { @Override public void connect() throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException { - List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes); - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - log.error("ElasticSearch client reconnection fails based on new config", t); + connectLock.lock(); + try { + List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes); + if (client != null) { + try 
{ + client.close(); + } catch (Throwable t) { + log.error("ElasticSearch client reconnection fails based on new config", t); + } } + client = createClient(hosts); + client.ping(RequestOptions.DEFAULT); + } finally { + connectLock.unlock(); } - client = createClient(hosts); - client.ping(RequestOptions.DEFAULT); + } + + public void activeHealthChecker(HealthListener healthListener) { + healthChecker = new HealthChecker(healthListener); } protected RestHighLevelClient createClient( @@ -210,10 +224,23 @@ public class ElasticSearchClient implements Client { public List<String> retrievalIndexByAliases(String aliases) throws IOException { aliases = formatIndexName(aliases); - Response response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases)); + Response response; + try { + response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases)); + healthChecker.health(); + } catch (Throwable t) { + healthChecker.unHealth(t); + throw t; + } if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) { Gson gson = new Gson(); - InputStreamReader reader = new InputStreamReader(response.getEntity().getContent()); + InputStreamReader reader; + try { + reader = new InputStreamReader(response.getEntity().getContent()); + } catch (Throwable t) { + healthChecker.unHealth(t); + throw t; + } JsonObject responseJson = gson.fromJson(reader, JsonObject.class); log.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson); return new ArrayList<>(responseJson.keySet()); @@ -308,13 +335,39 @@ public class ElasticSearchClient implements Client { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.types(TYPE); searchRequest.source(searchSourceBuilder); - return client.search(searchRequest, RequestOptions.DEFAULT); + try { + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); + healthChecker.health(); + return response; + } 
catch (Throwable t) { + healthChecker.unHealth(t); + if (t instanceof IllegalStateException) { + IllegalStateException ise = (IllegalStateException) t; + // Fixed the issue described in https://github.com/elastic/elasticsearch/issues/39946 + if (ise.getMessage().contains("I/O reactor status: STOPPED") && + connectLock.tryLock()) { + try { + connect(); + } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { + throw new IllegalStateException("Can't reconnect to Elasticsearch", e); + } + } + } + throw t; + } } public GetResponse get(String indexName, String id) throws IOException { indexName = formatIndexName(indexName); GetRequest request = new GetRequest(indexName, TYPE, id); - return client.get(request, RequestOptions.DEFAULT); + try { + GetResponse response = client.get(request, RequestOptions.DEFAULT); + healthChecker.health(); + return response; + } catch (Throwable t) { + healthChecker.unHealth(t); + throw t; + } } public SearchResponse ids(String indexName, String[] ids) throws IOException { @@ -323,28 +376,39 @@ public class ElasticSearchClient implements Client { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.types(TYPE); searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length); - return client.search(searchRequest, RequestOptions.DEFAULT); + try { + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); + healthChecker.health(); + return response; + } catch (Throwable t) { + healthChecker.unHealth(t); + throw t; + } } public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException { IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.index(request, RequestOptions.DEFAULT); - } - - public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException { - 
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate( - indexName, id, source); - request.version(version); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.update(request, RequestOptions.DEFAULT); + try { + client.index(request, RequestOptions.DEFAULT); + healthChecker.health(); + } catch (Throwable t) { + healthChecker.unHealth(t); + throw t; + } } public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException { org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate( indexName, id, source); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.update(request, RequestOptions.DEFAULT); + try { + client.update(request, RequestOptions.DEFAULT); + healthChecker.health(); + } catch (Throwable t) { + healthChecker.unHealth(t); + throw t; + } } public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) { @@ -377,8 +441,9 @@ public class ElasticSearchClient implements Client { int size = request.requests().size(); BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT); log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size); - } catch (IOException e) { - log.error(e.getMessage(), e); + healthChecker.health(); + } catch (Throwable t) { + healthChecker.unHealth(t); } } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java new file mode 100644 index 0000000..49de2b4 --- /dev/null +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java @@ -0,0 +1,49 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.client.healthcheck; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * HealthChecker could provide health status to the listener. + */ +@Slf4j +@RequiredArgsConstructor +public class HealthChecker { + public static final HealthChecker DEFAULT_CHECKER = new HealthChecker(health -> { }); + + private final HealthListener listener; + + /** + * Invoking when service is healthy. + */ + public void health() { + listener.listen(true); + } + + /** + * Invoking when service is unhealthy. + * @param t the reason of unhealthy status. 
+ */ + public void unHealth(Throwable t) { + log.error("Elasticsearch health check is failed", t); + listener.listen(false); + } +} diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java new file mode 100644 index 0000000..700db1c --- /dev/null +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.client.healthcheck; + +/** + * HealthChecker checks service health. + */ +public interface HealthListener { + + /** + * Listening health status. 
+ */ + void listen(boolean isHealthy); +} diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java index f99a6a0..6cf15de 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java @@ -26,11 +26,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.client.healthcheck.HealthChecker; +import org.apache.skywalking.oap.server.library.client.healthcheck.HealthListener; import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,20 +41,14 @@ public class JDBCHikariCPClient implements Client { private HikariDataSource dataSource; private HikariConfig hikariConfig; + private HealthChecker healthChecker = HealthChecker.DEFAULT_CHECKER; public JDBCHikariCPClient(Properties properties) { hikariConfig = new HikariConfig(properties); } - public void setHealthCheckListener(Consumer<Boolean> healthListener) { - ScheduledExecutorService asyncHealthScheduler = Executors.newSingleThreadScheduledExecutor(); - asyncHealthScheduler.scheduleAtFixedRate(() -> { - try (Connection c = dataSource.getConnection()) { - healthListener.accept(true); - } catch (SQLException ignored) { - 
healthListener.accept(false); - } - }, 0, 3, TimeUnit.SECONDS); + public void activeHealthCheck(HealthListener listener) { + healthChecker = new HealthChecker(listener); } @Override @@ -93,7 +85,9 @@ public class JDBCHikariCPClient implements Client { logger.debug("execute aql: {}", sql); try (Statement statement = connection.createStatement()) { statement.execute(sql); + healthChecker.health(); } catch (SQLException e) { + healthChecker.unHealth(e); throw new JDBCClientException(e.getMessage(), e); } } @@ -107,6 +101,7 @@ public class JDBCHikariCPClient implements Client { setStatementParam(statement, params); result = statement.execute(); statement.closeOnCompletion(); + healthChecker.health(); } catch (SQLException e) { if (statement != null) { try { @@ -114,6 +109,7 @@ public class JDBCHikariCPClient implements Client { } catch (SQLException e1) { } } + healthChecker.unHealth(e); throw new JDBCClientException(e.getMessage(), e); } @@ -129,6 +125,7 @@ public class JDBCHikariCPClient implements Client { setStatementParam(statement, params); rs = statement.executeQuery(); statement.closeOnCompletion(); + healthChecker.health(); } catch (SQLException e) { if (statement != null) { try { @@ -136,6 +133,7 @@ public class JDBCHikariCPClient implements Client { } catch (SQLException e1) { } } + healthChecker.unHealth(e); throw new JDBCClientException(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index a33e992..5435e72 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -92,5 +92,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { private int profileTaskQueryMaxSize = 200; @Setter private String advanced; + @Setter + private boolean enableHealthCheck; } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 9f591cb..8622d63 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -74,6 +74,10 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNR import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.UITemplateManagementEsDAO; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; /** * The storage provider for ElasticSearch 6. 
@@ -181,6 +185,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { @Override public void start() throws ModuleStartException { + MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); + GaugeMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); + healthChecker.setValue(1); try { elasticSearchClient.connect(); StorageEsInstaller installer = new StorageEsInstaller(elasticSearchClient, getManager(), config); @@ -189,6 +196,10 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { throw new ModuleStartException(e.getMessage(), e); } + if (!config.isEnableHealthCheck()) { + return; + } + elasticSearchClient.activeHealthChecker(isHealthy -> healthChecker.setValue(isHealthy ? 
0 : 1)); } @Override diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java index 067bf2c..88288cc 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java @@ -146,13 +146,7 @@ public class H2StorageProvider extends ModuleProvider { } catch (StorageException e) { throw new ModuleStartException(e.getMessage(), e); } - h2Client.setHealthCheckListener(isHealthy -> { - if (isHealthy) { - healthChecker.setValue(0); - } else { - healthChecker.setValue(1); - } - }); + h2Client.activeHealthCheck(healthy -> healthChecker.setValue(healthy ? 0 : 1)); } @Override
