This is an automated email from the ASF dual-hosted git repository. gongchao pushed a commit to branch update-collector-info in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
commit 0f010dab5cae3e19a8015122df8e5c2490182d11 Author: tomsun28 <[email protected]> AuthorDate: Thu Jun 20 00:17:54 2024 +0800 [improve] update collector info, support env config or auto fetch Signed-off-by: tomsun28 <[email protected]> --- .../collector/dispatch/entrance/CollectServer.java | 34 +- collector/src/main/resources/application.yml | 4 +- .../hertzbeat/common/entity/dto/CollectorInfo.java | 1 - .../apache/hertzbeat/common/util/IpDomainUtil.java | 6 +- .../netty/process/CollectorOnlineProcessor.java | 8 + .../history/greptime/GreptimeDbDataStorage.java | 593 ++++++++++----------- 6 files changed, 314 insertions(+), 332 deletions(-) diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java index 2b45cce45..9baabd150 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java @@ -38,7 +38,6 @@ import org.apache.hertzbeat.collector.dispatch.timer.TimerDispatch; import org.apache.hertzbeat.common.entity.dto.CollectorInfo; import org.apache.hertzbeat.common.entity.message.ClusterMsg; import org.apache.hertzbeat.common.support.CommonThreadPool; -import org.apache.hertzbeat.common.util.IpDomainUtil; import org.apache.hertzbeat.common.util.JsonUtil; import org.apache.hertzbeat.remoting.RemotingClient; import org.apache.hertzbeat.remoting.event.NettyEventListener; @@ -63,12 +62,12 @@ public class CollectServer implements CommandLineRunner { private final TimerDispatch timerDispatch; + private final CollectorInfoProperties infoProperties; + private RemotingClient remotingClient; private ScheduledExecutorService scheduledExecutor; - private Info info; - public CollectServer(final CollectJobService collectJobService, final TimerDispatch timerDispatch, final DispatchProperties properties, @@ -85,15 +84,15 @@ public class CollectServer implements CommandLineRunner { this.collectJobService = collectJobService; this.timerDispatch = timerDispatch; this.collectJobService.setCollectServer(this); - this.init(properties, threadPool, infoProperties); + this.infoProperties = infoProperties; + this.init(properties, threadPool); } - private void init(final DispatchProperties properties, final CommonThreadPool threadPool, final CollectorInfoProperties infoProperties) { + private void init(final DispatchProperties properties, final CommonThreadPool threadPool) { NettyClientConfig nettyClientConfig = new NettyClientConfig(); DispatchProperties.EntranceProperties.NettyProperties nettyProperties = properties.getEntrance().getNetty(); nettyClientConfig.setServerHost(nettyProperties.getManagerHost()); nettyClientConfig.setServerPort(nettyProperties.getManagerPort()); - this.initInfo(infoProperties); this.remotingClient = new NettyRemotingClient(nettyClientConfig, new CollectNettyEventListener(), threadPool); this.remotingClient.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, new HeartbeatProcessor()); @@ -135,9 +134,9 @@ public class CollectServer implements CommandLineRunner { String mode = CollectServer.this.collectJobService.getCollectorMode(); CollectorInfo collectorInfo = CollectorInfo.builder() .name(identity) - .ip(info.ip) + .ip(infoProperties.getIp()) .mode(mode) - .version(info.version) + .version(infoProperties.getVersion()) // todo more info .build(); timerDispatch.goOnline(); @@ -181,23 +180,4 @@ public class CollectServer implements CommandLineRunner { log.info("handle idle event triggered. collector is going offline."); } } - - private void initInfo(final CollectorInfoProperties infoProperties) { - info = new Info(); - info.setVersion(infoProperties.getVersion()); - info.setIp(IpDomainUtil.getIpFromEnvOrDefault(infoProperties.getIp(), IpDomainUtil.getLocalhostIp())); - } - - private static class Info { - private String version; - private String ip; - - public void setVersion(String version) { - this.version = version; - } - - public void setIp(String ip) { - this.ip = ip; - } - } } diff --git a/collector/src/main/resources/application.yml b/collector/src/main/resources/application.yml index 24ebef57c..13d52e92b 100644 --- a/collector/src/main/resources/application.yml +++ b/collector/src/main/resources/application.yml @@ -39,8 +39,8 @@ spring: collector: info: - version: 1.5.0 - ip: IP + version: ${COLLECTOR_VERSION:1.6.0} + ip: ${COLLECTOR_IP:} dispatch: entrance: netty: diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/dto/CollectorInfo.java b/common/src/main/java/org/apache/hertzbeat/common/entity/dto/CollectorInfo.java index a9529af45..7c5a889e8 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/entity/dto/CollectorInfo.java +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/dto/CollectorInfo.java @@ -38,7 +38,6 @@ public class CollectorInfo { @NotNull private String name; - @NotNull private String ip; private String version; diff --git a/common/src/main/java/org/apache/hertzbeat/common/util/IpDomainUtil.java b/common/src/main/java/org/apache/hertzbeat/common/util/IpDomainUtil.java index a29badfc1..21d8d57de 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/util/IpDomainUtil.java +++ b/common/src/main/java/org/apache/hertzbeat/common/util/IpDomainUtil.java @@ -106,11 +106,7 @@ public final class IpDomainUtil { } return null; } - - public static String getIpFromEnvOrDefault(String env, String defaultIp) { - return System.getenv().getOrDefault(env, defaultIp); - } - + /** * * @param ipDomain ip domain diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java index 66f7d62a7..ab48b2203 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java @@ -18,12 +18,14 @@ package org.apache.hertzbeat.manager.scheduler.netty.process; import io.netty.channel.ChannelHandlerContext; +import java.net.InetSocketAddress; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.common.entity.dto.CollectorInfo; import org.apache.hertzbeat.common.entity.message.ClusterMsg; import org.apache.hertzbeat.common.util.JsonUtil; import org.apache.hertzbeat.manager.scheduler.netty.ManageServer; import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor; +import org.springframework.util.StringUtils; /** * handle collector online message @@ -41,6 +43,12 @@ public class CollectorOnlineProcessor implements NettyRemotingProcessor { String collector = message.getIdentity(); log.info("the collector {} actively requests to go online.", collector); CollectorInfo collectorInfo = JsonUtil.fromJson(message.getMsg(), CollectorInfo.class); + if (collectorInfo != null && !StringUtils.hasText(collectorInfo.getIp())) { + // fetch remote ip address + InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIP = socketAddress.getAddress().getHostAddress(); + collectorInfo.setIp(clientIP); + } this.manageServer.addChannel(collector, ctx.channel()); this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(collector, collectorInfo); return null; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeDbDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeDbDataStorage.java index a44417121..8d4646f66 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeDbDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeDbDataStorage.java @@ -57,7 +57,6 @@ import org.springframework.stereotype.Component; /** * GreptimeDB data storage, only supports GreptimeDB version >= v0.5 - * */ @Component @ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled", havingValue = "true") @@ -79,13 +78,13 @@ public class GreptimeDbDataStorage extends AbstractHistoryDataStorage { private static final String CONSTANTS_CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS `%s`"; private static final Runnable INSTANCE_EXCEPTION_PRINT = () -> { - if (log.isErrorEnabled()) { - log.error(""" - \t---------------GreptimeDB Init Failed--------------- - \t--------------Please Config GreptimeDB-------------- - t-----------Can Not Use Metric History Now----------- - """); - } + if (log.isErrorEnabled()) { + log.error(""" + \t---------------GreptimeDB Init Failed--------------- + \t--------------Please Config GreptimeDB-------------- + t-----------Can Not Use Metric History Now----------- + """); + } }; private HikariDataSource hikariDataSource; @@ -93,331 +92,331 @@ public class GreptimeDbDataStorage extends AbstractHistoryDataStorage { private GreptimeDB greptimeDb; public GreptimeDbDataStorage(GreptimeProperties greptimeProperties) { - if (greptimeProperties == null) { - log.error("init error, please config Warehouse GreptimeDB props in application.yml"); - throw new IllegalArgumentException("please config Warehouse GreptimeDB props"); - } - - serverAvailable = initGreptimeDbClient(greptimeProperties) && initGreptimeDbDataSource(greptimeProperties); + if (greptimeProperties == null) { + log.error("init error, please config Warehouse GreptimeDB props in application.yml"); + throw new IllegalArgumentException("please config Warehouse GreptimeDB props"); + } + + serverAvailable = initGreptimeDbClient(greptimeProperties) && initGreptimeDbDataSource(greptimeProperties); } private void initGreptimeDb(final GreptimeProperties greptimeProperties) throws SQLException { - final DriverPropertyInfo[] properties = new Driver().getPropertyInfo(greptimeProperties.url(), null); - final String host = ObjectUtils.requireNonEmpty(properties[0].value); - final String port = ObjectUtils.requireNonEmpty(properties[1].value); - final String dbName = ObjectUtils.requireNonEmpty(properties[2].value); - - try (final Connection tempConnection = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port, - greptimeProperties.username(), greptimeProperties.password()); - final PreparedStatement pstmt = tempConnection - .prepareStatement(String.format(CONSTANTS_CREATE_DATABASE, dbName))) { - log.info("[warehouse greptime] try to create database `{}` if not exists", dbName); - pstmt.execute(); - } + final DriverPropertyInfo[] properties = new Driver().getPropertyInfo(greptimeProperties.url(), null); + final String host = ObjectUtils.requireNonEmpty(properties[0].value); + final String port = ObjectUtils.requireNonEmpty(properties[1].value); + final String dbName = ObjectUtils.requireNonEmpty(properties[2].value); + + try (final Connection tempConnection = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port, + greptimeProperties.username(), greptimeProperties.password()); + final PreparedStatement pstmt = tempConnection + .prepareStatement(String.format(CONSTANTS_CREATE_DATABASE, dbName))) { + log.info("[warehouse greptime] try to create database `{}` if not exists", dbName); + pstmt.execute(); + } } private boolean initGreptimeDbClient(GreptimeProperties greptimeProperties) { - String endpoints = greptimeProperties.grpcEndpoints(); - try { - final DriverPropertyInfo[] properties = new Driver().getPropertyInfo(greptimeProperties.url(), null); - final String dbName = ObjectUtils.requireNonEmpty(properties[2].value); - - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints.split(","), dbName) // - .writeMaxRetries(3) // - .authInfo(new AuthInfo(greptimeProperties.username(), greptimeProperties.password())) - .routeTableRefreshPeriodSeconds(30) // - .build(); - - this.greptimeDb = GreptimeDB.create(opts); - } catch (Exception e) { - log.error("[warehouse greptime] Fail to start GreptimeDB client"); - return false; - } - - return true; + String endpoints = greptimeProperties.grpcEndpoints(); + try { + final DriverPropertyInfo[] properties = new Driver().getPropertyInfo(greptimeProperties.url(), null); + final String dbName = ObjectUtils.requireNonEmpty(properties[2].value); + + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints.split(","), dbName) // + .writeMaxRetries(3) // + .authInfo(new AuthInfo(greptimeProperties.username(), greptimeProperties.password())) + .routeTableRefreshPeriodSeconds(30) // + .build(); + + this.greptimeDb = GreptimeDB.create(opts); + } catch (Exception e) { + log.error("[warehouse greptime] Fail to start GreptimeDB client"); + return false; + } + + return true; } private boolean initGreptimeDbDataSource(final GreptimeProperties greptimeProperties) { - try { - initGreptimeDb(greptimeProperties); - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error(e.getMessage(), e); - } - - INSTANCE_EXCEPTION_PRINT.run(); - return false; - } - - final HikariConfig config = new HikariConfig(); - // jdbc properties - config.setJdbcUrl(greptimeProperties.url()); - config.setUsername(greptimeProperties.username()); - config.setPassword(greptimeProperties.password()); - config.setDriverClassName(greptimeProperties.driverClassName()); - // minimum number of idle connection - config.setMinimumIdle(10); - // maximum number of connection in the pool - config.setMaximumPoolSize(10); - // maximum wait milliseconds for get connection from pool - config.setConnectionTimeout(30000); - // maximum lifetime for each connection - config.setMaxLifetime(0); - // max idle time for recycle idle connection - config.setIdleTimeout(0); - // validation query - config.setConnectionTestQuery("select 1"); - try { - this.hikariDataSource = new HikariDataSource(config); - } catch (Exception e) { - INSTANCE_EXCEPTION_PRINT.run(); - return false; - } - return true; + try { + initGreptimeDb(greptimeProperties); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error(e.getMessage(), e); + } + + INSTANCE_EXCEPTION_PRINT.run(); + return false; + } + + final HikariConfig config = new HikariConfig(); + // jdbc properties + config.setJdbcUrl(greptimeProperties.url()); + config.setUsername(greptimeProperties.username()); + config.setPassword(greptimeProperties.password()); + config.setDriverClassName(greptimeProperties.driverClassName()); + // minimum number of idle connection + config.setMinimumIdle(10); + // maximum number of connection in the pool + config.setMaximumPoolSize(10); + // maximum wait milliseconds for get connection from pool + config.setConnectionTimeout(30000); + // maximum lifetime for each connection + config.setMaxLifetime(0); + // max idle time for recycle idle connection + config.setIdleTimeout(0); + // validation query + config.setConnectionTestQuery("select 1"); + try { + this.hikariDataSource = new HikariDataSource(config); + } catch (Exception e) { + INSTANCE_EXCEPTION_PRINT.run(); + return false; + } + return true; } @Override public void saveData(CollectRep.MetricsData metricsData) { - if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) { - return; - } - if (metricsData.getValuesList().isEmpty()) { - log.info("[warehouse greptime] flush metrics data {} is null, ignore.", metricsData.getId()); - return; - } - String monitorId = String.valueOf(metricsData.getId()); - String tableName = getTableName(metricsData.getApp(), metricsData.getMetrics()); - TableSchema.Builder tableSchemaBuilder = TableSchema.newBuilder(tableName); - - tableSchemaBuilder.addTag("monitor_id", DataType.String) // - .addTag("instance", DataType.String) // - .addTimestamp("ts", DataType.TimestampMillisecond); - - List<CollectRep.Field> fieldsList = metricsData.getFieldsList(); - for (CollectRep.Field field : fieldsList) { - // handle field type - if (field.getType() == CommonConstants.TYPE_NUMBER) { - tableSchemaBuilder.addField(field.getName(), DataType.Float64); - } else if (field.getType() == CommonConstants.TYPE_STRING) { - tableSchemaBuilder.addField(field.getName(), DataType.String); - } - } - Table table = Table.from(tableSchemaBuilder.build()); - - try { - long now = System.currentTimeMillis(); - Object[] values = new Object[3 + fieldsList.size()]; - values[0] = monitorId; - values[2] = now; - for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { - Map<String, String> labels = new HashMap<>(8); - for (int i = 0; i < fieldsList.size(); i++) { - if (!CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) { - CollectRep.Field field = fieldsList.get(i); - if (field.getType() == CommonConstants.TYPE_NUMBER) { - values[3 + i] = Double.parseDouble(valueRow.getColumns(i)); - } else if (field.getType() == CommonConstants.TYPE_STRING) { - values[3 + i] = valueRow.getColumns(i); - } - if (field.getLabel()) { - labels.put(field.getName(), String.valueOf(values[3 + i])); - } - } else { - values[3 + i] = null; - } - } - values[1] = JsonUtil.toJson(labels); - table.addRow(values); - } - - CompletableFuture<Result<WriteOk, Err>> writeFuture = greptimeDb.write(table); - try { - Result<WriteOk, Err> result = writeFuture.get(10, TimeUnit.SECONDS); - if (result.isOk()) { - log.debug("[warehouse greptime]-Write successful"); - } else { - log.warn("[warehouse greptime]--Write failed: {}", result.getErr()); - } - } catch (Throwable throwable) { - log.error("[warehouse greptime]--Error occurred: {}", throwable.getMessage()); - } - } catch (Exception e) { - log.error("[warehouse greptime]--Error: {}", e.getMessage(), e); - } + if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) { + return; + } + if (metricsData.getValuesList().isEmpty()) { + log.info("[warehouse greptime] flush metrics data {} is null, ignore.", metricsData.getId()); + return; + } + String monitorId = String.valueOf(metricsData.getId()); + String tableName = getTableName(metricsData.getApp(), metricsData.getMetrics()); + TableSchema.Builder tableSchemaBuilder = TableSchema.newBuilder(tableName); + + tableSchemaBuilder.addTag("monitor_id", DataType.String) // + .addTag("instance", DataType.String) // + .addTimestamp("ts", DataType.TimestampMillisecond); + + List<CollectRep.Field> fieldsList = metricsData.getFieldsList(); + for (CollectRep.Field field : fieldsList) { + // handle field type + if (field.getType() == CommonConstants.TYPE_NUMBER) { + tableSchemaBuilder.addField(field.getName(), DataType.Float64); + } else if (field.getType() == CommonConstants.TYPE_STRING) { + tableSchemaBuilder.addField(field.getName(), DataType.String); + } + } + Table table = Table.from(tableSchemaBuilder.build()); + + try { + long now = System.currentTimeMillis(); + Object[] values = new Object[3 + fieldsList.size()]; + values[0] = monitorId; + values[2] = now; + for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { + Map<String, String> labels = new HashMap<>(8); + for (int i = 0; i < fieldsList.size(); i++) { + if (!CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) { + CollectRep.Field field = fieldsList.get(i); + if (field.getType() == CommonConstants.TYPE_NUMBER) { + values[3 + i] = Double.parseDouble(valueRow.getColumns(i)); + } else if (field.getType() == CommonConstants.TYPE_STRING) { + values[3 + i] = valueRow.getColumns(i); + } + if (field.getLabel()) { + labels.put(field.getName(), String.valueOf(values[3 + i])); + } + } else { + values[3 + i] = null; + } + } + values[1] = JsonUtil.toJson(labels); + table.addRow(values); + } + + CompletableFuture<Result<WriteOk, Err>> writeFuture = greptimeDb.write(table); + try { + Result<WriteOk, Err> result = writeFuture.get(10, TimeUnit.SECONDS); + if (result.isOk()) { + log.debug("[warehouse greptime]-Write successful"); + } else { + log.warn("[warehouse greptime]--Write failed: {}", result.getErr()); + } + } catch (Throwable throwable) { + log.error("[warehouse greptime]--Error occurred: {}", throwable.getMessage()); + } + } catch (Exception e) { + log.error("[warehouse greptime]--Error: {}", e.getMessage(), e); + } } @Override public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, - String label, String history) { - Map<String, List<Value>> instanceValuesMap = new HashMap<>(8); - if (!isServerAvailable()) { - INSTANCE_EXCEPTION_PRINT.run(); - return instanceValuesMap; - } - - String table = getTableName(app, metrics); - - String interval = history2interval(history); - String selectSql = label == null ? String.format(QUERY_HISTORY_SQL, metric, table, interval, monitorId) - : String.format(QUERY_HISTORY_WITH_INSTANCE_SQL, metric, table, interval, monitorId, label); - - if (log.isDebugEnabled()) { - log.debug("[warehouse greptime] getHistoryMetricData SQL: {}", selectSql); - } - - try (Connection connection = hikariDataSource.getConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(selectSql)) { - while (resultSet.next()) { - long ts = resultSet.getLong(1); - if (ts == 0) { - if (log.isErrorEnabled()) { - log.error("[warehouse greptime] getHistoryMetricData query result timestamp is 0, ignore. {}.", - selectSql); - } - continue; - } - String instanceValue = resultSet.getString(2); - if (instanceValue == null || "".equals(instanceValue)) { - instanceValue = ""; - } - double value = resultSet.getDouble(3); - String strValue = double2decimalString(value); - - List<Value> valueList = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>()); - valueList.add(new Value(strValue, ts)); - } - return instanceValuesMap; - } catch (SQLException sqlException) { - String msg = sqlException.getMessage(); - if (msg != null && !msg.contains(TABLE_NOT_EXIST)) { - if (log.isWarnEnabled()) { - log.warn("[warehouse greptime] failed to getHistoryMetricData: " + sqlException.getMessage()); - } - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("[warehouse greptime] failed to getHistoryMetricData:" + e.getMessage(), e); - } - } - return instanceValuesMap; + String label, String history) { + Map<String, List<Value>> instanceValuesMap = new HashMap<>(8); + if (!isServerAvailable()) { + INSTANCE_EXCEPTION_PRINT.run(); + return instanceValuesMap; + } + + String table = getTableName(app, metrics); + + String interval = history2interval(history); + String selectSql = label == null ? String.format(QUERY_HISTORY_SQL, metric, table, interval, monitorId) + : String.format(QUERY_HISTORY_WITH_INSTANCE_SQL, metric, table, interval, monitorId, label); + + if (log.isDebugEnabled()) { + log.debug("[warehouse greptime] getHistoryMetricData SQL: {}", selectSql); + } + + try (Connection connection = hikariDataSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(selectSql)) { + while (resultSet.next()) { + long ts = resultSet.getLong(1); + if (ts == 0) { + if (log.isErrorEnabled()) { + log.error("[warehouse greptime] getHistoryMetricData query result timestamp is 0, ignore. {}.", + selectSql); + } + continue; + } + String instanceValue = resultSet.getString(2); + if (instanceValue == null || "".equals(instanceValue)) { + instanceValue = ""; + } + double value = resultSet.getDouble(3); + String strValue = double2decimalString(value); + + List<Value> valueList = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>()); + valueList.add(new Value(strValue, ts)); + } + return instanceValuesMap; + } catch (SQLException sqlException) { + String msg = sqlException.getMessage(); + if (msg != null && !msg.contains(TABLE_NOT_EXIST)) { + if (log.isWarnEnabled()) { + log.warn("[warehouse greptime] failed to getHistoryMetricData: " + sqlException.getMessage()); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("[warehouse greptime] failed to getHistoryMetricData:" + e.getMessage(), e); + } + } + return instanceValuesMap; } private String getTableName(String app, String metrics) { - return app + "_" + metrics; + return app + "_" + metrics; } @Override public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics, - String metric, String label, String history) { - if (!isServerAvailable()) { - INSTANCE_EXCEPTION_PRINT.run(); - return Collections.emptyMap(); - } - String table = getTableName(app, metrics); - List<String> instances = new LinkedList<>(); - if (label != null && !"".equals(label)) { - instances.add(label); - } - if (instances.isEmpty()) { - String selectSql = String.format(QUERY_INSTANCE_SQL, table); - if (log.isDebugEnabled()) { - log.debug("[warehouse greptime] getHistoryIntervalMetricData sql: {}", selectSql); - } - - try (Connection connection = hikariDataSource.getConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(selectSql)) { - while (resultSet.next()) { - String instanceValue = resultSet.getString(1); - if (instanceValue == null || "".equals(instanceValue)) { - instances.add("''"); - } else { - instances.add(instanceValue); - } - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("[warehouse greptime] failed to query instances" + e.getMessage(), e); - } - } - } - - Map<String, List<Value>> instanceValuesMap = new HashMap<>(instances.size()); - for (String instanceValue : instances) { - String selectSql = String.format(QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL, metric, metric, metric, metric, - table, instanceValue, history2interval(history)); - - if (log.isDebugEnabled()) { - log.debug("[warehouse greptime] getHistoryIntervalMetricData sql: {}", selectSql); - } - - List<Value> values = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>()); - try (Connection connection = hikariDataSource.getConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(selectSql);) { - while (resultSet.next()) { - long ts = resultSet.getLong(1); - if (ts == 0) { - if (log.isErrorEnabled()) { - log.error( - "[warehouse greptime] getHistoryIntervalMetricData query result timestamp is 0, ignore. {}.", - selectSql); - } - continue; - } - double origin = resultSet.getDouble(2); - String originStr = double2decimalString(origin); - double avg = resultSet.getDouble(3); - String avgStr = double2decimalString(avg); - double min = resultSet.getDouble(4); - String minStr = double2decimalString(min); - double max = resultSet.getDouble(5); - String maxStr = double2decimalString(max); - Value value = Value.builder().origin(originStr).mean(avgStr).min(minStr).max(maxStr).time(ts) - .build(); - values.add(value); - } - resultSet.close(); - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("[warehouse greptime] failed to getHistoryIntervalMetricData: " + e.getMessage(), e); - } - } - } - return instanceValuesMap; + String metric, String label, String history) { + if (!isServerAvailable()) { + INSTANCE_EXCEPTION_PRINT.run(); + return Collections.emptyMap(); + } + String table = getTableName(app, metrics); + List<String> instances = new LinkedList<>(); + if (label != null && !"".equals(label)) { + instances.add(label); + } + if (instances.isEmpty()) { + String selectSql = String.format(QUERY_INSTANCE_SQL, table); + if (log.isDebugEnabled()) { + log.debug("[warehouse greptime] getHistoryIntervalMetricData sql: {}", selectSql); + } + + try (Connection connection = hikariDataSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(selectSql)) { + while (resultSet.next()) { + String instanceValue = resultSet.getString(1); + if (instanceValue == null || "".equals(instanceValue)) { + instances.add("''"); + } else { + instances.add(instanceValue); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("[warehouse greptime] failed to query instances" + e.getMessage(), e); + } + } + } + + Map<String, List<Value>> instanceValuesMap = new HashMap<>(instances.size()); + for (String instanceValue : instances) { + String selectSql = String.format(QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL, metric, metric, metric, metric, + table, instanceValue, history2interval(history)); + + if (log.isDebugEnabled()) { + log.debug("[warehouse greptime] getHistoryIntervalMetricData sql: {}", selectSql); + } + + List<Value> values = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>()); + try (Connection connection = hikariDataSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(selectSql);) { + while (resultSet.next()) { + long ts = resultSet.getLong(1); + if (ts == 0) { + if (log.isErrorEnabled()) { + log.error( + "[warehouse greptime] getHistoryIntervalMetricData query result timestamp is 0, ignore. {}.", + selectSql); + } + continue; + } + double origin = resultSet.getDouble(2); + String originStr = double2decimalString(origin); + double avg = resultSet.getDouble(3); + String avgStr = double2decimalString(avg); + double min = resultSet.getDouble(4); + String minStr = double2decimalString(min); + double max = resultSet.getDouble(5); + String maxStr = double2decimalString(max); + Value value = Value.builder().origin(originStr).mean(avgStr).min(minStr).max(maxStr).time(ts) + .build(); + values.add(value); + } + resultSet.close(); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("[warehouse greptime] failed to getHistoryIntervalMetricData: " + e.getMessage(), e); + } + } + } + return instanceValuesMap; } // TODO(dennis): we can remove it when // https://github.com/GreptimeTeam/greptimedb/issues/4168 is fixed. // default 6h-6 hours: s-seconds, M-minutes, h-hours, d-days, w-weeks private String history2interval(String history) { - if (history == null) { - return null; - } - history = history.trim().toLowerCase(); - - // Be careful, the order matters. - return history.replaceAll("d", " day") // - .replaceAll("s", " second") // - .replaceAll("w", " week") // - .replaceAll("h", " hour")// - .replaceAll("m", " minute"); + if (history == null) { + return null; + } + history = history.trim().toLowerCase(); + + // Be careful, the order matters. + return history.replaceAll("d", " day") // + .replaceAll("s", " second") // + .replaceAll("w", " week") // + .replaceAll("h", " hour")// + .replaceAll("m", " minute"); } private String double2decimalString(double d) { - return BigDecimal.valueOf(d).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString(); + return BigDecimal.valueOf(d).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString(); } @Override public void destroy() { - if (this.greptimeDb != null) { - this.greptimeDb.shutdownGracefully(); - this.greptimeDb = null; - } - if (this.hikariDataSource != null) { - this.hikariDataSource.close(); - hikariDataSource = null; - } + if (this.greptimeDb != null) { + this.greptimeDb.shutdownGracefully(); + this.greptimeDb = null; + } + if (this.hikariDataSource != null) { + this.hikariDataSource.close(); + hikariDataSource = null; + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
