This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 6473d20c5 [refactor] split AbstractRealTimeDataStorage class (#1935)
6473d20c5 is described below
commit 6473d20c5b5abd9d7a33717ef1f1311126a76a76
Author: xuziyang <[email protected]>
AuthorDate: Wed May 8 17:55:45 2024 +0800
[refactor] split AbstractRealTimeDataStorage class (#1935)
---
.../controller/MetricsDataController.java | 16 ++++-----
.../warehouse/store/DataStorageDispatch.java | 18 +++++-----
.../realtime/AbstractRealTimeDataStorage.java | 27 ++-------------
...imeDataStorage.java => RealTimeDataReader.java} | 27 +++++----------
.../store/realtime/RealTimeDataWriter.java | 38 ++++++++++++++++++++++
.../controller/MetricsDataControllerTest.java | 18 +++++-----
6 files changed, 74 insertions(+), 70 deletions(-)
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
index 6c6416c3e..a5b19159c 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java
@@ -37,7 +37,7 @@ import org.apache.hertzbeat.common.entity.dto.ValueRow;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.warehouse.store.history.HistoryDataReader;
import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage;
-import
org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
+import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataReader;
import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
@@ -56,13 +56,13 @@ public class MetricsDataController {
private static final Integer METRIC_FULL_LENGTH = 3;
- private final List<AbstractRealTimeDataStorage> realTimeDataStorages;
+ private final List<RealTimeDataReader> realTimeDataReaders;
private final List<HistoryDataReader> historyDataReaders;
- public MetricsDataController(List<AbstractRealTimeDataStorage>
realTimeDataStorages,
+ public MetricsDataController(List<RealTimeDataReader> realTimeDataReaders,
List<HistoryDataReader> historyDataReaders) {
- this.realTimeDataStorages = realTimeDataStorages;
+ this.realTimeDataReaders = realTimeDataReaders;
this.historyDataReaders = historyDataReaders;
}
@@ -87,8 +87,8 @@ public class MetricsDataController {
@PathVariable Long monitorId,
@Parameter(description = "Metrics Name", example = "cpu")
@PathVariable String metrics) {
- AbstractRealTimeDataStorage realTimeDataStorage =
realTimeDataStorages.stream()
- .filter(AbstractRealTimeDataStorage::isServerAvailable)
+ RealTimeDataReader realTimeDataReader = realTimeDataReaders.stream()
+ .filter(RealTimeDataReader::isServerAvailable)
.max((o1, o2) -> {
if (o1 instanceof MemoryDataStorage) {
return -1;
@@ -98,10 +98,10 @@ public class MetricsDataController {
return 0;
}
}).orElse(null);
- if (realTimeDataStorage == null) {
+ if (realTimeDataReader == null) {
return ResponseEntity.ok(Message.fail(FAIL_CODE, "real time store
not available"));
}
- CollectRep.MetricsData storageData =
realTimeDataStorage.getCurrentMetricsData(monitorId, metrics);
+ CollectRep.MetricsData storageData =
realTimeDataReader.getCurrentMetricsData(monitorId, metrics);
if (storageData == null) {
return ResponseEntity.ok(Message.success("query metrics data is
empty"));
}
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
index f59775459..0451200c6 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
@@ -24,7 +24,7 @@ import org.apache.hertzbeat.common.queue.CommonDataQueue;
import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
import org.apache.hertzbeat.warehouse.store.history.HistoryDataWriter;
import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage;
-import
org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
+import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter;
import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
import org.springframework.stereotype.Component;
@@ -38,27 +38,27 @@ public class DataStorageDispatch {
private final CommonDataQueue commonDataQueue;
private final WarehouseWorkerPool workerPool;
private final List<HistoryDataWriter> historyDataWriters;
- private final List<AbstractRealTimeDataStorage> realTimeDataStorages;
+ private final List<RealTimeDataWriter> realTimeDataWriters;
public DataStorageDispatch(CommonDataQueue commonDataQueue,
WarehouseWorkerPool workerPool,
List<HistoryDataWriter> historyDataWriters,
- List<AbstractRealTimeDataStorage>
realTimeDataStorages) {
+ List<RealTimeDataWriter> realTimeDataWriters) {
this.commonDataQueue = commonDataQueue;
this.workerPool = workerPool;
this.historyDataWriters = historyDataWriters;
- this.realTimeDataStorages = realTimeDataStorages;
+ this.realTimeDataWriters = realTimeDataWriters;
startPersistentDataStorage();
startRealTimeDataStorage();
}
private void startRealTimeDataStorage() {
- if (realTimeDataStorages == null || realTimeDataStorages.isEmpty()) {
+ if (realTimeDataWriters == null || realTimeDataWriters.isEmpty()) {
log.info("no real time data storage start");
return;
}
- if (realTimeDataStorages.size() > 1) {
- realTimeDataStorages.removeIf(MemoryDataStorage.class::isInstance);
+ if (realTimeDataWriters.size() > 1) {
+ realTimeDataWriters.removeIf(MemoryDataStorage.class::isInstance);
}
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-realtime-data-storage");
@@ -68,8 +68,8 @@ public class DataStorageDispatch {
if (metricsData == null) {
continue;
}
- for (AbstractRealTimeDataStorage realTimeDataStorage :
realTimeDataStorages) {
- realTimeDataStorage.saveData(metricsData);
+ for (RealTimeDataWriter realTimeDataWriter :
realTimeDataWriters) {
+ realTimeDataWriter.saveData(metricsData);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java
index d5f631c53..04eb33b33 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java
@@ -17,45 +17,22 @@
package org.apache.hertzbeat.warehouse.store.realtime;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.springframework.beans.factory.DisposableBean;
-import org.springframework.lang.NonNull;
/**
* Real-time data storage abstract class
*/
@Slf4j
-public abstract class AbstractRealTimeDataStorage implements DisposableBean {
+public abstract class AbstractRealTimeDataStorage implements
RealTimeDataReader, RealTimeDataWriter, DisposableBean {
protected boolean serverAvailable;
/**
* @return data Whether the storage is available
*/
+ @Override
public boolean isServerAvailable() {
return serverAvailable;
}
-
- /**
- * save collect metrics data
- * @param metricsData metrics data
- */
- public abstract void saveData(CollectRep.MetricsData metricsData);
-
- /**
- * query real-time last metrics data
- * @param monitorId monitorId
- * @param metric metric name
- * @return metrics data
- */
- public abstract CollectRep.MetricsData getCurrentMetricsData(@NonNull Long
monitorId, @NonNull String metric);
-
- /**
- * query real-time last metrics data
- * @param monitorId monitor id
- * @return metrics data
- */
- public abstract List<CollectRep.MetricsData>
getCurrentMetricsData(@NonNull Long monitorId);
}
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataReader.java
similarity index 62%
copy from
warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java
copy to
warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataReader.java
index d5f631c53..3d01dc678 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataReader.java
@@ -18,31 +18,19 @@
package org.apache.hertzbeat.warehouse.store.realtime;
import java.util.List;
-import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.message.CollectRep;
-import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.NonNull;
+
/**
- * Real-time data storage abstract class
+ * Real-time data reading class
*/
-@Slf4j
-public abstract class AbstractRealTimeDataStorage implements DisposableBean {
-
- protected boolean serverAvailable;
-
- /**
- * @return data Whether the storage is available
- */
- public boolean isServerAvailable() {
- return serverAvailable;
- }
+public interface RealTimeDataReader {
/**
- * save collect metrics data
- * @param metricsData metrics data
+ * @return data storage available
*/
- public abstract void saveData(CollectRep.MetricsData metricsData);
+ boolean isServerAvailable();
/**
* query real-time last metrics data
@@ -50,12 +38,13 @@ public abstract class AbstractRealTimeDataStorage
implements DisposableBean {
* @param metric metric name
* @return metrics data
*/
- public abstract CollectRep.MetricsData getCurrentMetricsData(@NonNull Long
monitorId, @NonNull String metric);
+ CollectRep.MetricsData getCurrentMetricsData(@NonNull Long monitorId,
@NonNull String metric);
/**
* query real-time last metrics data
* @param monitorId monitor id
* @return metrics data
*/
- public abstract List<CollectRep.MetricsData>
getCurrentMetricsData(@NonNull Long monitorId);
+ List<CollectRep.MetricsData> getCurrentMetricsData(@NonNull Long
monitorId);
+
}
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataWriter.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataWriter.java
new file mode 100644
index 000000000..46eaeed04
--- /dev/null
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.realtime;
+
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
+/**
+ * Real-time data writing class
+ */
+public interface RealTimeDataWriter {
+
+ /**
+ * @return data storage available
+ */
+ boolean isServerAvailable();
+
+ /**
+ * save metrics data
+ * @param metricsData metrics data
+ */
+ void saveData(CollectRep.MetricsData metricsData);
+
+}
diff --git
a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
index baf99b0ef..0a2fdfb60 100644
---
a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
+++
b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java
@@ -34,7 +34,7 @@ import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.dto.Value;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.warehouse.store.history.HistoryDataReader;
-import
org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
+import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataReader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -62,17 +62,17 @@ class MetricsDataControllerTest {
HistoryDataReader historyDataReader;
@Mock
- AbstractRealTimeDataStorage realTimeDataStorage;
+ RealTimeDataReader realTimeDataReader;
private List<HistoryDataReader> historyDataReaders = new LinkedList<>();
- private List<AbstractRealTimeDataStorage> realTimeDataStorages = new
LinkedList<>();
+ private List<RealTimeDataReader> realTimeDataReaders = new LinkedList<>();
@BeforeEach
void setUp() {
historyDataReaders.add(historyDataReader);
- realTimeDataStorages.add(realTimeDataStorage);
- metricsDataController = new
MetricsDataController(realTimeDataStorages, historyDataReaders);
+ realTimeDataReaders.add(realTimeDataReader);
+ metricsDataController = new MetricsDataController(realTimeDataReaders,
historyDataReaders);
this.mockMvc =
MockMvcBuilders.standaloneSetup(metricsDataController).build();
}
@@ -101,8 +101,8 @@ class MetricsDataControllerTest {
final long time = System.currentTimeMillis();
final String getUrl = "/api/monitor/" + monitorId + "/metrics/" +
metric;
- when(realTimeDataStorage.getCurrentMetricsData(eq(monitorId),
eq(metric))).thenReturn(null);
- when(realTimeDataStorage.isServerAvailable()).thenReturn(true);
+ when(realTimeDataReader.getCurrentMetricsData(eq(monitorId),
eq(metric))).thenReturn(null);
+ when(realTimeDataReader.isServerAvailable()).thenReturn(true);
this.mockMvc.perform(MockMvcRequestBuilders.get(getUrl))
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
@@ -116,8 +116,8 @@ class MetricsDataControllerTest {
.setMetrics(metric)
.setTime(time)
.build();
- when(realTimeDataStorage.getCurrentMetricsData(eq(monitorId),
eq(metric))).thenReturn(metricsData);
- when(realTimeDataStorage.isServerAvailable()).thenReturn(true);
+ when(realTimeDataReader.getCurrentMetricsData(eq(monitorId),
eq(metric))).thenReturn(metricsData);
+ when(realTimeDataReader.isServerAvailable()).thenReturn(true);
this.mockMvc.perform(MockMvcRequestBuilders.get(getUrl))
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]