This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 9765e06ca [refactoring] Inject a single instance of the data store
(#1944)
9765e06ca is described below
commit 9765e06caca5fa7463bae86fa262f83b6a915a51
Author: xuziyang <[email protected]>
AuthorDate: Thu May 9 22:22:24 2024 +0800
[refactoring] Inject a single instance of the data store (#1944)
Co-authored-by: tomsun28 <[email protected]>
---
.../controller/MetricsDataController.java | 59 +++++++---------------
.../WareHouseApplicationReadyListener.java | 48 ++++++++++++++++++
.../warehouse/service/WarehouseServiceImpl.java | 21 ++------
.../warehouse/store/DataStorageDispatch.java | 39 ++++----------
.../controller/MetricsDataControllerTest.java | 10 ++--
5 files changed, 85 insertions(+), 92 deletions(-)
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
index a5b19159c..dd787d6de 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.dto.Field;
@@ -36,9 +37,7 @@ import org.apache.hertzbeat.common.entity.dto.Value;
import org.apache.hertzbeat.common.entity.dto.ValueRow;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.warehouse.store.history.HistoryDataReader;
-import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage;
import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataReader;
-import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -56,28 +55,25 @@ public class MetricsDataController {
private static final Integer METRIC_FULL_LENGTH = 3;
- private final List<RealTimeDataReader> realTimeDataReaders;
+ private final RealTimeDataReader realTimeDataReader;
+ private final Optional<HistoryDataReader> historyDataReader;
- private final List<HistoryDataReader> historyDataReaders;
-
- public MetricsDataController(List<RealTimeDataReader> realTimeDataReaders,
- List<HistoryDataReader> historyDataReaders) {
- this.realTimeDataReaders = realTimeDataReaders;
- this.historyDataReaders = historyDataReaders;
+ public MetricsDataController(RealTimeDataReader realTimeDataReader,
+ Optional<HistoryDataReader>
historyDataReader) {
+ this.realTimeDataReader = realTimeDataReader;
+ this.historyDataReader = historyDataReader;
}
@GetMapping("/api/warehouse/storage/status")
@Operation(summary = "Query Warehouse Storage Server Status", description
= "Query the availability status of the storage service under the warehouse")
public ResponseEntity<Message<Void>> getWarehouseStorageServerStatus() {
- boolean available = false;
- if (historyDataReaders != null) {
- available =
historyDataReaders.stream().anyMatch(HistoryDataReader::isServerAvailable);
- }
- if (available) {
+
+ if (historyDataReader.isPresent() &&
historyDataReader.get().isServerAvailable()) {
return ResponseEntity.ok(Message.success());
- } else {
- return ResponseEntity.ok(Message.fail(FAIL_CODE, "Service not
available!"));
}
+
+ // historyDataReader does not exist or is not available
+ return ResponseEntity.ok(Message.fail(FAIL_CODE, "Service not
available!"));
}
@GetMapping("/api/monitor/{monitorId}/metrics/{metrics}")
@@ -87,18 +83,8 @@ public class MetricsDataController {
@PathVariable Long monitorId,
@Parameter(description = "Metrics Name", example = "cpu")
@PathVariable String metrics) {
- RealTimeDataReader realTimeDataReader = realTimeDataReaders.stream()
- .filter(RealTimeDataReader::isServerAvailable)
- .max((o1, o2) -> {
- if (o1 instanceof MemoryDataStorage) {
- return -1;
- } else if (o2 instanceof MemoryDataStorage) {
- return 1;
- } else {
- return 0;
- }
- }).orElse(null);
- if (realTimeDataReader == null) {
+ boolean available = realTimeDataReader.isServerAvailable();
+ if (!available) {
return ResponseEntity.ok(Message.fail(FAIL_CODE, "real time store
not available"));
}
CollectRep.MetricsData storageData =
realTimeDataReader.getCurrentMetricsData(monitorId, metrics);
@@ -154,17 +140,8 @@ public class MetricsDataController {
@Parameter(description = "aggregate data calc. off by default;
4-hour window, query limit >1 week", example = "false")
@RequestParam(required = false) Boolean interval
) {
- HistoryDataReader historyDataReader = historyDataReaders.stream()
- .filter(HistoryDataReader::isServerAvailable).max((o1, o2) -> {
- if (o1 instanceof JpaDatabaseDataStorage) {
- return -1;
- } else if (o2 instanceof JpaDatabaseDataStorage) {
- return 1;
- } else {
- return 0;
- }
- }).orElse(null);
- if (historyDataReader == null) {
+
+ if (historyDataReader.isEmpty() ||
!historyDataReader.get().isServerAvailable()) {
return ResponseEntity.ok(Message.fail(FAIL_CODE, "time series
database not available"));
}
String[] names = metricFull.split("\\.");
@@ -179,9 +156,9 @@ public class MetricsDataController {
}
Map<String, List<Value>> instanceValuesMap;
if (interval == null || !interval) {
- instanceValuesMap =
historyDataReader.getHistoryMetricData(monitorId, app, metrics, metric, label,
history);
+ instanceValuesMap =
historyDataReader.get().getHistoryMetricData(monitorId, app, metrics, metric,
label, history);
} else {
- instanceValuesMap =
historyDataReader.getHistoryIntervalMetricData(monitorId, app, metrics, metric,
label, history);
+ instanceValuesMap =
historyDataReader.get().getHistoryIntervalMetricData(monitorId, app, metrics,
metric, label, history);
}
MetricsHistoryData historyData = MetricsHistoryData.builder()
.id(monitorId).metrics(metrics).values(instanceValuesMap)
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/listener/WareHouseApplicationReadyListener.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/listener/WareHouseApplicationReadyListener.java
new file mode 100644
index 000000000..a611358b5
--- /dev/null
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/listener/WareHouseApplicationReadyListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.listener;
+
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+
+
+/**
+ * Logs a warning on ApplicationReadyEvent when no AbstractHistoryDataStorage is present.
+ */
+@Slf4j
+@Component
+public class WareHouseApplicationReadyListener {
+
+ private Optional<AbstractHistoryDataStorage> historyDataStorage;
+
+ public
WareHouseApplicationReadyListener(Optional<AbstractHistoryDataStorage>
historyDataStorage) {
+ this.historyDataStorage = historyDataStorage;
+ }
+
+ @EventListener(classes = {ApplicationReadyEvent.class})
+ public void listen() {
+ if (historyDataStorage.isEmpty()) {
+ log.warn("The historical data repository is not configured");
+ }
+ }
+}
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java
index 63a0d802c..570fd7bba 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java
@@ -22,7 +22,6 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import
org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
-import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
import org.springframework.stereotype.Service;
/**
@@ -32,26 +31,16 @@ import org.springframework.stereotype.Service;
@Slf4j
public class WarehouseServiceImpl implements WarehouseService {
- private final List<AbstractRealTimeDataStorage> realTimeDataStorages;
+ private final AbstractRealTimeDataStorage realTimeDataStorage;
- public WarehouseServiceImpl(List<AbstractRealTimeDataStorage>
realTimeDataStorages) {
- this.realTimeDataStorages = realTimeDataStorages;
+ public WarehouseServiceImpl(AbstractRealTimeDataStorage
realTimeDataStorage) {
+ this.realTimeDataStorage = realTimeDataStorage;
}
@Override
public List<CollectRep.MetricsData> queryMonitorMetricsData(Long
monitorId) {
- AbstractRealTimeDataStorage realTimeDataStorage =
realTimeDataStorages.stream()
- .filter(AbstractRealTimeDataStorage::isServerAvailable)
- .max((o1, o2) -> {
- if (o1 instanceof MemoryDataStorage) {
- return -1;
- } else if (o2 instanceof MemoryDataStorage) {
- return 1;
- } else {
- return 0;
- }
- }).orElse(null);
- if (realTimeDataStorage == null) {
+ boolean available = realTimeDataStorage.isServerAvailable();
+ if (!available) {
log.error("real time store not available");
return Collections.emptyList();
}
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
index 0451200c6..6b4ed8b70 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
@@ -17,15 +17,13 @@
package org.apache.hertzbeat.warehouse.store;
-import java.util.List;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.queue.CommonDataQueue;
import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
import org.apache.hertzbeat.warehouse.store.history.HistoryDataWriter;
-import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage;
import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter;
-import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
import org.springframework.stereotype.Component;
/**
@@ -37,29 +35,23 @@ public class DataStorageDispatch {
private final CommonDataQueue commonDataQueue;
private final WarehouseWorkerPool workerPool;
- private final List<HistoryDataWriter> historyDataWriters;
- private final List<RealTimeDataWriter> realTimeDataWriters;
+
+ private final RealTimeDataWriter realTimeDataWriter;
+ private final Optional<HistoryDataWriter> historyDataWriter;
public DataStorageDispatch(CommonDataQueue commonDataQueue,
WarehouseWorkerPool workerPool,
- List<HistoryDataWriter> historyDataWriters,
- List<RealTimeDataWriter> realTimeDataWriters) {
+ Optional<HistoryDataWriter> historyDataWriter,
+ RealTimeDataWriter realTimeDataWriter) {
this.commonDataQueue = commonDataQueue;
this.workerPool = workerPool;
- this.historyDataWriters = historyDataWriters;
- this.realTimeDataWriters = realTimeDataWriters;
+ this.realTimeDataWriter = realTimeDataWriter;
+ this.historyDataWriter = historyDataWriter;
startPersistentDataStorage();
startRealTimeDataStorage();
}
private void startRealTimeDataStorage() {
- if (realTimeDataWriters == null || realTimeDataWriters.isEmpty()) {
- log.info("no real time data storage start");
- return;
- }
- if (realTimeDataWriters.size() > 1) {
- realTimeDataWriters.removeIf(MemoryDataStorage.class::isInstance);
- }
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-realtime-data-storage");
while (!Thread.currentThread().isInterrupted()) {
@@ -68,9 +60,7 @@ public class DataStorageDispatch {
if (metricsData == null) {
continue;
}
- for (RealTimeDataWriter realTimeDataWriter :
realTimeDataWriters) {
- realTimeDataWriter.saveData(metricsData);
- }
+ realTimeDataWriter.saveData(metricsData);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
@@ -80,13 +70,6 @@ public class DataStorageDispatch {
}
protected void startPersistentDataStorage() {
- if (historyDataWriters == null || historyDataWriters.isEmpty()) {
- log.info("no history data storage start");
- return;
- }
- if (historyDataWriters.size() > 1) {
-
historyDataWriters.removeIf(JpaDatabaseDataStorage.class::isInstance);
- }
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-persistent-data-storage");
while (!Thread.currentThread().isInterrupted()) {
@@ -95,8 +78,8 @@ public class DataStorageDispatch {
if (metricsData == null) {
continue;
}
- for (HistoryDataWriter historyDataWriter :
historyDataWriters) {
- historyDataWriter.saveData(metricsData);
+ if (historyDataWriter.isPresent()) {
+ historyDataWriter.get().saveData(metricsData);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
diff --git
a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
index 0a2fdfb60..8db64e1cf 100644
---
a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
+++
b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
@@ -30,6 +30,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.dto.Value;
import org.apache.hertzbeat.common.entity.message.CollectRep;
@@ -64,15 +66,9 @@ class MetricsDataControllerTest {
@Mock
RealTimeDataReader realTimeDataReader;
- private List<HistoryDataReader> historyDataReaders = new LinkedList<>();
-
- private List<RealTimeDataReader> realTimeDataReaders = new LinkedList<>();
-
@BeforeEach
void setUp() {
- historyDataReaders.add(historyDataReader);
- realTimeDataReaders.add(realTimeDataReader);
- metricsDataController = new MetricsDataController(realTimeDataReaders,
historyDataReaders);
+ metricsDataController = new MetricsDataController(realTimeDataReader,
Optional.of(historyDataReader));
this.mockMvc =
MockMvcBuilders.standaloneSetup(metricsDataController).build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]