This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new 9765e06ca [refactoring]  Inject a single instance of the data store (#1944)
9765e06ca is described below

commit 9765e06caca5fa7463bae86fa262f83b6a915a51
Author: xuziyang <[email protected]>
AuthorDate: Thu May 9 22:22:24 2024 +0800

    [refactoring]  Inject a single instance of the data store (#1944)
    
    Co-authored-by: tomsun28 <[email protected]>
---
 .../controller/MetricsDataController.java          | 59 +++++++---------------
 .../WareHouseApplicationReadyListener.java         | 48 ++++++++++++++++++
 .../warehouse/service/WarehouseServiceImpl.java    | 21 ++------
 .../warehouse/store/DataStorageDispatch.java       | 39 ++++----------
 .../controller/MetricsDataControllerTest.java      | 10 ++--
 5 files changed, 85 insertions(+), 92 deletions(-)

diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
index a5b19159c..dd787d6de 100644
--- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
+++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.dto.Field;
@@ -36,9 +37,7 @@ import org.apache.hertzbeat.common.entity.dto.Value;
 import org.apache.hertzbeat.common.entity.dto.ValueRow;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.warehouse.store.history.HistoryDataReader;
-import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage;
 import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataReader;
-import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -56,28 +55,25 @@ public class MetricsDataController {
 
     private static final Integer METRIC_FULL_LENGTH = 3;
 
-    private final List<RealTimeDataReader> realTimeDataReaders;
+    private final RealTimeDataReader realTimeDataReader;
+    private final Optional<HistoryDataReader> historyDataReader;
 
-    private final List<HistoryDataReader> historyDataReaders;
-
-    public MetricsDataController(List<RealTimeDataReader> realTimeDataReaders,
-                                 List<HistoryDataReader> historyDataReaders) {
-        this.realTimeDataReaders = realTimeDataReaders;
-        this.historyDataReaders = historyDataReaders;
+    public MetricsDataController(RealTimeDataReader realTimeDataReader,
+                                 Optional<HistoryDataReader> 
historyDataReader) {
+        this.realTimeDataReader = realTimeDataReader;
+        this.historyDataReader = historyDataReader;
     }
 
     @GetMapping("/api/warehouse/storage/status")
     @Operation(summary = "Query Warehouse Storage Server Status", description 
= "Query the availability status of the storage service under the warehouse")
     public ResponseEntity<Message<Void>> getWarehouseStorageServerStatus() {
-        boolean available = false;
-        if (historyDataReaders != null) {
-            available = 
historyDataReaders.stream().anyMatch(HistoryDataReader::isServerAvailable);
-        }
-        if (available) {
+
+        if (historyDataReader.isPresent() && 
historyDataReader.get().isServerAvailable()) {
             return ResponseEntity.ok(Message.success());
-        } else {
-            return ResponseEntity.ok(Message.fail(FAIL_CODE, "Service not 
available!"));
         }
+
+        // historyDataReader does not exist or is not available
+        return ResponseEntity.ok(Message.fail(FAIL_CODE, "Service not 
available!"));
     }
 
     @GetMapping("/api/monitor/{monitorId}/metrics/{metrics}")
@@ -87,18 +83,8 @@ public class MetricsDataController {
             @PathVariable Long monitorId,
             @Parameter(description = "Metrics Name", example = "cpu")
             @PathVariable String metrics) {
-        RealTimeDataReader realTimeDataReader = realTimeDataReaders.stream()
-                .filter(RealTimeDataReader::isServerAvailable)
-                .max((o1, o2) -> {
-                    if (o1 instanceof MemoryDataStorage) {
-                        return -1;
-                    } else if (o2 instanceof MemoryDataStorage) {
-                        return 1;
-                    } else {
-                        return 0;
-                    }
-                }).orElse(null);
-        if (realTimeDataReader == null) {
+        boolean available = realTimeDataReader.isServerAvailable();
+        if (!available) {
             return ResponseEntity.ok(Message.fail(FAIL_CODE, "real time store 
not available"));
         }
         CollectRep.MetricsData storageData = 
realTimeDataReader.getCurrentMetricsData(monitorId, metrics);
@@ -154,17 +140,8 @@ public class MetricsDataController {
             @Parameter(description = "aggregate data calc. off by default; 
4-hour window, query limit >1 week", example = "false")
             @RequestParam(required = false) Boolean interval
     ) {
-        HistoryDataReader historyDataReader = historyDataReaders.stream()
-                .filter(HistoryDataReader::isServerAvailable).max((o1, o2) -> {
-                    if (o1 instanceof JpaDatabaseDataStorage) {
-                        return -1;
-                    } else if (o2 instanceof JpaDatabaseDataStorage) {
-                        return 1;
-                    } else {
-                        return 0;
-                    }
-                }).orElse(null);
-        if (historyDataReader == null) {
+
+        if (historyDataReader.isEmpty() || 
!historyDataReader.get().isServerAvailable()) {
             return ResponseEntity.ok(Message.fail(FAIL_CODE, "time series 
database not available"));
         }
         String[] names = metricFull.split("\\.");
@@ -179,9 +156,9 @@ public class MetricsDataController {
         }
         Map<String, List<Value>> instanceValuesMap;
         if (interval == null || !interval) {
-            instanceValuesMap = 
historyDataReader.getHistoryMetricData(monitorId, app, metrics, metric, label, 
history);
+            instanceValuesMap = 
historyDataReader.get().getHistoryMetricData(monitorId, app, metrics, metric, 
label, history);
         } else {
-            instanceValuesMap = 
historyDataReader.getHistoryIntervalMetricData(monitorId, app, metrics, metric, 
label, history);
+            instanceValuesMap = 
historyDataReader.get().getHistoryIntervalMetricData(monitorId, app, metrics, 
metric, label, history);
         }
         MetricsHistoryData historyData = MetricsHistoryData.builder()
                 .id(monitorId).metrics(metrics).values(instanceValuesMap)
diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/listener/WareHouseApplicationReadyListener.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/listener/WareHouseApplicationReadyListener.java
new file mode 100644
index 000000000..a611358b5
--- /dev/null
+++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/listener/WareHouseApplicationReadyListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.listener;
+
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+
+
+/**
+ *
+ */
+@Slf4j
+@Component
+public class WareHouseApplicationReadyListener {
+
+    private Optional<AbstractHistoryDataStorage> historyDataStorage;
+
+    public 
WareHouseApplicationReadyListener(Optional<AbstractHistoryDataStorage> 
historyDataStorage) {
+        this.historyDataStorage = historyDataStorage;
+    }
+
+    @EventListener(classes = {ApplicationReadyEvent.class})
+    public void listen() {
+        if (historyDataStorage.isEmpty()) {
+            log.warn("The historical data repository is not configured");
+        }
+    }
+}
diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java
index 63a0d802c..570fd7bba 100644
--- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java
+++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java
@@ -22,7 +22,6 @@ import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import 
org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
-import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
 import org.springframework.stereotype.Service;
 
 /**
@@ -32,26 +31,16 @@ import org.springframework.stereotype.Service;
 @Slf4j
 public class WarehouseServiceImpl implements WarehouseService {
 
-    private final List<AbstractRealTimeDataStorage> realTimeDataStorages;
+    private final AbstractRealTimeDataStorage realTimeDataStorage;
 
-    public WarehouseServiceImpl(List<AbstractRealTimeDataStorage> 
realTimeDataStorages) {
-        this.realTimeDataStorages = realTimeDataStorages;
+    public WarehouseServiceImpl(AbstractRealTimeDataStorage 
realTimeDataStorage) {
+        this.realTimeDataStorage = realTimeDataStorage;
     }
 
     @Override
     public List<CollectRep.MetricsData> queryMonitorMetricsData(Long 
monitorId) {
-        AbstractRealTimeDataStorage realTimeDataStorage = 
realTimeDataStorages.stream()
-                .filter(AbstractRealTimeDataStorage::isServerAvailable)
-                .max((o1, o2) -> {
-                    if (o1 instanceof MemoryDataStorage) {
-                        return -1;
-                    } else if (o2 instanceof MemoryDataStorage) {
-                        return 1;
-                    } else {
-                        return 0;
-                    }
-                }).orElse(null);
-        if (realTimeDataStorage == null) {
+        boolean available = realTimeDataStorage.isServerAvailable();
+        if (!available) {
             log.error("real time store not available");
             return Collections.emptyList();
         }
diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
index 0451200c6..6b4ed8b70 100644
--- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
+++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
@@ -17,15 +17,13 @@
 
 package org.apache.hertzbeat.warehouse.store;
 
-import java.util.List;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.queue.CommonDataQueue;
 import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
 import org.apache.hertzbeat.warehouse.store.history.HistoryDataWriter;
-import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage;
 import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter;
-import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
 import org.springframework.stereotype.Component;
 
 /**
@@ -37,29 +35,23 @@ public class DataStorageDispatch {
 
     private final CommonDataQueue commonDataQueue;
     private final WarehouseWorkerPool workerPool;
-    private final List<HistoryDataWriter> historyDataWriters;
-    private final List<RealTimeDataWriter> realTimeDataWriters;
+
+    private final RealTimeDataWriter realTimeDataWriter;
+    private final Optional<HistoryDataWriter> historyDataWriter;
 
     public DataStorageDispatch(CommonDataQueue commonDataQueue,
                                WarehouseWorkerPool workerPool,
-                               List<HistoryDataWriter> historyDataWriters,
-                               List<RealTimeDataWriter> realTimeDataWriters) {
+                               Optional<HistoryDataWriter> historyDataWriter,
+                               RealTimeDataWriter realTimeDataWriter) {
         this.commonDataQueue = commonDataQueue;
         this.workerPool = workerPool;
-        this.historyDataWriters = historyDataWriters;
-        this.realTimeDataWriters = realTimeDataWriters;
+        this.realTimeDataWriter = realTimeDataWriter;
+        this.historyDataWriter = historyDataWriter;
         startPersistentDataStorage();
         startRealTimeDataStorage();
     }
 
     private void startRealTimeDataStorage() {
-        if (realTimeDataWriters == null || realTimeDataWriters.isEmpty()) {
-            log.info("no real time data storage start");
-            return;
-        }
-        if (realTimeDataWriters.size() > 1) {
-            realTimeDataWriters.removeIf(MemoryDataStorage.class::isInstance);
-        }
         Runnable runnable = () -> {
             Thread.currentThread().setName("warehouse-realtime-data-storage");
             while (!Thread.currentThread().isInterrupted()) {
@@ -68,9 +60,7 @@ public class DataStorageDispatch {
                     if (metricsData == null) {
                         continue;
                     }
-                    for (RealTimeDataWriter realTimeDataWriter : 
realTimeDataWriters) {
-                        realTimeDataWriter.saveData(metricsData);
-                    }
+                    realTimeDataWriter.saveData(metricsData);
                 } catch (Exception e) {
                     log.error(e.getMessage(), e);
                 }
@@ -80,13 +70,6 @@ public class DataStorageDispatch {
     }
 
     protected void startPersistentDataStorage() {
-        if (historyDataWriters == null || historyDataWriters.isEmpty()) {
-            log.info("no history data storage start");
-            return;
-        }
-        if (historyDataWriters.size() > 1) {
-            
historyDataWriters.removeIf(JpaDatabaseDataStorage.class::isInstance);
-        }
         Runnable runnable = () -> {
             
Thread.currentThread().setName("warehouse-persistent-data-storage");
             while (!Thread.currentThread().isInterrupted()) {
@@ -95,8 +78,8 @@ public class DataStorageDispatch {
                     if (metricsData == null) {
                         continue;
                     }
-                    for (HistoryDataWriter historyDataWriter : 
historyDataWriters) {
-                        historyDataWriter.saveData(metricsData);
+                    if (historyDataWriter.isPresent()) {
+                        historyDataWriter.get().saveData(metricsData);
                     }
                 } catch (Exception e) {
                     log.error(e.getMessage(), e);
diff --git a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
index 0a2fdfb60..8db64e1cf 100644
--- a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
+++ b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
@@ -30,6 +30,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
 import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.dto.Value;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
@@ -64,15 +66,9 @@ class MetricsDataControllerTest {
     @Mock
     RealTimeDataReader realTimeDataReader;
 
-    private List<HistoryDataReader> historyDataReaders = new LinkedList<>();
-
-    private List<RealTimeDataReader> realTimeDataReaders = new LinkedList<>();
-
     @BeforeEach
     void setUp() {
-        historyDataReaders.add(historyDataReader);
-        realTimeDataReaders.add(realTimeDataReader);
-        metricsDataController = new MetricsDataController(realTimeDataReaders, 
historyDataReaders);
+        metricsDataController = new MetricsDataController(realTimeDataReader, 
Optional.of(historyDataReader));
         this.mockMvc = 
MockMvcBuilders.standaloneSetup(metricsDataController).build();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to