This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new e52e715a53 improvement: optimize log stream for high TPS scenarios
(#3876)
e52e715a53 is described below
commit e52e715a531822c196bede3627d3e721d1f42afa
Author: Yang Chen <[email protected]>
AuthorDate: Wed Dec 3 11:18:49 2025 +0800
improvement: optimize log stream for high TPS scenarios (#3876)
Signed-off-by: Yang Chen <[email protected]>
Co-authored-by: factory-droid[bot]
<138933559+factory-droid[bot]@users.noreply.github.com>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Duansg <[email protected]>
---
.../WindowedLogRealTimeAlertCalculator.java | 1 -
.../WindowedLogRealTimeAlertCalculatorTest.java | 3 -
.../collector/dispatch/export/NettyDataQueue.java | 22 ++
.../hertzbeat/common/queue/CommonDataQueue.java | 29 +++
.../common/queue/impl/InMemoryCommonDataQueue.java | 45 ++++
.../common/queue/impl/KafkaCommonDataQueue.java | 88 +++++++
.../common/queue/impl/RedisCommonDataQueue.java | 51 ++++
hertzbeat-log/pom.xml | 11 +
.../log/controller/LogIngestionController.java | 2 +-
.../apache/hertzbeat/log/notice/LogSseManager.java | 169 ++++++++++---
.../log/service/impl/OtlpLogProtocolAdapter.java | 10 +-
.../hertzbeat/log/notice/LogSseManagerTest.java | 52 ++--
.../service/impl/OtlpLogProtocolAdapterTest.java | 56 +++--
.../warehouse/store/DataStorageDispatch.java | 13 +-
.../store/history/tsdb/HistoryDataWriter.java | 13 +
.../tsdb/greptime/GreptimeDbDataStorage.java | 64 +++++
material/licenses/LICENSE | 1 +
.../log/log-stream/log-stream.component.html | 69 +++--
.../log/log-stream/log-stream.component.less | 36 ++-
.../routes/log/log-stream/log-stream.component.ts | 279 ++++++++++++---------
20 files changed, 769 insertions(+), 245 deletions(-)
diff --git
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
index 90d12be6c8..5b84f46b4a 100644
---
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
@@ -71,7 +71,6 @@ public class WindowedLogRealTimeAlertCalculator implements
Runnable {
}
backoff.reset();
processLogEntry(logEntry);
- dataQueue.sendLogEntryToStorage(logEntry);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
diff --git
a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculatorTest.java
b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculatorTest.java
index b990b91d3d..e07f573b09 100644
---
a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculatorTest.java
+++
b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculatorTest.java
@@ -108,7 +108,6 @@ class WindowedLogRealTimeAlertCalculatorTest {
verify(timeService).isLateData(anyLong());
verify(timeService).updateMaxTimestamp(anyLong());
verify(logWorker).reduceAndSendLogTask(validLogEntry);
- verify(dataQueue).sendLogEntryToStorage(validLogEntry);
}
@Test
@@ -134,7 +133,6 @@ class WindowedLogRealTimeAlertCalculatorTest {
verify(timeService, never()).isLateData(anyLong());
verify(timeService, never()).updateMaxTimestamp(anyLong());
verify(logWorker, never()).reduceAndSendLogTask(any());
- verify(dataQueue).sendLogEntryToStorage(invalidTimestampLogEntry);
}
@Test
@@ -162,7 +160,6 @@ class WindowedLogRealTimeAlertCalculatorTest {
verify(timeService).isLateData(anyLong());
verify(timeService, never()).updateMaxTimestamp(anyLong());
verify(logWorker, never()).reduceAndSendLogTask(any());
- verify(dataQueue).sendLogEntryToStorage(lateDataLogEntry);
}
@Test
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/export/NettyDataQueue.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/export/NettyDataQueue.java
index 2328496b91..bdd001f12f 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/export/NettyDataQueue.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/export/NettyDataQueue.java
@@ -25,6 +25,8 @@ import org.apache.hertzbeat.common.queue.CommonDataQueue;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
+import java.util.List;
+
/**
* for collector instance
* send collect response data by netty
@@ -108,4 +110,24 @@ public class NettyDataQueue implements CommonDataQueue {
public LogEntry pollLogEntryToStorage() throws InterruptedException {
return null;
}
+
+ @Override
+ public void sendLogEntryToAlertBatch(List<LogEntry> logEntries) {
+
+ }
+
+ @Override
+ public List<LogEntry> pollLogEntryToAlertBatch(int maxBatchSize) throws
InterruptedException {
+ return List.of();
+ }
+
+ @Override
+ public void sendLogEntryToStorageBatch(List<LogEntry> logEntries) {
+
+ }
+
+ @Override
+ public List<LogEntry> pollLogEntryToStorageBatch(int maxBatchSize) throws
InterruptedException {
+ return List.of();
+ }
}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/CommonDataQueue.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/CommonDataQueue.java
index 4abbc29b14..3b1cb5643d 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/CommonDataQueue.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/CommonDataQueue.java
@@ -17,6 +17,7 @@
package org.apache.hertzbeat.common.queue;
+import java.util.List;
import org.apache.hertzbeat.common.entity.log.LogEntry;
import org.apache.hertzbeat.common.entity.message.CollectRep;
@@ -91,4 +92,32 @@ public interface CommonDataQueue {
* @throws InterruptedException when poll timeout
*/
LogEntry pollLogEntryToStorage() throws InterruptedException;
+
+ /**
+ * send batch log entries to alert queue
+ * @param logEntries list of log entry data
+ */
+ void sendLogEntryToAlertBatch(List<LogEntry> logEntries);
+
+ /**
+ * poll batch log entries from alert queue
+ * @param maxBatchSize maximum number of entries to poll
+ * @return list of log entry data
+ * @throws InterruptedException if the calling thread is interrupted while waiting for entries
+ */
+ List<LogEntry> pollLogEntryToAlertBatch(int maxBatchSize) throws
InterruptedException;
+
+ /**
+ * send batch log entries to storage queue
+ * @param logEntries list of log entry data
+ */
+ void sendLogEntryToStorageBatch(List<LogEntry> logEntries);
+
+ /**
+ * poll batch log entries from storage queue
+ * @param maxBatchSize maximum number of entries to poll
+ * @return list of log entry data
+ * @throws InterruptedException if the calling thread is interrupted while waiting for entries
+ */
+ List<LogEntry> pollLogEntryToStorageBatch(int maxBatchSize) throws
InterruptedException;
}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/InMemoryCommonDataQueue.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/InMemoryCommonDataQueue.java
index bb8f61a6cf..9a9e22655d 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/InMemoryCommonDataQueue.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/InMemoryCommonDataQueue.java
@@ -17,9 +17,12 @@
package org.apache.hertzbeat.common.queue.impl;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.constants.DataQueueConstants;
import org.apache.hertzbeat.common.entity.log.LogEntry;
@@ -117,6 +120,48 @@ public class InMemoryCommonDataQueue implements
CommonDataQueue, DisposableBean
return logEntryToStorageQueue.take();
}
+ @Override
+ public void sendLogEntryToAlertBatch(List<LogEntry> logEntries) {
+ if (logEntries == null || logEntries.isEmpty()) {
+ return;
+ }
+ for (LogEntry logEntry : logEntries) {
+ logEntryQueue.offer(logEntry);
+ }
+ }
+
+ @Override
+ public List<LogEntry> pollLogEntryToAlertBatch(int maxBatchSize) throws
InterruptedException {
+ List<LogEntry> batch = new ArrayList<>(maxBatchSize);
+ LogEntry first = logEntryQueue.poll(1, TimeUnit.SECONDS);
+ if (first != null) {
+ batch.add(first);
+ logEntryQueue.drainTo(batch, maxBatchSize - 1);
+ }
+ return batch;
+ }
+
+ @Override
+ public void sendLogEntryToStorageBatch(List<LogEntry> logEntries) {
+ if (logEntries == null || logEntries.isEmpty()) {
+ return;
+ }
+ for (LogEntry logEntry : logEntries) {
+ logEntryToStorageQueue.offer(logEntry);
+ }
+ }
+
+ @Override
+ public List<LogEntry> pollLogEntryToStorageBatch(int maxBatchSize) throws
InterruptedException {
+ List<LogEntry> batch = new ArrayList<>(maxBatchSize);
+ LogEntry first = logEntryToStorageQueue.poll(1, TimeUnit.SECONDS);
+ if (first != null) {
+ batch.add(first);
+ logEntryToStorageQueue.drainTo(batch, maxBatchSize - 1);
+ }
+ return batch;
+ }
+
@Override
public void destroy() {
metricsDataToAlertQueue.clear();
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
index 17b4e3d4d4..4e77a7f390 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
@@ -18,8 +18,10 @@
package org.apache.hertzbeat.common.queue.impl;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
@@ -261,6 +263,92 @@ public class KafkaCommonDataQueue implements
CommonDataQueue, DisposableBean {
return genericPollDataFunction(logEntryToStorageQueue,
logEntryToStorageConsumer, logEntryToStorageLock);
}
+ @Override
+ public void sendLogEntryToAlertBatch(List<LogEntry> logEntries) {
+ if (logEntries == null || logEntries.isEmpty()) {
+ return;
+ }
+ if (logEntryProducer != null) {
+ try {
+ for (LogEntry logEntry : logEntries) {
+ ProducerRecord<Long, LogEntry> record = new
ProducerRecord<>(kafka.getLogEntryDataTopic(), logEntry);
+ logEntryProducer.send(record);
+ }
+ } catch (Exception e) {
+ log.error("Failed to send LogEntry batch to Kafka: {}",
e.getMessage());
+ for (LogEntry logEntry : logEntries) {
+ logEntryQueue.offer(logEntry);
+ }
+ }
+ } else {
+ log.warn("logEntryProducer is not enabled, using memory queue");
+ for (LogEntry logEntry : logEntries) {
+ logEntryQueue.offer(logEntry);
+ }
+ }
+ }
+
+ @Override
+ public List<LogEntry> pollLogEntryToAlertBatch(int maxBatchSize) throws
InterruptedException {
+ return genericBatchPollDataFunction(logEntryQueue, logEntryConsumer,
logEntryLock, maxBatchSize);
+ }
+
+ @Override
+ public void sendLogEntryToStorageBatch(List<LogEntry> logEntries) {
+ if (logEntries == null || logEntries.isEmpty()) {
+ return;
+ }
+ if (logEntryProducer != null) {
+ try {
+ for (LogEntry logEntry : logEntries) {
+ ProducerRecord<Long, LogEntry> record = new
ProducerRecord<>(kafka.getLogEntryDataToStorageTopic(), logEntry);
+ logEntryProducer.send(record);
+ }
+ } catch (Exception e) {
+ log.error("Failed to send LogEntry batch to storage via Kafka:
{}", e.getMessage());
+ for (LogEntry logEntry : logEntries) {
+ logEntryToStorageQueue.offer(logEntry);
+ }
+ }
+ } else {
+ log.warn("logEntryProducer is not enabled, using memory queue for
storage");
+ for (LogEntry logEntry : logEntries) {
+ logEntryToStorageQueue.offer(logEntry);
+ }
+ }
+ }
+
+ @Override
+ public List<LogEntry> pollLogEntryToStorageBatch(int maxBatchSize) throws
InterruptedException {
+ return genericBatchPollDataFunction(logEntryToStorageQueue,
logEntryToStorageConsumer, logEntryToStorageLock, maxBatchSize);
+ }
+
+ public <T> List<T> genericBatchPollDataFunction(LinkedBlockingQueue<T>
dataQueue, KafkaConsumer<Long, T> dataConsumer,
+ ReentrantLock lock, int maxBatchSize) throws InterruptedException {
+ List<T> batch = new ArrayList<>(maxBatchSize);
+ lock.lockInterruptibly();
+ try {
+ dataQueue.drainTo(batch, maxBatchSize);
+ if (batch.size() >= maxBatchSize) {
+ return batch;
+ }
+ ConsumerRecords<Long, T> records =
dataConsumer.poll(Duration.ofSeconds(1));
+ for (ConsumerRecord<Long, T> record : records) {
+ if (batch.size() < maxBatchSize) {
+ batch.add(record.value());
+ } else {
+ dataQueue.offer(record.value());
+ }
+ }
+ dataConsumer.commitAsync();
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ } finally {
+ lock.unlock();
+ }
+ return batch;
+ }
+
@Override
public void destroy() throws Exception {
if (metricsDataProducer != null) {
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
index 7f82f6372b..49c187ab59 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
@@ -17,6 +17,8 @@
package org.apache.hertzbeat.common.queue.impl;
+import java.util.ArrayList;
+import java.util.List;
import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
@@ -162,6 +164,42 @@ public class RedisCommonDataQueue implements
CommonDataQueue, DisposableBean {
return genericBlockingPollFunction(logEntryToStorageQueueName,
logEntrySyncCommands);
}
+ @Override
+ @SuppressWarnings("unchecked")
+ public void sendLogEntryToAlertBatch(List<LogEntry> logEntries) {
+ if (logEntries == null || logEntries.isEmpty()) {
+ return;
+ }
+ try {
+ logEntrySyncCommands.lpush(logEntryQueueName,
logEntries.toArray(new LogEntry[0]));
+ } catch (Exception e) {
+ log.error("Failed to send LogEntry batch to Redis: {}",
e.getMessage());
+ }
+ }
+
+ @Override
+ public List<LogEntry> pollLogEntryToAlertBatch(int maxBatchSize) throws
InterruptedException {
+ return genericBatchPollFunction(logEntryQueueName,
logEntrySyncCommands, maxBatchSize);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void sendLogEntryToStorageBatch(List<LogEntry> logEntries) {
+ if (logEntries == null || logEntries.isEmpty()) {
+ return;
+ }
+ try {
+ logEntrySyncCommands.lpush(logEntryToStorageQueueName,
logEntries.toArray(new LogEntry[0]));
+ } catch (Exception e) {
+ log.error("Failed to send LogEntry batch to storage via Redis:
{}", e.getMessage());
+ }
+ }
+
+ @Override
+ public List<LogEntry> pollLogEntryToStorageBatch(int maxBatchSize) throws
InterruptedException {
+ return genericBatchPollFunction(logEntryToStorageQueueName,
logEntrySyncCommands, maxBatchSize);
+ }
+
@Override
public void destroy() {
connection.close();
@@ -188,4 +226,17 @@ public class RedisCommonDataQueue implements
CommonDataQueue, DisposableBean {
}
}
+ private List<LogEntry> genericBatchPollFunction(String key,
RedisCommands<String, LogEntry> commands, int maxBatchSize) {
+ List<LogEntry> batch = new ArrayList<>(maxBatchSize);
+ try {
+ List<LogEntry> elements = commands.rpop(key, maxBatchSize);
+ if (elements != null) {
+ batch.addAll(elements);
+ }
+ } catch (Exception e) {
+ log.error("Redis batch poll failed: {}", e.getMessage());
+ }
+ return batch;
+ }
+
}
diff --git a/hertzbeat-log/pom.xml b/hertzbeat-log/pom.xml
index e1859501b1..7bc9c65fe3 100644
--- a/hertzbeat-log/pom.xml
+++ b/hertzbeat-log/pom.xml
@@ -28,6 +28,10 @@
<artifactId>hertzbeat-log</artifactId>
<name>${project.artifactId}</name>
+ <properties>
+ <awaitility.version>4.2.0</awaitility.version>
+ </properties>
+
<dependencies>
<!-- common -->
<dependency>
@@ -67,6 +71,13 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
+ <!-- awaitility for async testing -->
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogIngestionController.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogIngestionController.java
index 125b10904d..2536986ae8 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogIngestionController.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogIngestionController.java
@@ -60,7 +60,7 @@ public class LogIngestionController {
@PostMapping("/ingest/{protocol}")
public ResponseEntity<Message<Void>>
ingestExternLog(@PathVariable("protocol") String protocol,
@RequestBody String
content) {
- log.info("Receive extern log from protocol: {}, content length: {}",
protocol, content == null ? 0 : content.length());
+ log.debug("Receive extern log from protocol: {}, content length: {}",
protocol, content == null ? 0 : content.length());
if (!StringUtils.hasText(protocol)) {
protocol = DEFAULT_PROTOCOL; // Default to OTLP if no protocol
specified
}
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java
index 432b27d8cc..31a537c6f2 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java
@@ -19,34 +19,75 @@
package org.apache.hertzbeat.log.notice;
+import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.log.LogEntry;
-import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * SSE manager for log
+ * SSE manager for log with batch processing support for high TPS scenarios
*/
@Component
@Slf4j
@Getter
public class LogSseManager {
+
+ private static final long BATCH_INTERVAL_MS = 200;
+ private static final int MAX_BATCH_SIZE = 1000;
+ private static final int MAX_QUEUE_SIZE = 10000;
+
private final Map<Long, SseSubscriber> emitters = new
ConcurrentHashMap<>();
+ private final Queue<LogEntry> logQueue = new ConcurrentLinkedQueue<>();
+ private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "sse-batch-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ private final ExecutorService senderPool = Executors.newCachedThreadPool(r
-> {
+ Thread t = new Thread(r, "sse-sender");
+ t.setDaemon(true);
+ return t;
+ });
+ private final AtomicLong queueSize = new AtomicLong(0);
+
+ public LogSseManager() {
+ scheduler.scheduleAtFixedRate(this::flushBatch, BATCH_INTERVAL_MS,
BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ scheduler.shutdown();
+ senderPool.shutdown();
+ try {
+ scheduler.awaitTermination(2, TimeUnit.SECONDS);
+ senderPool.awaitTermination(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ scheduler.shutdownNow();
+ senderPool.shutdownNow();
+ }
/**
* Create a new SSE emitter for a client with specified filters
- * @param clientId The unique identifier for the client
- * @param filters The filters to apply to the log data
- * @return The SSE emitter
*/
public SseEmitter createEmitter(Long clientId, LogSseFilterCriteria
filters) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
@@ -54,55 +95,115 @@ public class LogSseManager {
emitter.onTimeout(() -> removeEmitter(clientId));
emitter.onError((ex) -> removeEmitter(clientId));
- SseSubscriber subscriber = new SseSubscriber(emitter, filters);
- emitters.put(clientId, subscriber);
+ emitters.put(clientId, new SseSubscriber(emitter, filters));
return emitter;
}
/**
- * Broadcast log data to all subscribers
- * @param logEntry The log data to broadcast
+ * Queue log entry for batch processing
*/
- @Async
public void broadcast(LogEntry logEntry) {
- emitters.forEach((clientId, subscriber) -> {
- try {
- // Check if the log entry matches the subscriber's filter
criteria
- if (subscriber.filters == null ||
subscriber.filters.matches(logEntry)) {
- subscriber.emitter.send(SseEmitter.event()
- .id(String.valueOf(System.currentTimeMillis()))
- .name("LOG_EVENT")
- .data(logEntry));
+ if (queueSize.incrementAndGet() > MAX_QUEUE_SIZE) {
+ queueSize.decrementAndGet();
+ return;
+ }
+ boolean offered = logQueue.offer(logEntry);
+ if (!offered) {
+ queueSize.decrementAndGet();
+ log.warn("Failed to enqueue log entry: {}", logEntry);
+ }
+ }
+
+ /**
+ * Flush queued logs to all subscribers in batch
+ */
+ private void flushBatch() {
+ try {
+ if (logQueue.isEmpty() || emitters.isEmpty()) {
+ return;
+ }
+
+ List<LogEntry> batch = new ArrayList<>(MAX_BATCH_SIZE);
+ LogEntry entry;
+ while (batch.size() < MAX_BATCH_SIZE && (entry = logQueue.poll())
!= null) {
+ batch.add(entry);
+ queueSize.decrementAndGet();
+ }
+
+ if (batch.isEmpty()) {
+ return;
+ }
+
+ // Send to each subscriber in parallel
+ for (Map.Entry<Long, SseSubscriber> e : emitters.entrySet()) {
+ Long clientId = e.getKey();
+ SseSubscriber subscriber = e.getValue();
+ List<LogEntry> filtered = filterLogs(batch,
subscriber.filters);
+ if (!filtered.isEmpty()) {
+ senderPool.submit(() -> sendToSubscriber(clientId,
subscriber.emitter, filtered));
}
- } catch (IOException | IllegalStateException e) {
- subscriber.emitter.complete();
- removeEmitter(clientId);
- } catch (Exception exception) {
- log.error("Failed to broadcast log to client: {}",
exception.getMessage());
- subscriber.emitter.complete();
- removeEmitter(clientId);
}
- });
+ } catch (Exception e) {
+ log.error("Error in flushBatch: {}", e.getMessage(), e);
+ }
+ }
+
+ private void sendToSubscriber(Long clientId, SseEmitter emitter,
List<LogEntry> logs) {
+ try {
+ long batchTimestamp = System.currentTimeMillis();
+ int sequenceNumber = 0;
+ for (LogEntry logEntry : logs) {
+ String eventId = batchTimestamp + "-" + sequenceNumber++;
+ emitter.send(SseEmitter.event()
+ .id(eventId)
+ .name("LOG_EVENT")
+ .data(logEntry));
+ }
+ } catch (IOException | IllegalStateException e) {
+ safeComplete(clientId, emitter);
+ } catch (Exception e) {
+ log.error("Failed to send to client {}: {}", clientId,
e.getMessage());
+ safeComplete(clientId, emitter);
+ }
+ }
+
+ private void safeComplete(Long clientId, SseEmitter emitter) {
+ try {
+ emitter.complete();
+ } catch (Exception ignored) {
+ }
+ removeEmitter(clientId);
+ }
+
+ private List<LogEntry> filterLogs(List<LogEntry> logs,
LogSseFilterCriteria filters) {
+ if (filters == null) {
+ return logs;
+ }
+ List<LogEntry> filtered = new ArrayList<>();
+ for (LogEntry log : logs) {
+ if (filters.matches(log)) {
+ filtered.add(log);
+ }
+ }
+ return filtered;
}
private void removeEmitter(Long clientId) {
emitters.remove(clientId);
}
-
+
+ public long getQueueSize() {
+ return queueSize.get();
+ }
+
/**
- * SSE subscriber
+ * Pairs a client's SseEmitter with the LogSseFilterCriteria applied to its log stream
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SseSubscriber {
- /**
- * The SSE emitter for streaming log events
- */
private SseEmitter emitter;
- /**
- * The filters for streaming log events
- */
private LogSseFilterCriteria filters;
}
}
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/service/impl/OtlpLogProtocolAdapter.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/service/impl/OtlpLogProtocolAdapter.java
index 27ca1a24ef..53980f6614 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/service/impl/OtlpLogProtocolAdapter.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/service/impl/OtlpLogProtocolAdapter.java
@@ -69,13 +69,9 @@ public class OtlpLogProtocolAdapter implements
LogProtocolAdapter {
// Extract LogEntry instances from the request
List<LogEntry> logEntries = extractLogEntries(request);
log.debug("Successfully extracted {} log entries from OTLP payload
{}", logEntries.size(), content);
-
- logEntries.forEach(entry -> {
- commonDataQueue.sendLogEntry(entry);
- logSseManager.broadcast(entry);
- log.info("Log entry sent to queue: {}", entry);
- });
-
+ commonDataQueue.sendLogEntryToStorageBatch(logEntries);
+ commonDataQueue.sendLogEntryToAlertBatch(logEntries);
+ logEntries.forEach(logSseManager::broadcast);
} catch (InvalidProtocolBufferException e) {
log.error("Failed to parse OTLP log payload: {}", e.getMessage());
throw new IllegalArgumentException("Invalid OTLP log content", e);
diff --git
a/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/notice/LogSseManagerTest.java
b/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/notice/LogSseManagerTest.java
index 09696e3dc1..3f2c39ebeb 100644
---
a/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/notice/LogSseManagerTest.java
+++
b/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/notice/LogSseManagerTest.java
@@ -20,19 +20,21 @@
package org.apache.hertzbeat.log.notice;
import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Spy;
-import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -41,14 +43,21 @@ import static org.mockito.Mockito.verify;
/**
* Unit tests for {@link LogSseManager}.
*/
-@ExtendWith(MockitoExtension.class)
class LogSseManagerTest {
- @Spy
private LogSseManager logSseManager;
-
private static final Long CLIENT_ID = 1L;
+ @BeforeEach
+ void setUp() {
+ logSseManager = new LogSseManager();
+ }
+
+ @AfterEach
+ void tearDown() {
+ logSseManager.shutdown();
+ }
+
@Test
void shouldCreateAndStoreEmitter() {
// When: Creating a new emitter for a client
@@ -73,12 +82,14 @@ class LogSseManagerTest {
// When: An "INFO" log is broadcast
logSseManager.broadcast(infoLog);
- // Then: The log should be sent to the client
- verify(mockEmitter).send(any(SseEmitter.SseEventBuilder.class));
+ // Then: The log should be sent to the client (wait for batch
processing)
+ await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() ->
+ verify(mockEmitter,
atLeastOnce()).send(any(SseEmitter.SseEventBuilder.class))
+ );
}
@Test
- void shouldNotBroadcastLogWhenFilterDoesNotMatch() throws IOException {
+ void shouldNotBroadcastLogWhenFilterDoesNotMatch() throws IOException,
InterruptedException {
// Given: A client with a filter for "ERROR" logs
LogSseFilterCriteria filters = new LogSseFilterCriteria();
filters.setSeverityText("ERROR");
@@ -90,6 +101,9 @@ class LogSseManagerTest {
// When: An "INFO" log is broadcast
logSseManager.broadcast(infoLog);
+ // Wait for batch processing
+ Thread.sleep(300);
+
// Then: The log should NOT be sent to the client
verify(mockEmitter,
never()).send(any(SseEmitter.SseEventBuilder.class));
}
@@ -105,12 +119,14 @@ class LogSseManagerTest {
// When: Any log is broadcast
logSseManager.broadcast(anyLog);
- // Then: The log should be sent to the client
- verify(mockEmitter).send(any(SseEmitter.SseEventBuilder.class));
+ // Then: The log should be sent to the client (wait for batch
processing)
+ await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() ->
+ verify(mockEmitter,
atLeastOnce()).send(any(SseEmitter.SseEventBuilder.class))
+ );
}
@Test
- void shouldBroadcastOnlyToMatchingSubscribers() throws IOException {
+ void shouldBroadcastOnlyToMatchingSubscribers() throws IOException,
InterruptedException {
// Given: Two clients with different filters
LogSseFilterCriteria infoFilter = new LogSseFilterCriteria();
infoFilter.setSeverityText("INFO");
@@ -127,8 +143,12 @@ class LogSseManagerTest {
// When: An "INFO" log is broadcast
logSseManager.broadcast(infoLog);
+ // Wait for batch processing
+ await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() ->
+ verify(infoEmitter,
atLeastOnce()).send(any(SseEmitter.SseEventBuilder.class))
+ );
+
// Then: The log is sent only to the client subscribed to "INFO" logs
- verify(infoEmitter).send(any(SseEmitter.SseEventBuilder.class));
verify(errorEmitter,
never()).send(any(SseEmitter.SseEventBuilder.class));
}
@@ -146,8 +166,10 @@ class LogSseManagerTest {
logSseManager.broadcast(log);
// Then: The failing emitter should be completed and removed
- verify(mockEmitter).complete();
- assertFalse(logSseManager.getEmitters().containsKey(CLIENT_ID));
+ await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() -> {
+ verify(mockEmitter).complete();
+ assertFalse(logSseManager.getEmitters().containsKey(CLIENT_ID));
+ });
}
/**
diff --git
a/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/service/impl/OtlpLogProtocolAdapterTest.java
b/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/service/impl/OtlpLogProtocolAdapterTest.java
index 4f72671e8e..fa136db240 100644
---
a/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/service/impl/OtlpLogProtocolAdapterTest.java
+++
b/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/service/impl/OtlpLogProtocolAdapterTest.java
@@ -44,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -85,12 +86,16 @@ class OtlpLogProtocolAdapterTest {
adapter.ingest(otlpPayload);
- ArgumentCaptor<LogEntry> logEntryCaptor =
ArgumentCaptor.forClass(LogEntry.class);
- verify(commonDataQueue,
times(1)).sendLogEntry(logEntryCaptor.capture());
- verify(logSseManager, times(1)).broadcast(logEntryCaptor.capture());
+ ArgumentCaptor<List<LogEntry>> listCaptor =
ArgumentCaptor.forClass(List.class);
+ verify(commonDataQueue,
times(1)).sendLogEntryToStorageBatch(listCaptor.capture());
+ verify(commonDataQueue, times(1)).sendLogEntryToAlertBatch(anyList());
+ verify(logSseManager, times(1)).broadcast(any(LogEntry.class));
+
+ List<LogEntry> capturedList = listCaptor.getValue();
+ assertNotNull(capturedList);
+ assertEquals(1, capturedList.size());
- LogEntry capturedEntry = logEntryCaptor.getValue();
- assertNotNull(capturedEntry);
+ LogEntry capturedEntry = capturedList.get(0);
assertEquals("test-service",
capturedEntry.getResource().get("service_name"));
assertEquals("test-version",
capturedEntry.getResource().get("service_version"));
assertEquals("test-scope",
capturedEntry.getInstrumentationScope().getName());
@@ -106,8 +111,13 @@ class OtlpLogProtocolAdapterTest {
adapter.ingest(otlpPayload);
- verify(commonDataQueue, times(2)).sendLogEntry(any(LogEntry.class));
+ ArgumentCaptor<List<LogEntry>> listCaptor =
ArgumentCaptor.forClass(List.class);
+ verify(commonDataQueue,
times(1)).sendLogEntryToStorageBatch(listCaptor.capture());
+ verify(commonDataQueue, times(1)).sendLogEntryToAlertBatch(anyList());
verify(logSseManager, times(2)).broadcast(any(LogEntry.class));
+
+ List<LogEntry> capturedList = listCaptor.getValue();
+ assertEquals(2, capturedList.size());
}
@Test
@@ -116,15 +126,16 @@ class OtlpLogProtocolAdapterTest {
adapter.ingest(otlpPayload);
- verify(commonDataQueue, times(1)).sendLogEntry(any(LogEntry.class));
+ ArgumentCaptor<List<LogEntry>> listCaptor =
ArgumentCaptor.forClass(List.class);
+ verify(commonDataQueue,
times(1)).sendLogEntryToStorageBatch(listCaptor.capture());
+ verify(commonDataQueue, times(1)).sendLogEntryToAlertBatch(anyList());
verify(logSseManager, times(1)).broadcast(any(LogEntry.class));
- ArgumentCaptor<LogEntry> logEntryCaptor =
ArgumentCaptor.forClass(LogEntry.class);
- verify(commonDataQueue).sendLogEntry(logEntryCaptor.capture());
-
- LogEntry capturedEntry = logEntryCaptor.getValue();
- assertNotNull(capturedEntry);
+ List<LogEntry> capturedList = listCaptor.getValue();
+ assertNotNull(capturedList);
+ assertEquals(1, capturedList.size());
+ LogEntry capturedEntry = capturedList.get(0);
Map<String, Object> attributes = capturedEntry.getAttributes();
assertEquals("string_value", attributes.get("string_attr"));
assertEquals(true, attributes.get("bool_attr"));
@@ -145,12 +156,15 @@ class OtlpLogProtocolAdapterTest {
adapter.ingest(otlpPayload);
- verify(commonDataQueue, times(1)).sendLogEntry(any(LogEntry.class));
+ ArgumentCaptor<List<LogEntry>> listCaptor =
ArgumentCaptor.forClass(List.class);
+ verify(commonDataQueue,
times(1)).sendLogEntryToStorageBatch(listCaptor.capture());
+ verify(commonDataQueue, times(1)).sendLogEntryToAlertBatch(anyList());
+ verify(logSseManager, times(1)).broadcast(any(LogEntry.class));
- ArgumentCaptor<LogEntry> logEntryCaptor =
ArgumentCaptor.forClass(LogEntry.class);
- verify(commonDataQueue).sendLogEntry(logEntryCaptor.capture());
+ List<LogEntry> capturedList = listCaptor.getValue();
+ assertEquals(1, capturedList.size());
- LogEntry capturedEntry = logEntryCaptor.getValue();
+ LogEntry capturedEntry = capturedList.get(0);
assertEquals("1234567890abcdef1234567890abcdef",
capturedEntry.getTraceId());
assertEquals("1234567890abcdef", capturedEntry.getSpanId());
assertEquals(1, capturedEntry.getTraceFlags());
@@ -170,7 +184,15 @@ class OtlpLogProtocolAdapterTest {
adapter.ingest(otlpPayload);
- verifyNoInteractions(commonDataQueue, logSseManager);
+ ArgumentCaptor<List<LogEntry>> listCaptor =
ArgumentCaptor.forClass(List.class);
+ verify(commonDataQueue,
times(1)).sendLogEntryToStorageBatch(listCaptor.capture());
+ verify(commonDataQueue, times(1)).sendLogEntryToAlertBatch(anyList());
+
+ List<LogEntry> capturedList = listCaptor.getValue();
+ assertNotNull(capturedList);
+ assertEquals(0, capturedList.size());
+
+ verifyNoInteractions(logSseManager);
}
private String createValidOtlpLogPayload() throws Exception {
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
index f0081e47ab..177b337344 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
@@ -17,6 +17,8 @@
package org.apache.hertzbeat.warehouse.store;
+import java.util.List;
+import java.util.Optional;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import lombok.extern.slf4j.Slf4j;
@@ -36,8 +38,6 @@ import
org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
-import java.util.Optional;
-
/**
* dispatch storage metrics data
*/
@@ -51,6 +51,7 @@ public class DataStorageDispatch {
private final RealTimeDataWriter realTimeDataWriter;
private final Optional<HistoryDataWriter> historyDataWriter;
private final PluginRunner pluginRunner;
+ private static final int LOG_BATCH_SIZE = 1000;
@PersistenceContext
private EntityManager entityManager;
@@ -108,16 +109,16 @@ public class DataStorageDispatch {
Thread.currentThread().setName("warehouse-log-data-storage");
while (!Thread.currentThread().isInterrupted()) {
try {
- LogEntry logEntry =
commonDataQueue.pollLogEntryToStorage();
- if (logEntry == null) {
+ List<LogEntry> logEntries =
commonDataQueue.pollLogEntryToStorageBatch(LOG_BATCH_SIZE);
+ if (logEntries == null || logEntries.isEmpty()) {
continue;
}
backoff.reset();
historyDataWriter.ifPresent(dataWriter -> {
try {
- dataWriter.saveLogData(logEntry);
+ dataWriter.saveLogDataBatch(logEntries);
} catch (Exception e) {
- log.error("Failed to save log entry: {}",
e.getMessage(), e);
+ log.error("Failed to save log entries batch: {}",
e.getMessage(), e);
}
});
} catch (InterruptedException interruptedException) {
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataWriter.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataWriter.java
index f26b4898c6..e76cd24aa4 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataWriter.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataWriter.java
@@ -54,4 +54,17 @@ public interface HistoryDataWriter {
default boolean batchDeleteLogs(List<Long> timeUnixNanos) {
throw new UnsupportedOperationException("batch delete logs is not
supported");
}
+
+ /**
+  * Persist a list of log entries.
+  * This default implementation hands the entries one at a time to
+  * {@code saveLogData}; a null or empty list is a no-op.
+  * @param logEntries log entries to persist
+  */
+ default void saveLogDataBatch(List<LogEntry> logEntries) {
+     boolean hasEntries = logEntries != null && !logEntries.isEmpty();
+     if (hasEntries) {
+         logEntries.forEach(this::saveLogData);
+     }
+ }
}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
index 6d6c1d1a2a..c2834f5ab9 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
@@ -92,6 +92,7 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
private static final String LOG_TABLE_NAME = "hertzbeat_logs";
private static final String LABEL_KEY_START_TIME = "start";
private static final String LABEL_KEY_END_TIME = "end";
+ private static final int LOG_BATCH_SIZE = 500;
private GreptimeDB greptimeDb;
@@ -767,4 +768,67 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
}
}
+ /**
+  * Write log entries in slices of at most LOG_BATCH_SIZE entries.
+  * Does nothing when the server is unavailable or the list is null or empty.
+  * @param logEntries log entries to persist
+  */
+ @Override
+ public void saveLogDataBatch(List<LogEntry> logEntries) {
+     if (!isServerAvailable() || logEntries == null || logEntries.isEmpty()) {
+         return;
+     }
+
+     final int size = logEntries.size();
+     int from = 0;
+     while (from < size) {
+         // Clamp the upper bound so the final slice may be shorter than LOG_BATCH_SIZE.
+         int to = Math.min(from + LOG_BATCH_SIZE, size);
+         doSaveLogBatch(logEntries.subList(from, to));
+         from = to;
+     }
+ }
+
+ /**
+  * Write one slice of log entries to the log table in a single request.
+  * Waits up to 10 seconds for the write to complete. Failures are logged
+  * and not propagated to the caller.
+  * @param logEntries log entries to write as one table
+  */
+ private void doSaveLogBatch(List<LogEntry> logEntries) {
+     try {
+         TableSchema.Builder tableSchemaBuilder = TableSchema.newBuilder(LOG_TABLE_NAME);
+         tableSchemaBuilder.addTimestamp("time_unix_nano", DataType.TimestampNanosecond)
+                 .addField("observed_time_unix_nano", DataType.TimestampNanosecond)
+                 .addField("severity_number", DataType.Int32)
+                 .addField("severity_text", DataType.String)
+                 .addField("body", DataType.Json)
+                 .addField("trace_id", DataType.String)
+                 .addField("span_id", DataType.String)
+                 .addField("trace_flags", DataType.Int32)
+                 .addField("attributes", DataType.Json)
+                 .addField("resource", DataType.Json)
+                 .addField("instrumentation_scope", DataType.Json)
+                 .addField("dropped_attributes_count", DataType.Int32);
+
+         Table table = Table.from(tableSchemaBuilder.build());
+
+         // System.nanoTime() has an arbitrary origin and is unrelated to wall-clock
+         // time, so it must not be stored in an epoch timestamp column. Fall back to
+         // the current epoch time in nanoseconds instead; the row index is added so
+         // entries that fall back within the same batch keep distinct, ordered values.
+         long fallbackBaseNanos = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
+         int rowIndex = 0;
+         for (LogEntry logEntry : logEntries) {
+             long fallbackNanos = fallbackBaseNanos + rowIndex++;
+             Object[] values = new Object[] {
+                 logEntry.getTimeUnixNano() != null ? logEntry.getTimeUnixNano() : fallbackNanos,
+                 logEntry.getObservedTimeUnixNano() != null ? logEntry.getObservedTimeUnixNano() : fallbackNanos,
+                 logEntry.getSeverityNumber(),
+                 logEntry.getSeverityText(),
+                 JsonUtil.toJson(logEntry.getBody()),
+                 logEntry.getTraceId(),
+                 logEntry.getSpanId(),
+                 logEntry.getTraceFlags(),
+                 JsonUtil.toJson(logEntry.getAttributes()),
+                 JsonUtil.toJson(logEntry.getResource()),
+                 JsonUtil.toJson(logEntry.getInstrumentationScope()),
+                 logEntry.getDroppedAttributesCount()
+             };
+             table.addRow(values);
+         }
+
+         CompletableFuture<Result<WriteOk, Err>> writeFuture = greptimeDb.write(table);
+         Result<WriteOk, Err> result = writeFuture.get(10, TimeUnit.SECONDS);
+
+         if (result.isOk()) {
+             log.debug("[warehouse greptime-log] Batch write {} logs successful", logEntries.size());
+         } else {
+             log.warn("[warehouse greptime-log] Batch write failed: {}", result.getErr());
+         }
+     } catch (InterruptedException e) {
+         // Restore the interrupt flag so code further up the stack that checks the
+         // thread's interrupt status still sees the interruption.
+         Thread.currentThread().interrupt();
+         log.warn("[warehouse greptime-log] Interrupted while saving log entries batch", e);
+     } catch (Exception e) {
+         log.error("[warehouse greptime-log] Error saving log entries batch", e);
+     }
+ }
+
}
diff --git a/material/licenses/LICENSE b/material/licenses/LICENSE
index a6be59d0fe..85eeac878b 100644
--- a/material/licenses/LICENSE
+++ b/material/licenses/LICENSE
@@ -440,6 +440,7 @@ The text of each license is the standard Apache 2.0 license.
https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.9.3
Apache-2.0
https://mvnrepository.com/artifact/io.opentelemetry.proto/opentelemetry-proto/1.7.0-alpha
Apache-2.0
https://mvnrepository.com/artifact/org.questdb/questdb Apache-2.0
+ https://mvnrepository.com/artifact/org.awaitility/awaitility/4.2.0
Apache-2.0
========================================================================
BSD-2-Clause licenses
diff --git a/web-app/src/app/routes/log/log-stream/log-stream.component.html
b/web-app/src/app/routes/log/log-stream/log-stream.component.html
index 2e3d3e906d..3a41193045 100644
--- a/web-app/src/app/routes/log/log-stream/log-stream.component.html
+++ b/web-app/src/app/routes/log/log-stream/log-stream.component.html
@@ -159,47 +159,44 @@
</div>
</div>
- <div class="log-container" #logContainer [class.paused]="isPaused">
+ <div class="log-container" [class.paused]="isPaused">
<!-- Empty State -->
<div *ngIf="!isConnecting && logEntries.length === 0"
class="empty-state">
<nz-empty [nzNotFoundContent]="'log.stream.no-logs' | i18n">
</nz-empty>
</div>
- <!-- Log Entries -->
- <div
- *ngFor="let logEntry of logEntries; trackBy: trackByLogEntry"
- class="log-entry"
- [class.new-entry]="logEntry.isNew"
- (click)="showLogDetails(logEntry)"
- >
- <div class="log-content">
- <div class="log-meta">
- <nz-tag
[nzColor]="getSeverityColor(logEntry.original.severityNumber)"
class="severity-tag">
- {{ logEntry.original.severityText || ('log.stream.unknown' |
i18n) }}
- </nz-tag>
-
- <span class="timestamp">
- {{ formatTimestamp(logEntry.timestamp!) }}
- </span>
- </div>
-
- <div class="log-message">
- {{ getLogEntryJson(logEntry.original) }}
- </div>
-
- <div class="log-actions">
- <button
- nz-button
- nzSize="small"
- nzType="text"
- (click)="$event.stopPropagation();
copyToClipboard(getLogEntryJson(logEntry.original))"
- [nz-tooltip]="'log.stream.copy-message' | i18n"
- >
- <i nz-icon nzType="copy"></i>
- </button>
+ <!-- Log Entries with Virtual Scroll -->
+ <cdk-virtual-scroll-viewport itemSize="40"
class="virtual-scroll-viewport">
+ <div *cdkVirtualFor="let logEntry of logEntries; trackBy:
trackByLogEntry" class="log-entry" (click)="showLogDetails(logEntry)">
+ <div class="log-content">
+ <div class="log-meta">
+ <nz-tag [nzColor]="logEntry.severityColor" class="severity-tag">
+ {{ logEntry.original.severityText || ('log.stream.unknown' |
i18n) }}
+ </nz-tag>
+
+ <span class="timestamp">
+ {{ logEntry.timestamp | date : 'yyyy-MM-dd HH:mm:ss.SSS' }}
+ </span>
+ </div>
+
+ <div class="log-message">
+ {{ logEntry.displayText }}
+ </div>
+
+ <div class="log-actions">
+ <button
+ nz-button
+ nzSize="small"
+ nzType="text"
+ (click)="$event.stopPropagation();
copyToClipboard(logEntry.displayText)"
+ [nz-tooltip]="'log.stream.copy-message' | i18n"
+ >
+ <i nz-icon nzType="copy"></i>
+ </button>
+ </div>
</div>
</div>
- </div>
+ </cdk-virtual-scroll-viewport>
</div>
</nz-card>
</div>
@@ -219,13 +216,13 @@
<div class="basic-info">
<div class="info-row">
<span class="info-label">{{ 'log.stream.severity' | i18n }}</span>
- <nz-tag
[nzColor]="getSeverityColor(selectedLogEntry.original.severityNumber)">
+ <nz-tag [nzColor]="selectedLogEntry.severityColor">
{{ selectedLogEntry.original.severityText ||
('log.stream.unknown' | i18n) }}
</nz-tag>
</div>
<div class="info-row">
<span class="info-label">{{ 'log.stream.timestamp' | i18n }}</span>
- <span>{{ formatTimestamp(selectedLogEntry.timestamp!) }}</span>
+ <span>{{ selectedLogEntry.timestamp | date : 'yyyy-MM-dd
HH:mm:ss.SSS' }}</span>
</div>
<div class="info-row" *ngIf="selectedLogEntry.original.traceId">
<span class="info-label">{{ 'log.stream.trace-id-full' | i18n
}}</span>
diff --git a/web-app/src/app/routes/log/log-stream/log-stream.component.less
b/web-app/src/app/routes/log/log-stream/log-stream.component.less
index 6b63b05096..eaa26d1258 100644
--- a/web-app/src/app/routes/log/log-stream/log-stream.component.less
+++ b/web-app/src/app/routes/log/log-stream/log-stream.component.less
@@ -123,8 +123,7 @@
}
.log-container {
- max-height: 60vh;
- overflow-y: auto;
+ height: 60vh;
background: @common-background-color;
position: relative;
margin-top: 24px;
@@ -133,6 +132,15 @@
opacity: 0.8;
}
+ // Viewport fills the log container; horizontal overflow is hidden.
+ .virtual-scroll-viewport {
+   width: 100%;
+   height: 100%;
+   overflow-x: hidden;
+
+   // Keep the CDK content wrapper no wider than the viewport.
+   ::ng-deep .cdk-virtual-scroll-content-wrapper {
+     max-width: 100%;
+   }
+ }
+
.loading-state {
display: flex;
flex-direction: column;
@@ -156,11 +164,13 @@
padding: 0px 12px;
background: @common-background-color;
cursor: pointer;
-
- &.new-entry {
- background: #e6f7ff;
- border-left: 4px solid #1890ff;
- }
+ height: 40px;
+ display: flex;
+ align-items: center;
+ width: 100%;
+ max-width: 100%;
+ box-sizing: border-box;
+ overflow: hidden;
&:hover {
background: #fafafa;
@@ -170,11 +180,13 @@
border-bottom: none;
}
- .log-content {
- display: flex;
- align-items: center;
- gap: 12px;
- padding: 4px 0;
+ .log-content {
+ display: flex;
+ align-items: center;
+ gap: 12px;
+ padding: 4px 0;
+ flex: 1;
+ min-width: 0;
.log-meta {
display: flex;
diff --git a/web-app/src/app/routes/log/log-stream/log-stream.component.ts
b/web-app/src/app/routes/log/log-stream/log-stream.component.ts
index 707009c73b..ce1709c366 100644
--- a/web-app/src/app/routes/log/log-stream/log-stream.component.ts
+++ b/web-app/src/app/routes/log/log-stream/log-stream.component.ts
@@ -17,8 +17,19 @@
* under the License.
*/
+import { ScrollingModule, CdkVirtualScrollViewport } from
'@angular/cdk/scrolling';
import { CommonModule } from '@angular/common';
-import { Component, Inject, OnDestroy, OnInit, ViewChild, ElementRef,
AfterViewInit } from '@angular/core';
+import {
+ Component,
+ Inject,
+ OnDestroy,
+ OnInit,
+ ViewChild,
+ AfterViewInit,
+ ChangeDetectionStrategy,
+ ChangeDetectorRef,
+ NgZone
+} from '@angular/core';
import { FormsModule } from '@angular/forms';
import { I18NService } from '@core';
import { ALAIN_I18N_TOKEN } from '@delon/theme';
@@ -40,8 +51,9 @@ import { LogEntry } from '../../../pojo/LogEntry';
interface ExtendedLogEntry {
original: LogEntry;
- isNew?: boolean;
- timestamp?: Date;
+ timestamp: Date;
+ displayText: string;
+ severityColor: string;
}
@Component({
@@ -62,10 +74,12 @@ interface ExtendedLogEntry {
NzAlertModule,
NzEmptyModule,
NzModalModule,
- NzDividerComponent
+ NzDividerComponent,
+ ScrollingModule
],
templateUrl: './log-stream.component.html',
- styleUrl: './log-stream.component.less'
+ styleUrl: './log-stream.component.less',
+ changeDetection: ChangeDetectionStrategy.OnPush
})
export class LogStreamComponent implements OnInit, OnDestroy, AfterViewInit {
// SSE connection and state
@@ -73,10 +87,11 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
isConnected: boolean = false;
isConnecting: boolean = false;
- // Log data
+ // Log data - bounded list, newest entries first, capped at maxLogEntries
logEntries: ExtendedLogEntry[] = [];
- maxLogEntries: number = 1000;
+ maxLogEntries: number = 10000;
isPaused: boolean = false;
+ displayedLogCount: number = 0;
// Filter properties
filterSeverityNumber: string = '';
@@ -94,13 +109,19 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
// Auto scroll state
userScrolled: boolean = false;
private scrollTimeout: any;
- private scrollDebounceTimeout: any;
- private isNearBottom: boolean = true;
+ private isNearTop: boolean = true;
+
+ // Batch processing for high TPS - use requestAnimationFrame
+ private pendingLogs: ExtendedLogEntry[] = [];
+ private rafId: number | null = null;
+ private lastFlushTime: number = 0;
+ private readonly MIN_FLUSH_INTERVAL = 200; // Minimum 200ms between flushes
+ private readonly MAX_PENDING_LOGS = 1000; // Drop logs if buffer exceeds this
// ViewChild for log container
- @ViewChild('logContainer', { static: false }) logContainerRef!: ElementRef;
+ @ViewChild(CdkVirtualScrollViewport) viewport!: CdkVirtualScrollViewport;
- constructor(@Inject(ALAIN_I18N_TOKEN) private i18nSvc: I18NService) {}
+ constructor(@Inject(ALAIN_I18N_TOKEN) private i18nSvc: I18NService, private
cdr: ChangeDetectorRef, private ngZone: NgZone) {}
ngOnInit(): void {
this.connectToLogStream();
@@ -113,6 +134,9 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
ngOnDestroy(): void {
this.disconnectFromLogStream();
this.cleanupScrollListener();
+ if (this.rafId) {
+ cancelAnimationFrame(this.rafId);
+ }
}
onReconnect(): void {
@@ -135,25 +159,34 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
this.eventSource = new EventSource(url);
this.eventSource.onopen = () => {
- this.isConnected = true;
- this.isConnecting = false;
+ this.ngZone.run(() => {
+ this.isConnected = true;
+ this.isConnecting = false;
+ this.cdr.markForCheck();
+ });
};
- this.eventSource.addEventListener('LOG_EVENT', (evt: MessageEvent) => {
- if (!this.isPaused) {
- try {
- const logEntry: LogEntry = JSON.parse(evt.data);
- this.addLogEntry(logEntry);
- } catch (error) {
- console.error('Error parsing log data:', error);
+ // Run outside Angular zone to prevent change detection on every message
+ this.ngZone.runOutsideAngular(() => {
+ this.eventSource.addEventListener('LOG_EVENT', (evt: MessageEvent) => {
+ if (!this.isPaused) {
+ try {
+ const logEntry: LogEntry = JSON.parse(evt.data);
+ this.queueLogEntry(logEntry);
+ } catch (error) {
+ // Log parse errors and keep the stream running in high TPS scenario
+ console.error(error);
+ }
}
- }
+ });
});
this.eventSource.onerror = error => {
- console.error('Log stream connection error:', error);
- this.isConnected = false;
- this.isConnecting = false;
+ this.ngZone.run(() => {
+ this.isConnected = false;
+ this.isConnecting = false;
+ this.cdr.markForCheck();
+ });
// Auto-reconnect after 5 seconds
setTimeout(() => {
@@ -198,47 +231,94 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
return params.toString();
}
- private addLogEntry(logEntry: LogEntry): void {
+ private queueLogEntry(logEntry: LogEntry): void {
+ // Drop logs if buffer is full (backpressure)
+ if (this.pendingLogs.length >= this.MAX_PENDING_LOGS) {
+ return;
+ }
+
+ // Pre-compute everything to minimize work during render
const extendedEntry: ExtendedLogEntry = {
original: logEntry,
- isNew: true,
- timestamp: logEntry.timeUnixNano ? new Date(logEntry.timeUnixNano /
1000000) : new Date()
+ timestamp: logEntry.timeUnixNano ? new Date(logEntry.timeUnixNano /
1000000) : new Date(),
+ displayText: this.formatLogDisplay(logEntry),
+ severityColor: this.computeSeverityColor(logEntry.severityNumber)
};
- this.logEntries.unshift(extendedEntry);
+ this.pendingLogs.push(extendedEntry);
- // Limit the number of log entries
- if (this.logEntries.length > this.maxLogEntries) {
- this.logEntries = this.logEntries.slice(0, this.maxLogEntries);
+ // Schedule flush using requestAnimationFrame for smooth rendering
+ if (!this.rafId) {
+ this.rafId = requestAnimationFrame(() => this.flushPendingLogs());
}
+ }
- // Remove new indicator after animation
- setTimeout(() => {
- const index = this.logEntries.findIndex(entry => entry ===
extendedEntry);
- if (index !== -1) {
- this.logEntries[index].isNew = false;
- }
- }, 1000);
+ /** Serialize the whole log entry to a JSON string. */
+ private formatLogDisplay(logEntry: LogEntry): string {
+   const serialized = JSON.stringify(logEntry);
+   return serialized;
+ }
+
+ /**
+  * Map a severity number to a tag color.
+  * Missing or zero values, and values of 25 or above, map to 'default'.
+  */
+ private computeSeverityColor(severityNumber: number | undefined): string {
+   if (!severityNumber) {
+     return 'default';
+   }
+
+   // Each pair is [exclusive upper bound, color]; the first band that contains the value wins.
+   const bands: Array<[number, string]> = [
+     [5, 'default'], // 1-4
+     [9, 'blue'], // 5-8
+     [13, 'green'], // 9-12
+     [17, 'orange'], // 13-16
+     [21, 'red'], // 17-20
+     [25, 'volcano'] // 21-24
+   ];
+   for (const [upperBound, color] of bands) {
+     if (severityNumber < upperBound) {
+       return color;
+     }
+   }
+   return 'default';
+ }
+
+ private flushPendingLogs(): void {
+ this.rafId = null;
+
+ const now = performance.now();
+ if (now - this.lastFlushTime < this.MIN_FLUSH_INTERVAL) {
+ // Too soon, reschedule
+ this.rafId = requestAnimationFrame(() => this.flushPendingLogs());
+ return;
+ }
- // Auto scroll to top if enabled and user hasn't scrolled away
- if (!this.userScrolled) {
- this.scheduleAutoScroll();
+ if (this.pendingLogs.length === 0) {
+ return;
}
+
+ this.lastFlushTime = now;
+
+ // Get pending logs and clear
+ const newEntries = this.pendingLogs;
+ this.pendingLogs = [];
+
+ // Reverse in place for performance
+ newEntries.reverse();
+
+ // Run inside Angular zone for change detection
+ this.ngZone.run(() => {
+ // Create new array reference for virtual scroll
+ let updated: ExtendedLogEntry[];
+
+ if (this.logEntries.length + newEntries.length <= this.maxLogEntries) {
+ updated = [...newEntries, ...this.logEntries];
+ } else {
+ // Truncate old entries
+ const keepCount = Math.max(0, this.maxLogEntries - newEntries.length);
+ updated = [...newEntries, ...this.logEntries.slice(0, keepCount)];
+ }
+
+ this.logEntries = updated;
+
+ // Auto scroll to top if enabled and user hasn't scrolled away
+ if (!this.userScrolled && this.viewport) {
+ this.viewport.scrollToIndex(0);
+ }
+
+ this.cdr.markForCheck();
+ });
}
private setupScrollListener(): void {
- if (this.logContainerRef?.nativeElement) {
- const container = this.logContainerRef.nativeElement;
-
- container.addEventListener('scroll', () => {
- // Debounce scroll events for better performance
- if (this.scrollDebounceTimeout) {
- clearTimeout(this.scrollDebounceTimeout);
- }
-
- this.scrollDebounceTimeout = setTimeout(() => {
- this.handleScroll();
- }, 100);
+ if (this.viewport) {
+ this.viewport.elementScrolled().subscribe(() => {
+ this.handleScroll();
});
}
}
@@ -247,22 +327,18 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
if (this.scrollTimeout) {
clearTimeout(this.scrollTimeout);
}
- if (this.scrollDebounceTimeout) {
- clearTimeout(this.scrollDebounceTimeout);
- }
}
private handleScroll(): void {
- if (!this.logContainerRef?.nativeElement) return;
+ if (!this.viewport) return;
- const container = this.logContainerRef.nativeElement;
- const scrollTop = container.scrollTop;
+ const scrollTop = this.viewport.measureScrollOffset();
- // Check if user is near the top (within 20px for more precise detection)
- this.isNearBottom = scrollTop <= 20;
+ // Check if user is near the top
+ this.isNearTop = scrollTop <= 40;
// If user scrolls away from top, mark as user scrolled
- if (!this.isNearBottom) {
+ if (!this.isNearTop) {
this.userScrolled = true;
} else {
// If user scrolls back to top, reset the flag
@@ -283,23 +359,18 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
}
private performAutoScroll(): void {
- if (!this.logContainerRef?.nativeElement || this.userScrolled) {
+ if (!this.viewport || this.userScrolled) {
return;
}
- const container = this.logContainerRef.nativeElement;
-
- // Use smooth scroll for better UX
- container.scrollTo({
- top: 0,
- behavior: 'smooth'
- });
+ this.viewport.scrollToIndex(0, 'smooth');
}
// Event handlers
onApplyFilters(): void {
- this.logEntries = []; // Clear existing logs
- this.connectToLogStream(); // Reconnect with new filters
+ this.resetState();
+ this.connectToLogStream();
+ this.cdr.markForCheck();
}
onClearFilters(): void {
@@ -307,69 +378,48 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
this.filterSeverityText = '';
this.filterTraceId = '';
this.filterSpanId = '';
- this.logEntries = [];
+ this.resetState();
this.connectToLogStream();
+ this.cdr.markForCheck();
}
onTogglePause(): void {
this.isPaused = !this.isPaused;
+ this.cdr.markForCheck();
}
onClearLogs(): void {
- this.logEntries = [];
- this.userScrolled = false;
- this.isNearBottom = true;
+ this.resetState();
+ this.cdr.markForCheck();
}
onToggleFilters(): void {
this.showFilters = !this.showFilters;
+ this.cdr.markForCheck();
+ }
+
+ /**
+  * Cancel any scheduled animation-frame callback, drop displayed and pending
+  * logs, and reset the counter and scroll-tracking flags to their initial values.
+  */
+ private resetState(): void {
+   if (this.rafId) {
+     cancelAnimationFrame(this.rafId);
+     this.rafId = null;
+   }
+
+   this.pendingLogs = [];
+   this.logEntries = [];
+   this.displayedLogCount = 0;
+
+   this.isNearTop = true;
+   this.userScrolled = false;
+ }
// Add method to manually scroll to top
scrollToTop(): void {
this.userScrolled = false;
- this.isNearBottom = true;
+ this.isNearTop = true;
this.scheduleAutoScroll();
}
// Utility methods
- getSeverityColor(severityNumber: number | undefined): string {
- if (!severityNumber) {
- return 'default';
- }
-
- // Based on OpenTelemetry specification:
- // 1-4: TRACE, 5-8: DEBUG, 9-12: INFO, 13-16: WARN, 17-20: ERROR, 21-24:
FATAL
- if (severityNumber >= 1 && severityNumber <= 4) {
- return 'default'; // TRACE
- } else if (severityNumber >= 5 && severityNumber <= 8) {
- return 'blue'; // DEBUG
- } else if (severityNumber >= 9 && severityNumber <= 12) {
- return 'green'; // INFO
- } else if (severityNumber >= 13 && severityNumber <= 16) {
- return 'orange'; // WARN
- } else if (severityNumber >= 17 && severityNumber <= 20) {
- return 'red'; // ERROR
- } else if (severityNumber >= 21 && severityNumber <= 24) {
- return 'volcano'; // FATAL
- } else {
- return 'default'; // Unknown
- }
- }
-
- formatTimestamp(timestamp: Date): string {
- return timestamp.toLocaleString();
- }
-
copyToClipboard(text: string): void {
- navigator.clipboard
- .writeText(text)
- .then(() => {
- console.log('Copied to clipboard');
- })
- .catch(err => {
- console.error('Failed to copy: ', err);
- });
+ navigator.clipboard.writeText(text).catch(() => {});
}
trackByLogEntry(index: number, logEntry: ExtendedLogEntry): any {
@@ -380,16 +430,19 @@ export class LogStreamComponent implements OnInit,
OnDestroy, AfterViewInit {
showLogDetails(logEntry: ExtendedLogEntry): void {
this.selectedLogEntry = logEntry;
this.isModalVisible = true;
+ this.cdr.markForCheck();
}
handleModalOk(): void {
this.isModalVisible = false;
this.selectedLogEntry = null;
+ this.cdr.markForCheck();
}
handleModalCancel(): void {
this.isModalVisible = false;
this.selectedLogEntry = null;
+ this.cdr.markForCheck();
}
getLogEntryJson(logEntry: LogEntry): string {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]