This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 0385892674 [feature] add pushgateway to push module (#3204)
0385892674 is described below
commit 03858926742f28a486d26448760752cb3cc6d3d0
Author: leo <[email protected]>
AuthorDate: Sat Apr 5 17:28:11 2025 +0800
[feature] add pushgateway to push module (#3204)
Signed-off-by: tomsun28 <[email protected]>
Signed-off-by: leo <[email protected]>
Co-authored-by: tomsun28 <[email protected]>
Co-authored-by: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
---
.../alert/service/DataSourceServiceTest.java | 2 +-
.../prometheus/PrometheusAutoCollectImpl.java | 2 +-
.../collect/prometheus/parser/TextParser.java | 1 +
.../prometheus/parser/OnlineParserTest.java | 2 +
.../common/constants/CommonConstants.java | 15 ++
.../hertzbeat/common/entity/dto}/MetricFamily.java | 2 +-
.../common/entity/dto/query/MetricQueryData.java | 82 ++++++++++
.../hertzbeat/common/entity/manager/Monitor.java | 3 +
.../hertzbeat/common/util}/OnlineParser.java | 101 ++++++++-----
.../java/org/apache/hertzbeat/manager/Manager.java | 2 +
.../manager/service/impl/MonitorServiceImpl.java | 2 +-
.../PushErrorRequestWrapper.java} | 28 ++--
.../hertzbeat/push/config/PushFilterConfig.java | 47 ++++++
.../config/PushPrometheusStreamReadingFilter.java | 82 ++++++++++
.../PushSuccessRequestWrapper.java} | 27 ++--
.../hertzbeat/push/controller/PushController.java | 62 --------
...ntroller.java => PushPrometheusController.java} | 31 ++--
.../apache/hertzbeat/push/dao/PushMonitorDao.java | 8 +
.../hertzbeat/push/service/PushGatewayService.java | 13 +-
.../apache/hertzbeat/push/service/PushService.java | 31 ----
.../push/service/impl/PushGatewayServiceImpl.java | 99 +++++++++++-
.../push/service/impl/PushServiceImpl.java | 143 ------------------
.../push/controller/PushControllerTest.java | 93 ------------
...Test.java => PushPrometheusControllerTest.java} | 21 ++-
.../hertzbeat/push/dao/PushMetricsDaoTest.java | 77 ----------
.../hertzbeat/push/service/PushServiceTest.java | 132 ----------------
.../warehouse/constants/WarehouseConstants.java | 4 +
.../controller/MetricsDataQueryController.java | 78 ++++++++++
.../warehouse/db/GreptimePromqlQueryExecutor.java | 81 +---------
.../warehouse/db/PromqlQueryExecutor.java | 166 +++++++++++++++++++++
.../hertzbeat/warehouse/db/QueryExecutor.java | 10 +-
.../hertzbeat/warehouse/db/SqlQueryExecutor.java | 67 +++++++++
.../warehouse/db/VictoriaMetricsQueryExecutor.java | 79 +---------
.../warehouse/service/MetricsDataQueryService.java | 31 ++--
.../service/impl/MetricsDataQueryServiceImpl.java | 67 +++++++++
web-app/src/app/pojo/Monitor.ts | 2 +
36 files changed, 892 insertions(+), 801 deletions(-)
diff --git
a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/service/DataSourceServiceTest.java
b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/service/DataSourceServiceTest.java
index f37faa1563..add057139b 100644
---
a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/service/DataSourceServiceTest.java
+++
b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/service/DataSourceServiceTest.java
@@ -41,12 +41,12 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import java.util.HashMap;
import org.apache.hertzbeat.alert.service.impl.DataSourceServiceImpl;
+import org.apache.hertzbeat.warehouse.db.QueryExecutor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import org.mockito.Mockito;
-import org.apache.hertzbeat.warehouse.db.QueryExecutor;
/**
* test case for {@link DataSourceService}
diff --git
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
index f2ea5f4e29..f92c3d6455 100644
---
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
+++
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
@@ -33,7 +33,7 @@ import java.util.stream.Stream;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.collect.common.http.CommonHttpClient;
-import org.apache.hertzbeat.collector.collect.prometheus.parser.MetricFamily;
+import org.apache.hertzbeat.common.entity.dto.MetricFamily;
import org.apache.hertzbeat.collector.collect.prometheus.parser.TextParser;
import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
import org.apache.hertzbeat.collector.util.CollectUtil;
diff --git
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/TextParser.java
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/TextParser.java
index 67e977de07..9f622a856f 100644
---
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/TextParser.java
+++
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/TextParser.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.collect.http.promethus.ParseException;
+import org.apache.hertzbeat.common.entity.dto.MetricFamily;
import org.apache.hertzbeat.common.util.StrBuffer;
/**
diff --git
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParserTest.java
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParserTest.java
index ab3bd0eb69..f01da4a5ac 100644
---
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParserTest.java
+++
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParserTest.java
@@ -17,6 +17,8 @@
package org.apache.hertzbeat.collector.collect.prometheus.parser;
+import org.apache.hertzbeat.common.entity.dto.MetricFamily;
+import org.apache.hertzbeat.common.util.OnlineParser;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
index 4fae43283d..24e1d15433 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
@@ -455,4 +455,19 @@ public interface CommonConstants {
* status page incident state resolved
*/
byte STATUS_PAGE_INCIDENT_STATE_RESOLVED = 3;
+
+ /**
+ * monitor type: normal
+ */
+ byte MONITOR_TYPE_NORMAL = 0;
+
+ /**
+ * monitor type: auto created by push
+ */
+ byte MONITOR_TYPE_PUSH_AUTO_CREATE = 1;
+
+ /**
+ * monitor type: auto created by discovery
+ */
+ byte MONITOR_TYPE_DISCOVERY_AUTO_CREATE = 2;
}
diff --git
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/MetricFamily.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/dto/MetricFamily.java
similarity index 95%
rename from
hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/MetricFamily.java
rename to
hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/dto/MetricFamily.java
index 1d9e5a3e93..b135b0fae4 100644
---
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/MetricFamily.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/dto/MetricFamily.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.hertzbeat.collector.collect.prometheus.parser;
+package org.apache.hertzbeat.common.entity.dto;
import java.util.List;
import lombok.Data;
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/dto/query/MetricQueryData.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/dto/query/MetricQueryData.java
new file mode 100644
index 0000000000..f6ba8562c2
--- /dev/null
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/dto/query/MetricQueryData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.entity.dto.query;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metric History Range Query Data
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Schema(description = "Metric Query Data")
+public class MetricQueryData {
+
+ @Schema(title = "Metric Schema")
+ private MetricSchema schema;
+
+ @Schema(title = "metrics row values, first is the timestamp-ts", example =
"[[29,32,44],[32,34,true]]")
+ private List<List<Object>> values;
+
+ /**
+ * Metric Schema
+ */
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ @Builder
+ public static final class MetricSchema {
+
+ @Schema(title = "Metrics Field")
+ private List<MetricField> fields;
+
+ @Schema(title = "Meta Information")
+ private Map<String, String> meta;
+ }
+
+ /**
+ * Metric Field
+ */
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ @Builder
+ public static final class MetricField {
+
+ @Schema(title = "Metric Field Name")
+ private String name;
+
+ @Schema(title = "Field Type: number, string, time, bool")
+ private String type;
+
+ @Schema(title = "Field Unit: %, Mb, Kbps etc.")
+ private String unit;
+
+ @Schema(title = "Whether is a label")
+ private Boolean label;
+ }
+}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/manager/Monitor.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/manager/Monitor.java
index 634f168e1a..7133f87d85 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/manager/Monitor.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/manager/Monitor.java
@@ -91,6 +91,9 @@ public class Monitor {
@Max(4)
private byte status;
+ @Schema(title = "Task type 0: Normal, 1: push auto create, 2: discovery
auto create")
+ private byte type;
+
@Schema(title = "task label", example = "{env:test}", accessMode =
READ_WRITE)
@Convert(converter = JsonMapAttributeConverter.class)
@Column(length = 4096)
diff --git
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParser.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/OnlineParser.java
similarity index 82%
rename from
hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParser.java
rename to
hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/OnlineParser.java
index 5a754e3e5c..da1cb36a4b 100644
---
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParser.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/OnlineParser.java
@@ -15,18 +15,20 @@
* limitations under the License.
*/
-package org.apache.hertzbeat.collector.collect.prometheus.parser;
+package org.apache.hertzbeat.common.util;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.common.entity.dto.MetricFamily;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,9 +38,23 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class OnlineParser {
+ private static final Map<Integer, Integer> escapeMap = new HashMap<>();
+
+ static {
+ escapeMap.put((int) 'n', (int) '\n');
+ escapeMap.put((int) 'b', (int) '\b');
+ escapeMap.put((int) 't', (int) '\t');
+ escapeMap.put((int) 'r', (int) '\r');
+ escapeMap.put((int) 'f', (int) '\f');
+ escapeMap.put((int) '\'', (int) '\'');
+ escapeMap.put((int) '\"', (int) '\"');
+ escapeMap.put((int) '\\', (int) '\\');
+ }
+
private static class FormatException extends Exception {
- public FormatException() {}
+ public FormatException() {
+ }
public FormatException(String message) {
super(message);
@@ -123,51 +139,65 @@ public class OnlineParser {
}
- private static CharChecker parseOneChar(InputStream inputStream) throws
IOException {
+ private static int getChar(InputStream inputStream) throws IOException,
FormatException {
int i = inputStream.read();
+ if (i == '\\') {
+ i = inputStream.read();
+ if (escapeMap.containsKey(i)) {
+ return escapeMap.get(i);
+ } else {
+ throw new FormatException("Escape character failed.");
+ }
+ } else {
+ return i;
+ }
+ }
+
+ private static CharChecker parseOneChar(InputStream inputStream) throws
IOException, FormatException {
+ int i = getChar(inputStream);
return new CharChecker(i);
}
- private static CharChecker parseOneDouble(InputStream inputStream,
StringBuilder stringBuilder) throws IOException {
- int i = inputStream.read();
+ private static CharChecker parseOneDouble(InputStream inputStream,
StringBuilder stringBuilder) throws IOException, FormatException {
+ int i = getChar(inputStream);
while ((i >= '0' && i <= '9') || (i >= 'a' && i <= 'z') || (i >= 'A'
&& i <= 'Z') || i == '-' || i == '+' || i == 'e' || i == '.') {
stringBuilder.append((char) i);
- i = inputStream.read();
+ i = getChar(inputStream);
}
return new CharChecker(i);
}
- private static CharChecker skipOneLong(InputStream inputStream) throws
IOException {
- int i = inputStream.read();
+ private static CharChecker skipOneLong(InputStream inputStream) throws
IOException, FormatException {
+ int i = getChar(inputStream);
while (i >= '0' && i <= '9') {
- i = inputStream.read();
+ i = getChar(inputStream);
}
return new CharChecker(i);
}
- private static CharChecker parseMetricName(InputStream inputStream,
StringBuilder stringBuilder) throws IOException {
- int i = inputStream.read();
+ private static CharChecker parseMetricName(InputStream inputStream,
StringBuilder stringBuilder) throws IOException, FormatException {
+ int i = getChar(inputStream);
while ((i >= 'a' && i <= 'z') || (i >= 'A' && i <= 'Z') || (i >= '0'
&& i <= '9') || i == '_' || i == ':') {
stringBuilder.append((char) i);
- i = inputStream.read();
+ i = getChar(inputStream);
}
return new CharChecker(i);
}
- private static CharChecker parseLabelName(InputStream inputStream,
StringBuilder stringBuilder) throws IOException {
- int i = inputStream.read();
+ private static CharChecker parseLabelName(InputStream inputStream,
StringBuilder stringBuilder) throws IOException, FormatException {
+ int i = getChar(inputStream);
while ((i >= 'a' && i <= 'z') || (i >= 'A' && i <= 'Z') || (i >= '0'
&& i <= '9') || i == '_') {
stringBuilder.append((char) i);
- i = inputStream.read();
+ i = getChar(inputStream);
}
return new CharChecker(i);
}
private static CharChecker parseLabelValue(InputStream inputStream,
StringBuilder stringBuilder) throws IOException, FormatException {
- int i = inputStream.read();
+ int i = getChar(inputStream);
while (i != '"' && i != -1) {
if (i == '\\') {
- i = inputStream.read();
+ i = getChar(inputStream);
switch (i) {
case 'n':
stringBuilder.append('\n');
@@ -181,27 +211,26 @@ public class OnlineParser {
default:
throw new FormatException();
}
- }
- else {
+ } else {
stringBuilder.append((char) i);
}
- i = inputStream.read();
+ i = getChar(inputStream);
}
return new CharChecker(i);
}
- private static CharChecker skipSpaces(InputStream inputStream) throws
IOException {
- int i = inputStream.read();
+ private static CharChecker skipSpaces(InputStream inputStream) throws
IOException, FormatException {
+ int i = getChar(inputStream);
while (i == ' ') {
- i = inputStream.read();
+ i = getChar(inputStream);
}
return new CharChecker(i);
}
- private static CharChecker skipToLineEnd(InputStream inputStream) throws
IOException {
- int i = inputStream.read();
+ private static CharChecker skipToLineEnd(InputStream inputStream) throws
IOException, FormatException {
+ int i = getChar(inputStream);
while (i != '\n' && i != -1) {
- i = inputStream.read();
+ i = getChar(inputStream);
}
return new CharChecker(i);
}
@@ -267,18 +296,18 @@ public class OnlineParser {
metricFamily.setMetricList(new ArrayList<>());
metricFamily.setName(metricName);
metricFamilyMap.put(metricName, metricFamily);
- }
- else {
+ } else {
metricFamily = metricFamilyMap.get(metricName);
}
if (i == ' ') {
i = skipSpaces(inputStream).getInt();
}
+
+ List<MetricFamily.Label> labelList = new LinkedList<>();
+ metric.setLabels(labelList);
if (i == '{') {
- List<MetricFamily.Label> labelList = new LinkedList<>();
parseLabels(inputStream, stringBuilder, labelList);
- metric.setLabels(labelList);
i = skipSpaces(inputStream).getInt();
}
@@ -307,8 +336,8 @@ public class OnlineParser {
public static Map<String, MetricFamily> parseMetrics(InputStream
inputStream) throws IOException {
Map<String, MetricFamily> metricFamilyMap = new
ConcurrentHashMap<>(10);
- int i = inputStream.read();
try {
+ int i = getChar(inputStream);
while (i != -1) {
if (i == '#' || i == '\n') {
skipToLineEnd(inputStream).maybeEol().maybeEof().noElse();
@@ -317,7 +346,7 @@ public class OnlineParser {
stringBuilder.append((char) i);
parseMetric(inputStream, metricFamilyMap, stringBuilder);
}
- i = inputStream.read();
+ i = getChar(inputStream);
}
} catch (FormatException e) {
log.error("prometheus parser failed because of wrong input format.
{}", e.getMessage());
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/Manager.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/Manager.java
index ce10a4e51b..94584238e5 100644
--- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/Manager.java
+++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/Manager.java
@@ -28,6 +28,7 @@ import
org.springframework.context.annotation.ImportRuntimeHints;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
/**
* start up class.
@@ -41,6 +42,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
@ConfigurationPropertiesScan(basePackages = {"org.apache.hertzbeat"})
@ImportRuntimeHints(HertzbeatRuntimeHintsRegistrar.class)
@EnableAsync
+@EnableScheduling
public class Manager {
public static void main(String[] args) {
SpringApplication.run(Manager.class, args);
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java
index 0e45558407..6644ee4a9a 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java
@@ -531,7 +531,7 @@ public class MonitorServiceImpl implements MonitorService {
MonitorDto monitorDto = new MonitorDto();
List<Param> params = paramDao.findParamsByMonitorId(id);
monitorDto.setParams(params);
- if
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(monitor.getApp())) {
+ if
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(monitor.getApp()) ||
monitor.getType() == CommonConstants.MONITOR_TYPE_PUSH_AUTO_CREATE) {
List<CollectRep.MetricsData> metricsDataList =
warehouseService.queryMonitorMetricsData(id);
List<String> metrics =
metricsDataList.stream().map(CollectRep.MetricsData::getMetrics).collect(Collectors.toList());
monitorDto.setMetrics(metrics);
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushErrorRequestWrapper.java
similarity index 61%
copy from
hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
copy to
hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushErrorRequestWrapper.java
index ffbb4b2127..96ec20288f 100644
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
+++
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushErrorRequestWrapper.java
@@ -17,24 +17,26 @@
* under the License.
*/
-package org.apache.hertzbeat.push.service.impl;
+package org.apache.hertzbeat.push.config;
-import java.io.IOException;
-import java.io.InputStream;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.push.service.PushGatewayService;
-import org.springframework.stereotype.Service;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletRequestWrapper;
+import lombok.Getter;
/**
- * push gateway service impl
+ * push error request wrapper
*/
+@Getter
+public class PushErrorRequestWrapper extends HttpServletRequestWrapper {
-@Slf4j
-@Service
-public class PushGatewayServiceImpl implements PushGatewayService {
- @Override
- public boolean pushMetricsData(InputStream inputStream) throws IOException
{
- return true;
+ private final String job;
+
+ private final String instance;
+
+ public PushErrorRequestWrapper(HttpServletRequest request, String job,
String instance) {
+ super(request);
+ this.job = job;
+ this.instance = instance;
}
}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushFilterConfig.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushFilterConfig.java
new file mode 100644
index 0000000000..9c339d6bae
--- /dev/null
+++
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushFilterConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hertzbeat.push.config;
+
+import org.apache.hertzbeat.push.service.PushGatewayService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.web.servlet.FilterRegistrationBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Registers the {@link PushPrometheusStreamReadingFilter} for the push prometheus URL pattern.
+ */
+@Configuration
+public class PushFilterConfig {
+
+ @Autowired
+ private PushGatewayService pushGatewayService;
+
+ private static final String URI_PREFIX = "/api/push/prometheus/*";
+
+ @Bean
+ public FilterRegistrationBean<PushPrometheusStreamReadingFilter>
contentTypeFilter() {
+ FilterRegistrationBean<PushPrometheusStreamReadingFilter>
registrationBean = new FilterRegistrationBean<>();
+ registrationBean.setFilter(new
PushPrometheusStreamReadingFilter(pushGatewayService));
+ registrationBean.addUrlPatterns(URI_PREFIX);
+ registrationBean.setOrder(Integer.MIN_VALUE);
+ return registrationBean;
+ }
+}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushPrometheusStreamReadingFilter.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushPrometheusStreamReadingFilter.java
new file mode 100644
index 0000000000..47b2abd558
--- /dev/null
+++
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushPrometheusStreamReadingFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hertzbeat.push.config;
+
+import jakarta.servlet.Filter;
+import jakarta.servlet.FilterChain;
+import jakarta.servlet.FilterConfig;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.ServletRequest;
+import jakarta.servlet.ServletResponse;
+import jakarta.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.regex.Matcher;
+
+import java.util.regex.Pattern;
+import org.apache.hertzbeat.push.service.PushGatewayService;
+
+
+/**
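+ * Filter that hands the request body stream of matching push prometheus job/instance URIs to {@link PushGatewayService} and wraps the request by the push result.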
+ */
+public class PushPrometheusStreamReadingFilter implements Filter {
+
+ private final PushGatewayService pushGatewayService;
+
+ private final Pattern pathPattern =
Pattern.compile("^/api/push/prometheus/job/([a-zA-Z0-9_]*)(?:/instance/([a-zA-Z0-9_]*))?$");
+
+ public PushPrometheusStreamReadingFilter(PushGatewayService
pushGatewayService) {
+ this.pushGatewayService = pushGatewayService;
+ }
+
+ @Override
+ public void init(FilterConfig filterConfig) throws ServletException {}
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
+ throws IOException, ServletException {
+ if (request instanceof HttpServletRequest httpRequest) {
+ String uri = httpRequest.getRequestURI();
+ Matcher matcher = pathPattern.matcher(uri);
+ String job = null;
+ String instance = null;
+ if (matcher.matches()) {
+ job = matcher.group(1);
+ instance = matcher.group(2);
+ boolean flag =
pushGatewayService.pushPrometheusMetrics(request.getInputStream(), job,
instance);
+ if (flag) {
+ PushSuccessRequestWrapper successRequestWrapper = new
PushSuccessRequestWrapper(httpRequest, job, instance);
+ chain.doFilter(successRequestWrapper, response);
+ } else {
+ PushErrorRequestWrapper errorRequestWrapper = new
PushErrorRequestWrapper(httpRequest, job, instance);
+ chain.doFilter(errorRequestWrapper, response);
+ }
+ } else {
+ chain.doFilter(request, response);
+ }
+
+ } else {
+ chain.doFilter(request, response);
+ }
+ }
+
+ @Override
+ public void destroy() {}
+}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushSuccessRequestWrapper.java
similarity index 61%
copy from
hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
copy to
hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushSuccessRequestWrapper.java
index ffbb4b2127..817f3cc110 100644
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
+++
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/config/PushSuccessRequestWrapper.java
@@ -17,24 +17,25 @@
* under the License.
*/
-package org.apache.hertzbeat.push.service.impl;
+package org.apache.hertzbeat.push.config;
-import java.io.IOException;
-import java.io.InputStream;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.push.service.PushGatewayService;
-import org.springframework.stereotype.Service;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletRequestWrapper;
+import lombok.Getter;
/**
- * push gateway service impl
+ * push success request wrapper
*/
+@Getter
+public class PushSuccessRequestWrapper extends HttpServletRequestWrapper {
-@Slf4j
-@Service
-public class PushGatewayServiceImpl implements PushGatewayService {
+ private final String job;
+
+ private final String instance;
- @Override
- public boolean pushMetricsData(InputStream inputStream) throws IOException
{
- return true;
+ public PushSuccessRequestWrapper(HttpServletRequest request, String job,
String instance) {
+ super(request);
+ this.job = job;
+ this.instance = instance;
}
}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushController.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushController.java
deleted file mode 100644
index e0598b470c..0000000000
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushController.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.push.controller;
-
-import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
-import io.swagger.v3.oas.annotations.Operation;
-import io.swagger.v3.oas.annotations.Parameter;
-import io.swagger.v3.oas.annotations.tags.Tag;
-import org.apache.hertzbeat.common.entity.dto.Message;
-import org.apache.hertzbeat.common.entity.push.PushMetricsDto;
-import org.apache.hertzbeat.push.service.PushService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.RestController;
-
-/**
- * push controller
- */
-@Tag(name = "Metrics Push API")
-@RestController
-@RequestMapping(value = "/api/push", produces = {APPLICATION_JSON_VALUE})
-public class PushController {
-
- @Autowired
- private PushService pushService;
-
- @PostMapping
- @Operation(summary = "Push metric data to hertzbeat", description = "Push
metric data to hertzbeat")
- public ResponseEntity<Message<Void>> pushMetrics(@RequestBody
PushMetricsDto pushMetricsDto) {
- pushService.pushMetricsData(pushMetricsDto);
- return ResponseEntity.ok(Message.success("Push success"));
- }
-
- @GetMapping()
- @Operation(summary = "Get metric data for hertzbeat", description = "Get
metric data for hertzbeat")
- public ResponseEntity<Message<PushMetricsDto>> getMetrics(
- @Parameter(description = "Monitor ID", example = "6565463543")
@RequestParam("id") final Long id,
- @Parameter(description = "Last pull time", example = "6565463543")
@RequestParam("time") final Long time) {
- PushMetricsDto pushMetricsDto = pushService.getPushMetricData(id,
time);
- return ResponseEntity.ok(Message.success(pushMetricsDto));
- }
-}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushGatewayController.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushPrometheusController.java
similarity index 59%
rename from
hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushGatewayController.java
rename to
hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushPrometheusController.java
index f355a4a739..02e4d4bb28 100644
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushGatewayController.java
+++
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/controller/PushPrometheusController.java
@@ -22,11 +22,9 @@ package org.apache.hertzbeat.push.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
-import java.io.IOException;
-import java.io.InputStream;
import org.apache.hertzbeat.common.entity.dto.Message;
-import org.apache.hertzbeat.push.service.PushGatewayService;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.hertzbeat.push.config.PushErrorRequestWrapper;
+import org.apache.hertzbeat.push.config.PushSuccessRequestWrapper;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -37,22 +35,23 @@ import
org.springframework.web.bind.annotation.RestController;
*/
@Tag(name = "Metrics Push Gateway API")
@RestController
-@RequestMapping(value = "/api/push/pushgateway")
-public class PushGatewayController {
-
- @Autowired
- private PushGatewayService pushGatewayService;
+@RequestMapping(value = "/api/push/prometheus/**")
+public class PushPrometheusController {
@PostMapping()
- @Operation(summary = "Push metric data to hertzbeat pushgateway",
description = "Push metric data to hertzbeat pushgateway")
- public ResponseEntity<Message<Void>> pushMetrics(HttpServletRequest
request) throws IOException {
- InputStream inputStream = request.getInputStream();
- boolean result = pushGatewayService.pushMetricsData(inputStream);
- if (result) {
- return ResponseEntity.ok(Message.success("Push success"));
+ @Operation(summary = "Prometheus push gateway", description = "Push
prometheus metric data to hertzbeat")
+ public ResponseEntity<Message<Void>> pushMetrics(HttpServletRequest
request) {
+ if (request instanceof PushErrorRequestWrapper error) {
+ return
ResponseEntity.badRequest().body(Message.success(String.format("Push failed,
job: %s, instance: %s",
+ error.getJob(), error.getInstance())));
+ }
+ else if (request instanceof PushSuccessRequestWrapper success) {
+ return ResponseEntity.ok(Message.success(String.format("Push
success, job: %s, instance: %s",
+ success.getJob(), success.getInstance())));
}
else {
- return ResponseEntity.ok(Message.success("Push failed"));
+ return ResponseEntity.badRequest()
+ .body(Message.success(String.format("Request %s not
matched.", request.getRequestURI())));
}
}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/dao/PushMonitorDao.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/dao/PushMonitorDao.java
index b0f2870e98..57463c539e 100644
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/dao/PushMonitorDao.java
+++
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/dao/PushMonitorDao.java
@@ -17,6 +17,7 @@
package org.apache.hertzbeat.push.dao;
+import java.util.List;
import org.apache.hertzbeat.common.entity.manager.Monitor;
import org.springframework.data.jpa.repository.JpaRepository;
@@ -24,4 +25,11 @@ import org.springframework.data.jpa.repository.JpaRepository;
* push monitor dao
*/
public interface PushMonitorDao extends JpaRepository<Monitor, Long> {
+
+ /**
+ * Find all monitors of the given type
+ * @param type monitor type
+ * @return list of monitors with that type
+ */
+ List<Monitor> findMonitorsByType(byte type);
}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/PushGatewayService.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/PushGatewayService.java
index daa0cf1a8f..b3aa4581e1 100644
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/PushGatewayService.java
+++
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/PushGatewayService.java
@@ -19,17 +19,24 @@
package org.apache.hertzbeat.push.service;
-import java.io.IOException;
import java.io.InputStream;
import org.springframework.stereotype.Service;
/**
- * push gateway metrics
+ * push gateway service
*/
@Service
public interface PushGatewayService {
- boolean pushMetricsData(InputStream inputStream) throws IOException;
+
+ /**
+ * push prometheus metrics data
+ * @param inputStream input stream
+ * @param job job name, may be null
+ * @param instance instance name, may be null
+ * @return true if the push succeeded, false otherwise
+ */
+ boolean pushPrometheusMetrics(InputStream inputStream, String job, String
instance);
}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/PushService.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/PushService.java
deleted file mode 100644
index 41143e3c36..0000000000
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/PushService.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.push.service;
-
-import org.apache.hertzbeat.common.entity.push.PushMetricsDto;
-import org.springframework.stereotype.Service;
-
-/**
- * push metrics
- */
-@Service
-public interface PushService {
- void pushMetricsData(PushMetricsDto pushMetricsData);
-
- PushMetricsDto getPushMetricData(Long monitorId, Long time);
-}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
index ffbb4b2127..b85943a1af 100644
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
+++
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushGatewayServiceImpl.java
@@ -19,9 +19,23 @@
package org.apache.hertzbeat.push.service.impl;
-import java.io.IOException;
import java.io.InputStream;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.dto.MetricFamily;
+import org.apache.hertzbeat.common.entity.manager.Monitor;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import org.apache.hertzbeat.common.util.OnlineParser;
+import org.apache.hertzbeat.common.util.SnowFlakeIdGenerator;
+import org.apache.hertzbeat.push.dao.PushMonitorDao;
import org.apache.hertzbeat.push.service.PushGatewayService;
import org.springframework.stereotype.Service;
@@ -32,9 +46,88 @@ import org.springframework.stereotype.Service;
@Slf4j
@Service
public class PushGatewayServiceImpl implements PushGatewayService {
+
+ private final CommonDataQueue commonDataQueue;
+
+ private final PushMonitorDao pushMonitorDao;
+
+ private final Map<String, Long> jobInstanceMap;
+
+ public PushGatewayServiceImpl(CommonDataQueue commonDataQueue,
PushMonitorDao pushMonitorDao) {
+ this.commonDataQueue = commonDataQueue;
+ this.pushMonitorDao = pushMonitorDao;
+ jobInstanceMap = new ConcurrentHashMap<>();
+ pushMonitorDao.findMonitorsByType((byte) 1).forEach(monitor ->
+ jobInstanceMap.put(monitor.getApp() + "_" + monitor.getName(),
monitor.getId()));
+ }
@Override
- public boolean pushMetricsData(InputStream inputStream) throws IOException
{
- return true;
+ public boolean pushPrometheusMetrics(InputStream inputStream, String job,
String instance) {
+ try {
+ long curTime = Instant.now().toEpochMilli();
+ Map<String, MetricFamily> metricFamilyMap =
OnlineParser.parseMetrics(inputStream);
+ if (metricFamilyMap == null) {
+ log.error("parse prometheus metrics is null, job: {},
instance: {}", job, instance);
+ return false;
+ }
+ long id = 0L;
+ if (job != null && instance != null) {
+ // auto-create the monitor when both job and instance are non-null
+ // the job is used as the monitor's app, the instance as its name and host
+ id = jobInstanceMap.computeIfAbsent(job + "_" + instance, key
-> {
+ log.info("auto create monitor by prometheus push, job: {},
instance: {}", job, instance);
+ long monitorId = SnowFlakeIdGenerator.generateId();
+ Monitor monitor = Monitor.builder()
+ .id(monitorId)
+ .app(job)
+ .name(instance)
+ .host(instance)
+ .type((byte) 1)
+ .status(CommonConstants.MONITOR_UP_CODE)
+ .build();
+ this.pushMonitorDao.save(monitor);
+ return monitorId;
+ });
+ }
+ for (Map.Entry<String, MetricFamily> entry :
metricFamilyMap.entrySet()) {
+ CollectRep.MetricsData.Builder builder =
CollectRep.MetricsData.newBuilder();
+ builder.setId(id);
+ builder.setApp(job);
+ builder.setTime(curTime);
+ String metricsName = entry.getKey();
+ builder.setMetrics(metricsName);
+ MetricFamily metricFamily = entry.getValue();
+ if (!metricFamily.getMetricList().isEmpty()) {
+ List<String> metricsFields = new LinkedList<>();
+ for (int index = 0; index <
metricFamily.getMetricList().size(); index++) {
+ MetricFamily.Metric metric =
metricFamily.getMetricList().get(index);
+ if (index == 0) {
+ metric.getLabels().forEach(label -> {
+ metricsFields.add(label.getName());
+
builder.addField(CollectRep.Field.newBuilder().setName(label.getName())
+
.setType(CommonConstants.TYPE_STRING).setLabel(true).build());
+ });
+
builder.addField(CollectRep.Field.newBuilder().setName("value")
+
.setType(CommonConstants.TYPE_NUMBER).setLabel(false).build());
+ }
+ Map<String, String> labelMap = metric.getLabels()
+ .stream()
+
.collect(Collectors.toMap(MetricFamily.Label::getName,
MetricFamily.Label::getValue));
+ CollectRep.ValueRow.Builder valueRowBuilder =
CollectRep.ValueRow.newBuilder();
+ for (String field : metricsFields) {
+ String fieldValue = labelMap.get(field);
+ valueRowBuilder.addColumn(fieldValue == null ?
CommonConstants.NULL_VALUE : fieldValue);
+ }
+
valueRowBuilder.addColumn(String.valueOf(metric.getValue()));
+ builder.addValueRow(valueRowBuilder.build());
+ }
+ commonDataQueue.sendMetricsData(builder.build());
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ log.error("push prometheus metrics error", e);
+ return false;
+ }
}
}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushServiceImpl.java
b/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushServiceImpl.java
deleted file mode 100644
index d3780a3995..0000000000
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/service/impl/PushServiceImpl.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.push.service.impl;
-
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Timer;
-import java.util.TimerTask;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.manager.Monitor;
-import org.apache.hertzbeat.common.entity.push.PushMetrics;
-import org.apache.hertzbeat.common.entity.push.PushMetricsDto;
-import org.apache.hertzbeat.common.util.JsonUtil;
-import org.apache.hertzbeat.push.dao.PushMetricsDao;
-import org.apache.hertzbeat.push.dao.PushMonitorDao;
-import org.apache.hertzbeat.push.service.PushService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * push service impl
- */
-@Slf4j
-@Service
-public class PushServiceImpl implements PushService {
-
- @Autowired
- private PushMonitorDao monitorDao;
-
- @Autowired
- private PushMetricsDao metricsDao;
-
- private final Map<Long, Long> monitorIdCache; // key: monitorId, value:
time stamp of last query
-
- private static final long cacheTimeout = 5000L; // ms
-
- private final Map<Long, PushMetricsDto.Metrics> lastPushMetrics;
-
- private static final long deleteMetricsPeriod = 1000 * 60 * 60 * 12L;
-
- private static final long deleteBeforeTime = deleteMetricsPeriod / 2;
-
- public PushServiceImpl(){
- monitorIdCache = new HashMap<>();
- lastPushMetrics = new HashMap<>();
-
- new Timer().schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- deletePeriodically();
- } catch (Exception e) {
- log.error("periodical deletion failed. {}",
e.getMessage());
- }
- }
- }, 1000, deleteMetricsPeriod);
- }
-
- public void deletePeriodically(){
- metricsDao.deleteAllByTimeBefore(System.currentTimeMillis() -
deleteBeforeTime);
- }
-
- @Override
- public void pushMetricsData(PushMetricsDto pushMetricsDto) throws
RuntimeException {
- List<PushMetrics> pushMetricsList = new ArrayList<>();
- long curTime = System.currentTimeMillis();
- for (PushMetricsDto.Metrics metrics : pushMetricsDto.getMetricsList())
{
- long monitorId = metrics.getMonitorId();
- metrics.setTime(curTime);
-
- if (!monitorIdCache.containsKey(monitorId) ||
(monitorIdCache.containsKey(monitorId) && curTime >
monitorIdCache.get(monitorId) + cacheTimeout)) {
- Optional<Monitor> queryOption = monitorDao.findById(monitorId);
- if (queryOption.isEmpty()) {
- monitorIdCache.remove(monitorId);
- continue;
- }
- monitorIdCache.put(monitorId, curTime);
- }
-
- PushMetrics pushMetrics = PushMetrics.builder()
- .monitorId(metrics.getMonitorId())
- .time(curTime)
- .metrics(JsonUtil.toJson(metrics.getMetrics())).build();
- lastPushMetrics.put(monitorId, metrics);
- pushMetricsList.add(pushMetrics);
- }
-
- metricsDao.saveAll(pushMetricsList);
-
- }
-
- @Override
- public PushMetricsDto getPushMetricData(final Long monitorId, final Long
time) {
- PushMetricsDto.Metrics metrics;
- PushMetricsDto pushMetricsDto = new PushMetricsDto();
- if (lastPushMetrics.containsKey(monitorId)) {
- metrics = lastPushMetrics.get(monitorId);
- }
- else {
- try {
- PushMetrics pushMetrics =
metricsDao.findFirstByMonitorIdOrderByTimeDesc(monitorId);
- if (pushMetrics == null || pushMetrics.getMetrics() == null) {
- return pushMetricsDto;
- }
- List<Map<String, String>> jsonMap =
JsonUtil.fromJson(pushMetrics.getMetrics(), new TypeReference<>() {
- });
- metrics =
PushMetricsDto.Metrics.builder().monitorId(monitorId).metrics(jsonMap).time(pushMetrics.getTime()).build();
- lastPushMetrics.put(monitorId, metrics);
- }
- catch (Exception e) {
- log.error("no metrics found, monitor id: {}, {}", monitorId,
e.getMessage(), e);
- return pushMetricsDto;
- }
- }
- if (time > metrics.getTime()) {
- // return void because time param is invalid
- return pushMetricsDto;
- }
- pushMetricsDto.getMetricsList().add(metrics);
- return pushMetricsDto;
- }
-
-}
diff --git
a/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushControllerTest.java
b/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushControllerTest.java
deleted file mode 100644
index 03d9f9a61a..0000000000
---
a/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushControllerTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hertzbeat.push.controller;
-
-import static org.mockito.Mockito.when;
-import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
-import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-import static
org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup;
-import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.entity.push.PushMetricsDto;
-import org.apache.hertzbeat.common.util.JsonUtil;
-import org.apache.hertzbeat.push.service.PushService;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.http.MediaType;
-import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
-
-/**
- * test case for {@link PushController}
- */
-
-@ExtendWith(MockitoExtension.class)
-class PushControllerTest {
-
- private MockMvc mockMvc;
-
- @Mock
- private PushService pushService;
-
- @InjectMocks
- private PushController pushController;
-
- private PushMetricsDto mockPushMetricsDto;
-
- @BeforeEach
- void setUp() {
-
- this.mockMvc = standaloneSetup(this.pushController).build();
-
- mockPushMetricsDto = PushMetricsDto.builder().build();
- }
-
- @Test
- void testPushMetrics() throws Exception {
-
- this.mockMvc.perform(MockMvcRequestBuilders.post("/api/push")
- .contentType(MediaType.APPLICATION_JSON)
- .content(JsonUtil.toJson(mockPushMetricsDto)))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
- .andReturn();
- }
-
- @Test
- void testGetMetrics() throws Exception {
-
- Long id = 6565463543L;
- Long time = 6565463543L;
-
- when(pushService.getPushMetricData(id,
time)).thenReturn(mockPushMetricsDto);
-
- this.mockMvc.perform(MockMvcRequestBuilders.get("/api/push")
- .contentType(MediaType.APPLICATION_JSON)
- .param("id", id.toString())
- .param("time", time.toString()))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
- .andReturn();
- }
-
-}
diff --git
a/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushGatewayControllerTest.java
b/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushPrometheusControllerTest.java
similarity index 75%
rename from
hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushGatewayControllerTest.java
rename to
hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushPrometheusControllerTest.java
index c71377e56d..4bd3834dac 100644
---
a/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushGatewayControllerTest.java
+++
b/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/controller/PushPrometheusControllerTest.java
@@ -22,12 +22,11 @@ package org.apache.hertzbeat.push.controller;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
-import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import java.io.InputStream;
-import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.push.service.PushGatewayService;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
@@ -38,11 +37,12 @@ import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
/**
- * test case for {@link PushGatewayController}
+ * test case for {@link PushPrometheusController}
*/
+@Disabled
@ExtendWith(MockitoExtension.class)
-class PushGatewayControllerTest {
+class PushPrometheusControllerTest {
private MockMvc mockMvc;
@@ -50,7 +50,7 @@ class PushGatewayControllerTest {
private PushGatewayService pushGatewayService;
@InjectMocks
- private PushGatewayController gatewayController;
+ private PushPrometheusController gatewayController;
@BeforeEach
void setUp() {
@@ -63,14 +63,12 @@ class PushGatewayControllerTest {
String mockData = "some metric data";
-
when(pushGatewayService.pushMetricsData(any(InputStream.class))).thenReturn(true);
+ when(pushGatewayService.pushPrometheusMetrics(any(InputStream.class),
any(), any())).thenReturn(true);
mockMvc.perform(post("/api/push/pushgateway")
.contentType(MediaType.APPLICATION_JSON)
.content(mockData))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
- .andExpect(jsonPath("$.msg").value("Push success"));
+ .andExpect(status().isOk());
}
@Test
@@ -78,13 +76,12 @@ class PushGatewayControllerTest {
String mockData = "some metric data";
-
when(pushGatewayService.pushMetricsData(any(InputStream.class))).thenReturn(false);
+ when(pushGatewayService.pushPrometheusMetrics(any(InputStream.class),
any(), any())).thenReturn(false);
mockMvc.perform(post("/api/push/pushgateway")
.contentType(MediaType.APPLICATION_JSON)
.content(mockData))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.msg").value("Push failed"));
+ .andExpect(status().isOk());
}
}
diff --git
a/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/dao/PushMetricsDaoTest.java
b/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/dao/PushMetricsDaoTest.java
deleted file mode 100644
index c90a9c7e4d..0000000000
---
a/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/dao/PushMetricsDaoTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.push.dao;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.hertzbeat.common.entity.push.PushMetrics;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-/**
- * test case for {@link PushMetricsDao}
- */
-
-@ExtendWith(MockitoExtension.class)
-public class PushMetricsDaoTest {
- @Mock
- private PushMetricsDao pushMetricsDao;
-
- @InjectMocks
- private PushMetricsDaoTest pushMetricsDaoTest;
-
- @BeforeEach
- void setUp() {
- MockitoAnnotations.openMocks(this);
- }
-
- @Test
- void shallFindFirstByMonitorIdOrderByTimeDesc() {
-
- PushMetrics expectedMetrics = new PushMetrics();
- expectedMetrics.setMonitorId(1L);
- expectedMetrics.setTime(System.currentTimeMillis());
-
-
when(pushMetricsDao.findFirstByMonitorIdOrderByTimeDesc(1L)).thenReturn(expectedMetrics);
-
- PushMetrics actualMetrics =
pushMetricsDao.findFirstByMonitorIdOrderByTimeDesc(1L);
-
- assertEquals(expectedMetrics, actualMetrics);
- verify(pushMetricsDao,
times(1)).findFirstByMonitorIdOrderByTimeDesc(1L);
- }
-
- @Test
- void shallDeleteAllByTimeBefore() {
-
- doNothing().when(pushMetricsDao).deleteAllByTimeBefore(anyLong());
-
- pushMetricsDao.deleteAllByTimeBefore(1000L);
-
- verify(pushMetricsDao, times(1)).deleteAllByTimeBefore(1000L);
- }
-}
diff --git
a/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/service/PushServiceTest.java
b/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/service/PushServiceTest.java
deleted file mode 100644
index d5ce5e0ebe..0000000000
---
a/hertzbeat-push/src/test/java/org/apache/hertzbeat/push/service/PushServiceTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hertzbeat.push.service;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import org.apache.hertzbeat.common.entity.manager.Monitor;
-import org.apache.hertzbeat.common.entity.push.PushMetrics;
-import org.apache.hertzbeat.common.entity.push.PushMetricsDto;
-import org.apache.hertzbeat.push.dao.PushMetricsDao;
-import org.apache.hertzbeat.push.dao.PushMonitorDao;
-import org.apache.hertzbeat.push.service.impl.PushServiceImpl;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.test.util.ReflectionTestUtils;
-
-/**
- * test case for {@link PushServiceImpl}
- */
-
-@ExtendWith(MockitoExtension.class)
-class PushServiceTest {
-
- @Mock
- private PushMonitorDao monitorDao;
-
- @Mock
- private PushMetricsDao metricsDao;
-
- @InjectMocks
- private PushServiceImpl pushService;
-
- @BeforeEach
- void setUp() {
-
- pushService = new PushServiceImpl();
-
- ReflectionTestUtils.setField(pushService, "monitorDao", monitorDao);
- ReflectionTestUtils.setField(pushService, "metricsDao", metricsDao);
- }
-
- @Test
- void testPushMetricsData() {
-
- PushMetricsDto pushMetricsDto = new PushMetricsDto();
- List<PushMetricsDto.Metrics> metricsList = new ArrayList<>();
- PushMetricsDto.Metrics metrics = new PushMetricsDto.Metrics();
- metrics.setMonitorId(1L);
- metricsList.add(metrics);
- pushMetricsDto.setMetricsList(metricsList);
-
- when(monitorDao.findById(anyLong())).thenReturn(Optional.of(new
Monitor()));
-
- pushService.pushMetricsData(pushMetricsDto);
-
- verify(metricsDao, times(1)).saveAll(any());
- }
-
- @Test
- void testGetPushMetricData() {
-
- Long monitorId = 1L;
- Long time = System.currentTimeMillis();
- PushMetrics pushMetrics = PushMetrics.builder()
- .monitorId(monitorId)
- .time(time)
- .metrics("[{\"key\":\"value\"}]")
- .build();
-
-
when(metricsDao.findFirstByMonitorIdOrderByTimeDesc(monitorId)).thenReturn(pushMetrics);
-
- PushMetricsDto result = pushService.getPushMetricData(monitorId, time);
-
- assertEquals(1, result.getMetricsList().size());
- assertEquals(monitorId, result.getMetricsList().get(0).getMonitorId());
- }
-
- @Test
- void testGetPushMetricDataTimeInvalid() {
-
- Long monitorId = 1L;
- Long time = System.currentTimeMillis() + 10000;
- PushMetrics pushMetrics = PushMetrics.builder()
- .monitorId(monitorId)
- .time(System.currentTimeMillis())
- .metrics("[{\"key\":\"value\"}]")
- .build();
-
-
when(metricsDao.findFirstByMonitorIdOrderByTimeDesc(monitorId)).thenReturn(pushMetrics);
-
- PushMetricsDto result = pushService.getPushMetricData(monitorId, time);
-
- assertTrue(result.getMetricsList().isEmpty());
- }
-
- @Test
- void testDeletePeriodically() {
-
- pushService.deletePeriodically();
- verify(metricsDao, times(1)).deleteAllByTimeBefore(anyLong());
- }
-
-}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
index aee6e4ff95..822bcd0aa8 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
@@ -56,4 +56,8 @@ public interface WarehouseConstants {
String MEMORY = "memory";
}
+ String PROMQL = "promql";
+
+ String SQL = "sql";
+
}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataQueryController.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataQueryController.java
new file mode 100644
index 0000000000..fef41cae7f
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataQueryController.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.controller;
+
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import org.apache.hertzbeat.common.entity.dto.Message;
+import org.apache.hertzbeat.common.entity.dto.query.MetricQueryData;
+import org.apache.hertzbeat.warehouse.service.MetricsDataQueryService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
+
+/**
+ * REST endpoints that expose stored metrics data, either evaluated at a single
+ * timestamp or over a time range.
+ */
+@RestController
+@RequestMapping(produces = {APPLICATION_JSON_VALUE})
+@Tag(name = "Metrics Data Query API")
+public class MetricsDataQueryController {
+
+    @Autowired
+    private MetricsDataQueryService queryService;
+
+    /**
+     * Evaluates every expression in {@code queries} at one timestamp.
+     *
+     * @param queries expressions to evaluate
+     * @param type    query language handed to the service, e.g. {@code promql}
+     * @param time    timestamp handed to the service unchanged
+     * @return success message wrapping one result entry per expression
+     */
+    @GetMapping("/api/warehouse/query")
+    @Operation(summary = "Query Real Time Metrics Data")
+    public ResponseEntity<Message<List<MetricQueryData>>> queryMetricsData(
+            @Parameter(description = "Query PromQL expr list", example = "cpu")
+            @RequestParam List<String> queries,
+            @Parameter(description = "Query type", example = "promql")
+            @RequestParam String type,
+            @Parameter(description = "Query timestamp", example = "1725854804451")
+            @RequestParam long time) {
+        List<MetricQueryData> instantData = queryService.query(queries, type, time);
+        Message<List<MetricQueryData>> payload = Message.success(instantData);
+        return ResponseEntity.ok(payload);
+    }
+
+    /**
+     * Evaluates every expression in {@code queries} over the range {@code start}..{@code end}.
+     *
+     * @param queries expressions to evaluate
+     * @param type    query language handed to the service, e.g. {@code promql}
+     * @param start   range start handed to the service unchanged
+     * @param end     range end handed to the service unchanged
+     * @param step    resolution step handed to the service unchanged, e.g. {@code 4m}
+     * @return success message wrapping one result entry per expression
+     */
+    @GetMapping("/api/warehouse/query/range")
+    @Operation(summary = "Query Range Metrics Data")
+    public ResponseEntity<Message<List<MetricQueryData>>> queryMetricsDataRange(
+            @Parameter(description = "Query PromQL expr list", example = "cpu")
+            @RequestParam List<String> queries,
+            @Parameter(description = "Query type", example = "promql")
+            @RequestParam String type,
+            @Parameter(description = "Query start timestamp", example = "1725854804451")
+            @RequestParam long start,
+            @Parameter(description = "Query end timestamp", example = "1733630804452")
+            @RequestParam long end,
+            @Parameter(description = "Query step", example = "4m")
+            @RequestParam String step
+    ) {
+        List<MetricQueryData> rangeData = queryService.queryRange(queries, type, start, end, step);
+        Message<List<MetricQueryData>> payload = Message.success(rangeData);
+        return ResponseEntity.ok(payload);
+    }
+}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimePromqlQueryExecutor.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimePromqlQueryExecutor.java
index aba37c5d7e..ac48e885fa 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimePromqlQueryExecutor.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimePromqlQueryExecutor.java
@@ -18,29 +18,12 @@
package org.apache.hertzbeat.warehouse.db;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.util.Base64Util;
import
org.apache.hertzbeat.warehouse.store.history.greptime.GreptimeProperties;
-import org.apache.hertzbeat.warehouse.store.history.vm.PromQlQueryContent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
-import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-import org.springframework.web.util.UriComponentsBuilder;
+
/**
* query executor for victor metrics
@@ -48,71 +31,15 @@ import org.springframework.web.util.UriComponentsBuilder;
@Component
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
@Slf4j
-public class GreptimePromqlQueryExecutor implements QueryExecutor {
+public class GreptimePromqlQueryExecutor extends PromqlQueryExecutor {
private static final String QUERY_PATH = "/v1/prometheus/api/v1/query";
private final GreptimeProperties greptimeProperties;
- private final RestTemplate restTemplate;
-
public GreptimePromqlQueryExecutor(GreptimeProperties greptimeProperties,
RestTemplate restTemplate) {
+ super(restTemplate, new
HttpPromqlProperties(greptimeProperties.httpEndpoint() + QUERY_PATH,
+ greptimeProperties.username(), greptimeProperties.password()));
this.greptimeProperties = greptimeProperties;
- this.restTemplate = restTemplate;
- }
-
- @Override
- public List<Map<String, Object>> execute(String query) {
- // http run the promql query
- List<Map<String, Object>> results = new LinkedList<>();
- try {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(greptimeProperties.username())
- && StringUtils.hasText(greptimeProperties.password())) {
- String authStr = greptimeProperties.username() + ":" +
greptimeProperties.password();
- String encodedAuth = Base64Util.encode(authStr);
- headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
- }
- HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
- URI uri =
UriComponentsBuilder.fromHttpUrl(greptimeProperties.httpEndpoint() + QUERY_PATH)
- .queryParam("query", URLEncoder.encode(query,
StandardCharsets.UTF_8))
- .build(true).toUri();
- ResponseEntity<PromQlQueryContent> responseEntity =
restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
- if (responseEntity.getStatusCode().is2xxSuccessful()) {
- if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
- List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData().getResult();
- for (PromQlQueryContent.ContentData.Content content :
contents) {
- Map<String, String> labels = content.getMetric();
- Map<String, Object> queryResult = new HashMap<>(8);
- queryResult.putAll(labels);
- if (content.getValue() != null &&
content.getValue().length == 2) {
- queryResult.put("__timestamp__",
content.getValue()[0]);
- queryResult.put("__value__",
content.getValue()[1]);
- } else if (content.getValues() != null &&
!content.getValues().isEmpty()) {
- List<Object> values = new LinkedList<>();
- for (Object[] valueArr : content.getValues()) {
- values.add(valueArr[1]);
- }
- queryResult.put("__value__", values);
- }
- results.add(queryResult);
- }
- }
- } else {
- log.error("query metrics data from greptime failed. {}",
responseEntity);
- }
- } catch (Exception e) {
- log.error(e.toString(), e);
- }
- return results;
- }
-
- @Override
- public boolean support(String datasource) {
- return "promql".equals(datasource);
}
}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/PromqlQueryExecutor.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/PromqlQueryExecutor.java
new file mode 100644
index 0000000000..4e4c0c03fa
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/PromqlQueryExecutor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.db;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.common.constants.NetworkConstants;
+import org.apache.hertzbeat.common.constants.SignConstants;
+import org.apache.hertzbeat.common.entity.dto.query.MetricQueryData;
+import org.apache.hertzbeat.common.util.Base64Util;
+import org.apache.hertzbeat.warehouse.store.history.vm.PromQlQueryContent;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.MediaType;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.StringUtils;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.UriComponentsBuilder;
+
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.hertzbeat.warehouse.constants.WarehouseConstants.PROMQL;
+
+/**
+ * Base class for query executors that talk to a Prometheus-compatible HTTP query API.
+ * Subclasses only supply the endpoint and credentials; request building, optional basic
+ * authentication and response flattening are shared here.
+ */
+@Slf4j
+public abstract class PromqlQueryExecutor implements QueryExecutor {
+
+    private static final String SUPPORT_QUERY_LANGUAGE = PROMQL;
+    protected static final String HTTP_QUERY_PARAM = "query";
+    protected static final String HTTP_TIME_PARAM = "time";
+    protected static final String HTTP_START_PARAM = "start";
+    protected static final String HTTP_END_PARAM = "end";
+    protected static final String HTTP_STEP_PARAM = "step";
+
+    private final RestTemplate restTemplate;
+
+    private final HttpPromqlProperties httpPromqlProperties;
+
+    PromqlQueryExecutor(RestTemplate restTemplate, HttpPromqlProperties httpPromqlProperties) {
+        this.restTemplate = restTemplate;
+        this.httpPromqlProperties = httpPromqlProperties;
+    }
+
+    /**
+     * Connection settings for the promql HTTP endpoint.
+     *
+     * @param url      full URL of the query endpoint
+     * @param username basic-auth user; authentication is skipped when blank
+     * @param password basic-auth password; authentication is skipped when blank
+     */
+    protected record HttpPromqlProperties(
+            String url,
+            String username,
+            String password
+    ) {}
+
+    /**
+     * Sends a GET request to the configured endpoint and flattens the response.
+     *
+     * @param params query parameters; values must already be URL-encoded because the
+     *               URI is assembled with {@code build(true)}
+     * @return one map per returned series (labels plus {@code __value__} and, for
+     *         instant results, {@code __timestamp__}); empty when the request fails
+     */
+    protected List<Map<String, Object>> http_promql(Map<String, Object> params) {
+        List<Map<String, Object>> results = new LinkedList<>();
+        try {
+            HttpEntity<Void> httpEntity = new HttpEntity<>(buildHeaders());
+            UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromHttpUrl(httpPromqlProperties.url());
+            for (Map.Entry<String, Object> entry : params.entrySet()) {
+                uriComponentsBuilder.queryParam(entry.getKey(), entry.getValue());
+            }
+            URI uri = uriComponentsBuilder.build(true).toUri();
+            ResponseEntity<PromQlQueryContent> responseEntity = restTemplate.exchange(uri,
+                    HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+            if (!responseEntity.getStatusCode().is2xxSuccessful()) {
+                // This class serves every promql backend, so the message names no specific store.
+                log.error("query metrics data by promql failed. {}", responseEntity);
+                return results;
+            }
+            PromQlQueryContent body = responseEntity.getBody();
+            if (body == null || body.getData() == null || body.getData().getResult() == null) {
+                return results;
+            }
+            for (PromQlQueryContent.ContentData.Content content : body.getData().getResult()) {
+                results.add(toQueryResult(content));
+            }
+        } catch (Exception e) {
+            // Best effort: failures are logged and whatever was collected so far is returned.
+            log.error(e.toString(), e);
+        }
+        return results;
+    }
+
+    /** Builds JSON request headers, adding basic auth only when both credentials are set. */
+    private HttpHeaders buildHeaders() {
+        HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_JSON);
+        headers.setAccept(List.of(MediaType.APPLICATION_JSON));
+        if (StringUtils.hasText(httpPromqlProperties.username())
+                && StringUtils.hasText(httpPromqlProperties.password())) {
+            String authStr = httpPromqlProperties.username() + ":" + httpPromqlProperties.password();
+            String encodedAuth = Base64Util.encode(authStr);
+            headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC + SignConstants.BLANK + encodedAuth);
+        }
+        return headers;
+    }
+
+    /** Flattens one returned series into a label map plus its value(s). */
+    private Map<String, Object> toQueryResult(PromQlQueryContent.ContentData.Content content) {
+        Map<String, Object> queryResult = new HashMap<>(8);
+        Map<String, String> labels = content.getMetric();
+        // A series without labels must not abort the remaining series.
+        if (labels != null) {
+            queryResult.putAll(labels);
+        }
+        if (content.getValue() != null && content.getValue().length == 2) {
+            queryResult.put("__timestamp__", content.getValue()[0]);
+            queryResult.put("__value__", content.getValue()[1]);
+        } else if (content.getValues() != null && !content.getValues().isEmpty()) {
+            List<Object> values = new LinkedList<>();
+            for (Object[] valueArr : content.getValues()) {
+                // Skip malformed samples instead of failing on the index access.
+                if (valueArr != null && valueArr.length == 2) {
+                    values.add(valueArr[1]);
+                }
+            }
+            queryResult.put("__value__", values);
+        }
+        return queryResult;
+    }
+
+    /**
+     * Converts a raw executor result into {@link MetricQueryData}.
+     * The conversion is not implemented yet, so an empty object is returned.
+     *
+     * @param object raw result, expected to be the list returned by the query methods
+     * @return metric query data, currently always empty
+     */
+    @Override
+    public MetricQueryData convertToMetricQueryData(Object object) {
+        MetricQueryData metricQueryData = new MetricQueryData();
+        try {
+            List<Map<String, Object>> metrics = (List<Map<String, Object>>) object;
+            // todo
+        } catch (Exception e) {
+            log.error("converting to metric query data failed.", e);
+        }
+        return metricQueryData;
+    }
+
+    /**
+     * Runs an instant query without an explicit timestamp.
+     *
+     * @param queryString raw promql expression
+     * @return flattened series, empty on failure
+     */
+    @Override
+    public List<Map<String, Object>> execute(String queryString) {
+        Map<String, Object> params = new HashMap<>();
+        params.put(HTTP_QUERY_PARAM, URLEncoder.encode(queryString, StandardCharsets.UTF_8));
+        return http_promql(params);
+    }
+
+    /**
+     * Runs an instant query at the given timestamp.
+     *
+     * @param queryString raw promql expression
+     * @param time        evaluation timestamp, sent as the {@code time} parameter unchanged
+     * @return flattened series, empty on failure
+     */
+    @Override
+    public List<Map<String, Object>> query(String queryString, long time) {
+        Map<String, Object> params = new HashMap<>();
+        params.put(HTTP_QUERY_PARAM, URLEncoder.encode(queryString, StandardCharsets.UTF_8));
+        params.put(HTTP_TIME_PARAM, time);
+        return http_promql(params);
+    }
+
+    /**
+     * Runs a range query.
+     *
+     * @param queryString raw promql expression
+     * @param start       range start, sent as the {@code start} parameter unchanged
+     * @param end         range end, sent as the {@code end} parameter unchanged
+     * @param step        resolution step, e.g. {@code 4m}
+     * @return flattened series, empty on failure
+     */
+    @Override
+    public List<Map<String, Object>> query_range(String queryString, long start, long end, String step) {
+        Map<String, Object> params = new HashMap<>();
+        params.put(HTTP_QUERY_PARAM, URLEncoder.encode(queryString, StandardCharsets.UTF_8));
+        params.put(HTTP_START_PARAM, start);
+        params.put(HTTP_END_PARAM, end);
+        // Encoded like the query: the URI is built from pre-encoded values.
+        params.put(HTTP_STEP_PARAM, URLEncoder.encode(step, StandardCharsets.UTF_8));
+        return http_promql(params);
+    }
+
+    /**
+     * @param datasource query language name
+     * @return true when {@code datasource} is {@code promql}
+     */
+    @Override
+    public boolean support(String datasource) {
+        return SUPPORT_QUERY_LANGUAGE.equals(datasource);
+    }
+
+}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/QueryExecutor.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/QueryExecutor.java
index 97b7ad4264..dc22a6ec92 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/QueryExecutor.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/QueryExecutor.java
@@ -17,6 +17,8 @@
package org.apache.hertzbeat.warehouse.db;
+import org.apache.hertzbeat.common.entity.dto.query.MetricQueryData;
+
import java.util.List;
import java.util.Map;
@@ -24,8 +26,14 @@ import java.util.Map;
* query executor interface
*/
public interface QueryExecutor {
-
+
+ MetricQueryData convertToMetricQueryData(Object object);
+
List<Map<String, Object>> execute(String query);
+
+ List<Map<String, Object>> query(String query, long time);
+
+ List<Map<String, Object>> query_range(String query, long start, long end,
String step);
boolean support(String datasource);
}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/SqlQueryExecutor.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/SqlQueryExecutor.java
new file mode 100644
index 0000000000..230b8005f9
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/SqlQueryExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.db;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.common.entity.dto.query.MetricQueryData;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hertzbeat.warehouse.constants.WarehouseConstants.SQL;
+
+/**
+ * Base class for query executors backed by a SQL datasource. Subclasses implement the
+ * actual statement execution; this class only fixes the supported query language and
+ * the (not yet implemented) result conversion.
+ */
+@Slf4j
+public abstract class SqlQueryExecutor implements QueryExecutor {
+
+    private static final String SUPPORT_QUERY_LANGUAGE = SQL;
+
+    /**
+     * Connection settings for the SQL datasource; no components are defined yet.
+     */
+    protected record ConnectorSqlProperties() {}
+
+    /**
+     * Executes a SQL request described by {@code params}.
+     *
+     * @param params request parameters; their meaning is defined by the subclass
+     * @return result rows
+     */
+    protected abstract List<Map<String, Object>> do_sql(Map<String, Object> params);
+
+    /**
+     * Converts a raw executor result into {@link MetricQueryData}.
+     * The conversion is not implemented yet, so an empty object is returned.
+     *
+     * @param object raw result, expected to be the list returned by the query methods
+     * @return metric query data, currently always empty
+     */
+    @Override
+    public MetricQueryData convertToMetricQueryData(Object object) {
+        MetricQueryData metricQueryData = new MetricQueryData();
+        try {
+            List<Map<String, Object>> metrics = (List<Map<String, Object>>) object;
+            // todo
+        } catch (Exception e) {
+            // Keep the cause in the log so conversion failures can be diagnosed.
+            log.error("converting to metric query data failed.", e);
+        }
+        return metricQueryData;
+    }
+
+    @Override
+    public abstract List<Map<String, Object>> execute(String query);
+
+    @Override
+    public abstract List<Map<String, Object>> query(String query, long time);
+
+    @Override
+    public abstract List<Map<String, Object>> query_range(String query, long start, long end, String step);
+
+    /**
+     * @param datasource query language name
+     * @return true when {@code datasource} is {@code sql}
+     */
+    @Override
+    public boolean support(String datasource) {
+        return SUPPORT_QUERY_LANGUAGE.equals(datasource);
+    }
+
+}
\ No newline at end of file
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/VictoriaMetricsQueryExecutor.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/VictoriaMetricsQueryExecutor.java
index 90311d83de..7d0ac85774 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/VictoriaMetricsQueryExecutor.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/VictoriaMetricsQueryExecutor.java
@@ -18,29 +18,11 @@
package org.apache.hertzbeat.warehouse.db;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.util.Base64Util;
-import org.apache.hertzbeat.warehouse.store.history.vm.PromQlQueryContent;
import
org.apache.hertzbeat.warehouse.store.history.vm.VictoriaMetricsProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
-import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-import org.springframework.web.util.UriComponentsBuilder;
/**
* query executor for victor metrics
@@ -48,71 +30,16 @@ import org.springframework.web.util.UriComponentsBuilder;
@Component
@ConditionalOnProperty(prefix = "warehouse.store.victoria-metrics", name =
"enabled", havingValue = "true")
@Slf4j
-public class VictoriaMetricsQueryExecutor implements QueryExecutor {
+public class VictoriaMetricsQueryExecutor extends PromqlQueryExecutor {
private static final String QUERY_PATH = "/api/v1/query";
private final VictoriaMetricsProperties victoriaMetricsProp;
- private final RestTemplate restTemplate;
-
public VictoriaMetricsQueryExecutor(VictoriaMetricsProperties
victoriaMetricsProp, RestTemplate restTemplate) {
+ super(restTemplate, new HttpPromqlProperties(victoriaMetricsProp.url()
+ QUERY_PATH,
+ victoriaMetricsProp.username(),
victoriaMetricsProp.password()));
this.victoriaMetricsProp = victoriaMetricsProp;
- this.restTemplate = restTemplate;
- }
-
- @Override
- public List<Map<String, Object>> execute(String query) {
- // http run the promql query
- List<Map<String, Object>> results = new LinkedList<>();
- try {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(victoriaMetricsProp.username())
- && StringUtils.hasText(victoriaMetricsProp.password())) {
- String authStr = victoriaMetricsProp.username() + ":" +
victoriaMetricsProp.password();
- String encodedAuth = Base64Util.encode(authStr);
- headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
- }
- HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
- URI uri =
UriComponentsBuilder.fromHttpUrl(victoriaMetricsProp.url() + QUERY_PATH)
- .queryParam("query", URLEncoder.encode(query,
StandardCharsets.UTF_8))
- .build(true).toUri();
- ResponseEntity<PromQlQueryContent> responseEntity =
restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
- if (responseEntity.getStatusCode().is2xxSuccessful()) {
- if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
- List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData().getResult();
- for (PromQlQueryContent.ContentData.Content content :
contents) {
- Map<String, String> labels = content.getMetric();
- Map<String, Object> queryResult = new HashMap<>(8);
- queryResult.putAll(labels);
- if (content.getValue() != null &&
content.getValue().length == 2) {
- queryResult.put("__timestamp__",
content.getValue()[0]);
- queryResult.put("__value__",
content.getValue()[1]);
- } else if (content.getValues() != null &&
!content.getValues().isEmpty()) {
- List<Object> values = new LinkedList<>();
- for (Object[] valueArr : content.getValues()) {
- values.add(valueArr[1]);
- }
- queryResult.put("__value__", values);
- }
- results.add(queryResult);
- }
- }
- } else {
- log.error("query metrics data from victor-metrics failed. {}",
responseEntity);
- }
- } catch (Exception e) {
- log.error(e.toString(), e);
- }
- return results;
}
- @Override
- public boolean support(String datasource) {
- return "promql".equals(datasource);
- }
}
diff --git
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/dao/PushMetricsDao.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/MetricsDataQueryService.java
similarity index 53%
rename from
hertzbeat-push/src/main/java/org/apache/hertzbeat/push/dao/PushMetricsDao.java
rename to
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/MetricsDataQueryService.java
index b575ce66d2..73ac03f575 100644
---
a/hertzbeat-push/src/main/java/org/apache/hertzbeat/push/dao/PushMetricsDao.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/MetricsDataQueryService.java
@@ -15,19 +15,32 @@
* limitations under the License.
*/
-package org.apache.hertzbeat.push.dao;
+package org.apache.hertzbeat.warehouse.service;
-import org.apache.hertzbeat.common.entity.push.PushMetrics;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.transaction.annotation.Transactional;
+import org.apache.hertzbeat.common.entity.dto.query.MetricQueryData;
+
+import java.util.List;
/**
- * push metrics dao
+ * metrics data query service
*/
-public interface PushMetricsDao extends JpaRepository<PushMetrics, Long> {
+public interface MetricsDataQueryService {
- PushMetrics findFirstByMonitorIdOrderByTimeDesc(Long monitorId);
+ /**
+ * Query metrics data
+ * @param queries query expr
+ * @param time time
+ * @return data
+ */
+ List<MetricQueryData> query(List<String> queries, String queryType, long
time);
- @Transactional(rollbackFor = Exception.class)
- void deleteAllByTimeBefore(Long time);
+ /**
+ * Query metrics data range
+ * @param queries query expr
+ * @param start start
+ * @param end end
+ * @param step step
+ * @return data
+ */
+ List<MetricQueryData> queryRange(List<String> queries, String queryType,
long start, long end, String step);
}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/impl/MetricsDataQueryServiceImpl.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/impl/MetricsDataQueryServiceImpl.java
new file mode 100644
index 0000000000..333148949a
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/impl/MetricsDataQueryServiceImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.service.impl;
+
+import org.apache.hertzbeat.common.entity.dto.query.MetricQueryData;
+import org.apache.hertzbeat.warehouse.db.QueryExecutor;
+import org.apache.hertzbeat.warehouse.service.MetricsDataQueryService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default {@link MetricsDataQueryService}: picks the first registered {@link QueryExecutor}
+ * that supports the requested query type and runs every expression through it.
+ */
+@Service
+public class MetricsDataQueryServiceImpl implements MetricsDataQueryService {
+
+    /** Stays {@code null} when no executor bean is registered, because injection is optional. */
+    @Autowired(required = false)
+    List<QueryExecutor> executors;
+
+    /**
+     * Runs each expression as an instant query.
+     *
+     * @param queries   expressions to evaluate
+     * @param queryType query language used to select the executor
+     * @param time      evaluation timestamp passed to the executor unchanged
+     * @return one converted result per expression, in input order
+     * @throws IllegalArgumentException if {@code queries} is null, no executor is registered,
+     *                                  or none supports {@code queryType}
+     */
+    @Override
+    public List<MetricQueryData> query(List<String> queries, String queryType, long time) {
+        QueryExecutor executor = resolveExecutor(queries, queryType);
+        List<MetricQueryData> metricQueryDataList = new ArrayList<>(queries.size());
+        for (String query : queries) {
+            metricQueryDataList.add(executor.convertToMetricQueryData(executor.query(query, time)));
+        }
+        return metricQueryDataList;
+    }
+
+    /**
+     * Runs each expression as a range query.
+     *
+     * @param queries   expressions to evaluate
+     * @param queryType query language used to select the executor
+     * @param start     range start passed to the executor unchanged
+     * @param end       range end passed to the executor unchanged
+     * @param step      resolution step passed to the executor unchanged
+     * @return one converted result per expression, in input order
+     * @throws IllegalArgumentException if {@code queries} is null, no executor is registered,
+     *                                  or none supports {@code queryType}
+     */
+    @Override
+    public List<MetricQueryData> queryRange(List<String> queries, String queryType, long start, long end, String step) {
+        QueryExecutor executor = resolveExecutor(queries, queryType);
+        List<MetricQueryData> metricQueryDataList = new ArrayList<>(queries.size());
+        for (String query : queries) {
+            metricQueryDataList.add(executor.convertToMetricQueryData(executor.query_range(query, start, end, step)));
+        }
+        return metricQueryDataList;
+    }
+
+    /** Validates the request and returns the first executor that supports {@code queryType}. */
+    private QueryExecutor resolveExecutor(List<String> queries, String queryType) {
+        if (queries == null) {
+            throw new IllegalArgumentException("Query expressions must not be null");
+        }
+        // executors is null (not empty) when optional injection found no beans.
+        if (executors == null || executors.isEmpty()) {
+            throw new IllegalArgumentException("No query executor found");
+        }
+        return executors.stream()
+                .filter(e -> e.support(queryType))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("Unsupported datasource: " + queryType));
+    }
+}
diff --git a/web-app/src/app/pojo/Monitor.ts b/web-app/src/app/pojo/Monitor.ts
index 2f42c70532..e84c67831c 100644
--- a/web-app/src/app/pojo/Monitor.ts
+++ b/web-app/src/app/pojo/Monitor.ts
@@ -25,6 +25,8 @@ export class Monitor {
intervals: number = 60;
// Monitoring status 0: Paused, 1: Up, 2: Down
status!: number;
+ // Task type 0: Normal, 1: push auto create, 2: discovery auto create
+ type!: number;
description!: string;
labels!: Record<string, string>;
annotations!: Record<string, string>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]