Copilot commented on code in PR #3876:
URL: https://github.com/apache/hertzbeat/pull/3876#discussion_r2576702168
##########
web-app/src/app/routes/log/log-stream/log-stream.component.html:
##########
@@ -219,13 +221,13 @@
<div class="basic-info">
<div class="info-row">
<span class="info-label">{{ 'log.stream.severity' | i18n }}</span>
- <nz-tag [nzColor]="getSeverityColor(selectedLogEntry.original.severityNumber)">
- {{ selectedLogEntry.original.severityText || ('log.stream.unknown' | i18n) }}
+ <nz-tag [nzColor]="selectedLogEntry.severityColor">
+ {{ selectedLogEntry.original.severityText || 'UNKNOWN' }}
Review Comment:
[nitpick] Similar to line 179, hardcoding 'UNKNOWN' breaks i18n support. Use
`{{ selectedLogEntry.original.severityText || ('log.stream.unknown' | i18n) }}`
for consistency with the original implementation.
```suggestion
{{ selectedLogEntry.original.severityText || ('log.stream.unknown' | i18n) }}
```
##########
web-app/src/app/routes/log/log-stream/log-stream.component.ts:
##########
@@ -135,25 +159,33 @@ export class LogStreamComponent implements OnInit, OnDestroy, AfterViewInit {
this.eventSource = new EventSource(url);
this.eventSource.onopen = () => {
- this.isConnected = true;
- this.isConnecting = false;
+ this.ngZone.run(() => {
+ this.isConnected = true;
+ this.isConnecting = false;
+ this.cdr.markForCheck();
+ });
};
- this.eventSource.addEventListener('LOG_EVENT', (evt: MessageEvent) => {
- if (!this.isPaused) {
- try {
- const logEntry: LogEntry = JSON.parse(evt.data);
- this.addLogEntry(logEntry);
- } catch (error) {
- console.error('Error parsing log data:', error);
+ // Run outside Angular zone to prevent change detection on every message
+ this.ngZone.runOutsideAngular(() => {
+ this.eventSource.addEventListener('LOG_EVENT', (evt: MessageEvent) => {
+ if (!this.isPaused) {
+ try {
+ const logEntry: LogEntry = JSON.parse(evt.data);
+ this.queueLogEntry(logEntry);
+ } catch (error) {
+ // Silently ignore parse errors in high TPS scenario
Review Comment:
Silently ignoring parse errors in high TPS scenarios could make debugging
difficult. At minimum, consider using `console.error` in development mode or
incrementing a counter to track dropped messages. This would help identify
issues with malformed log data without impacting performance.
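For illustration, a minimal sketch of the counter approach (the `parseErrorCount` field and the sampling interval are hypothetical additions, not part of this PR):
```typescript
// Hypothetical field on the component for tracking dropped messages
private parseErrorCount = 0;

this.eventSource.addEventListener('LOG_EVENT', (evt: MessageEvent) => {
  if (!this.isPaused) {
    try {
      const logEntry: LogEntry = JSON.parse(evt.data);
      this.queueLogEntry(logEntry);
    } catch (error) {
      // Count every drop, but log only every 100th occurrence so the
      // hot path stays cheap under high TPS
      if (++this.parseErrorCount % 100 === 1) {
        console.error(`${this.parseErrorCount} unparsable log events so far; latest:`, error);
      }
    }
  }
});
```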
##########
web-app/src/app/routes/log/log-stream/log-stream.component.ts:
##########
@@ -198,47 +230,95 @@ export class LogStreamComponent implements OnInit, OnDestroy, AfterViewInit {
return params.toString();
}
- private addLogEntry(logEntry: LogEntry): void {
+ private queueLogEntry(logEntry: LogEntry): void {
+ // Drop logs if buffer is full (backpressure)
+ if (this.pendingLogs.length >= this.MAX_PENDING_LOGS) {
+ return;
+ }
+
+ // Pre-compute everything to minimize work during render
const extendedEntry: ExtendedLogEntry = {
original: logEntry,
- isNew: true,
- timestamp: logEntry.timeUnixNano ? new Date(logEntry.timeUnixNano / 1000000) : new Date()
+ timestamp: logEntry.timeUnixNano ? new Date(logEntry.timeUnixNano / 1000000) : new Date(),
+ displayText: this.formatLogDisplay(logEntry),
+ severityColor: this.computeSeverityColor(logEntry.severityNumber)
};
- this.logEntries.unshift(extendedEntry);
+ this.pendingLogs.push(extendedEntry);
+ this.displayedLogCount++;
- // Limit the number of log entries
- if (this.logEntries.length > this.maxLogEntries) {
- this.logEntries = this.logEntries.slice(0, this.maxLogEntries);
+ // Schedule flush using requestAnimationFrame for smooth rendering
+ if (!this.rafId) {
+ this.rafId = requestAnimationFrame(() => this.flushPendingLogs());
}
+ }
- // Remove new indicator after animation
- setTimeout(() => {
- const index = this.logEntries.findIndex(entry => entry === extendedEntry);
- if (index !== -1) {
- this.logEntries[index].isNew = false;
- }
- }, 1000);
+ private formatLogDisplay(logEntry: LogEntry): string {
+ return JSON.stringify(logEntry);
+ }
+
+ private computeSeverityColor(severityNumber: number | undefined): string {
+ if (!severityNumber) return 'default';
+ if (severityNumber <= 4) return 'default';
+ if (severityNumber <= 8) return 'blue';
+ if (severityNumber <= 12) return 'green';
+ if (severityNumber <= 16) return 'orange';
+ if (severityNumber <= 20) return 'red';
+ if (severityNumber <= 24) return 'volcano';
Review Comment:
The severity color thresholds here are easy to misread. The cascading if-else
does implement the intended ranges correctly — for example, `severityNumber = 5`
fails `<= 4`, matches `<= 8`, and returns 'blue', which is right for the DEBUG
range — but the bounds encode the ranges only implicitly. Per the original
implementation, the mapping is: 1-4 (TRACE/default), 5-8 (DEBUG/blue), 9-12
(INFO/green), 13-16 (WARN/orange), 17-20 (ERROR/red), 21-24 (FATAL/volcano).
Annotating each branch with its range (or using explicit lower and upper
bounds, e.g. `severityNumber >= 1 && severityNumber <= 4`) makes the mapping
verifiable at a glance:
```suggestion
if (severityNumber < 5) return 'default'; // 1-4
if (severityNumber < 9) return 'blue'; // 5-8
if (severityNumber < 13) return 'green'; // 9-12
if (severityNumber < 17) return 'orange'; // 13-16
if (severityNumber < 21) return 'red'; // 17-20
if (severityNumber < 25) return 'volcano'; // 21-24
```
##########
hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java:
##########
@@ -19,90 +19,185 @@
package org.apache.hertzbeat.log.notice;
+import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.log.LogEntry;
-import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * SSE manager for log
+ * SSE manager for log with batch processing support for high TPS scenarios
*/
@Component
@Slf4j
@Getter
public class LogSseManager {
+
+ private static final long BATCH_INTERVAL_MS = 200;
+ private static final int MAX_BATCH_SIZE = 1000;
+ private static final int MAX_QUEUE_SIZE = 10000;
+
private final Map<Long, SseSubscriber> emitters = new ConcurrentHashMap<>();
+ private final Queue<LogEntry> logQueue = new ConcurrentLinkedQueue<>();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "sse-batch-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ private final ExecutorService senderPool = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r, "sse-sender");
+ t.setDaemon(true);
+ return t;
+ });
+ private final AtomicLong queueSize = new AtomicLong(0);
+
+ public LogSseManager() {
+ scheduler.scheduleAtFixedRate(this::flushBatch, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ scheduler.shutdown();
+ senderPool.shutdown();
+ try {
+ scheduler.awaitTermination(2, TimeUnit.SECONDS);
+ senderPool.awaitTermination(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ scheduler.shutdownNow();
+ senderPool.shutdownNow();
+ }
/**
* Create a new SSE emitter for a client with specified filters
- * @param clientId The unique identifier for the client
- * @param filters The filters to apply to the log data
- * @return The SSE emitter
*/
public SseEmitter createEmitter(Long clientId, LogSseFilterCriteria filters) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitter.onCompletion(() -> removeEmitter(clientId));
emitter.onTimeout(() -> removeEmitter(clientId));
emitter.onError((ex) -> removeEmitter(clientId));
- SseSubscriber subscriber = new SseSubscriber(emitter, filters);
- emitters.put(clientId, subscriber);
+ emitters.put(clientId, new SseSubscriber(emitter, filters));
return emitter;
}
/**
- * Broadcast log data to all subscribers
- * @param logEntry The log data to broadcast
+ * Queue log entry for batch processing
*/
- @Async
public void broadcast(LogEntry logEntry) {
- emitters.forEach((clientId, subscriber) -> {
- try {
- // Check if the log entry matches the subscriber's filter criteria
- if (subscriber.filters == null || subscriber.filters.matches(logEntry)) {
- subscriber.emitter.send(SseEmitter.event()
- .id(String.valueOf(System.currentTimeMillis()))
- .name("LOG_EVENT")
- .data(logEntry));
+ if (queueSize.get() >= MAX_QUEUE_SIZE) {
+ return;
+ }
+ logQueue.offer(logEntry);
+ queueSize.incrementAndGet();
Review Comment:
Race condition: The check-then-act pattern `if (queueSize.get() >=
MAX_QUEUE_SIZE)` followed by `logQueue.offer(logEntry)` and
`queueSize.incrementAndGet()` is not atomic. Multiple threads could pass the
check simultaneously before incrementing, causing the queue to exceed
MAX_QUEUE_SIZE. Consider using `compareAndSet` or checking the queue size after
increment:
```java
if (queueSize.incrementAndGet() > MAX_QUEUE_SIZE) {
queueSize.decrementAndGet();
return;
}
logQueue.offer(logEntry);
```
```suggestion
if (queueSize.incrementAndGet() > MAX_QUEUE_SIZE) {
queueSize.decrementAndGet();
return;
}
logQueue.offer(logEntry);
```
##########
web-app/src/app/routes/log/log-stream/log-stream.component.ts:
##########
@@ -135,25 +159,33 @@ export class LogStreamComponent implements OnInit, OnDestroy, AfterViewInit {
this.eventSource = new EventSource(url);
this.eventSource.onopen = () => {
- this.isConnected = true;
- this.isConnecting = false;
+ this.ngZone.run(() => {
+ this.isConnected = true;
+ this.isConnecting = false;
+ this.cdr.markForCheck();
+ });
};
- this.eventSource.addEventListener('LOG_EVENT', (evt: MessageEvent) => {
- if (!this.isPaused) {
- try {
- const logEntry: LogEntry = JSON.parse(evt.data);
- this.addLogEntry(logEntry);
- } catch (error) {
- console.error('Error parsing log data:', error);
+ // Run outside Angular zone to prevent change detection on every message
+ this.ngZone.runOutsideAngular(() => {
+ this.eventSource.addEventListener('LOG_EVENT', (evt: MessageEvent) => {
+ if (!this.isPaused) {
+ try {
Review Comment:
The `[class.new-entry]` binding references `logEntry.isNew` which has been
removed from the `ExtendedLogEntry` interface. This property no longer exists
and will always evaluate to `undefined`, causing the CSS class to never be
applied. Either remove this class binding entirely or implement a new mechanism
to identify newly added entries.
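One possible replacement mechanism, sketched below (the `addedAt` field and `isNewEntry` helper are hypothetical, not part of this PR), derives newness from the entry's creation time instead of a mutable flag, so no per-entry timer is needed:
```typescript
// Stamp each entry once where it is built in queueLogEntry(), e.g. addedAt: Date.now()
interface ExtendedLogEntry {
  original: LogEntry;
  timestamp: Date;
  displayText: string;
  severityColor: string;
  addedAt: number; // creation time in milliseconds
}

// Template: [class.new-entry]="isNewEntry(logEntry)"
isNewEntry(entry: ExtendedLogEntry): boolean {
  // Treat entries as "new" for one second after arrival
  return Date.now() - entry.addedAt < 1000;
}
```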
##########
hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java:
##########
@@ -19,90 +19,185 @@
package org.apache.hertzbeat.log.notice;
+import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.log.LogEntry;
-import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * SSE manager for log
+ * SSE manager for log with batch processing support for high TPS scenarios
*/
@Component
@Slf4j
@Getter
public class LogSseManager {
+
+ private static final long BATCH_INTERVAL_MS = 200;
+ private static final int MAX_BATCH_SIZE = 1000;
+ private static final int MAX_QUEUE_SIZE = 10000;
+
private final Map<Long, SseSubscriber> emitters = new ConcurrentHashMap<>();
+ private final Queue<LogEntry> logQueue = new ConcurrentLinkedQueue<>();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "sse-batch-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ private final ExecutorService senderPool = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r, "sse-sender");
+ t.setDaemon(true);
+ return t;
+ });
Review Comment:
Using `Executors.newCachedThreadPool()` for SSE sending can create unbounded
threads under high load, potentially leading to resource exhaustion. In high
TPS scenarios with many subscribers, this could create hundreds or thousands of
threads. Consider using a fixed or bounded thread pool with a queue, e.g., `new
ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize))` to control resource usage.
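A sketch of such a bounded pool (the sizing constants are illustrative assumptions, not values from this PR):
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative bounds: 2 core threads, at most 8 under load, a
// 1000-task backlog, and CallerRunsPolicy so the flushing thread
// slows down instead of tasks being silently rejected on saturation.
private final ExecutorService senderPool = new ThreadPoolExecutor(
        2, 8, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        r -> {
            Thread t = new Thread(r, "sse-sender");
            t.setDaemon(true);
            return t;
        },
        new ThreadPoolExecutor.CallerRunsPolicy());
```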
##########
hertzbeat-log/src/test/java/org/apache/hertzbeat/log/notice/LogSseManagerTest.java:
##########
@@ -146,8 +166,25 @@ void shouldRemoveEmitterWhenBroadcastFails() throws IOException {
logSseManager.broadcast(log);
// Then: The failing emitter should be completed and removed
- verify(mockEmitter).complete();
- assertFalse(logSseManager.getEmitters().containsKey(CLIENT_ID));
+ await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() -> {
+ verify(mockEmitter).complete();
+ assertFalse(logSseManager.getEmitters().containsKey(CLIENT_ID));
+ });
+ }
+
+ @Test
+ void shouldQueueLogsForBatchProcessing() {
+ // Given: A client
+ SseEmitter mockEmitter = mock(SseEmitter.class);
+ subscribeClient(CLIENT_ID, null, mockEmitter);
+
+ // When: Multiple logs are broadcast
+ for (int i = 0; i < 10; i++) {
+ logSseManager.broadcast(createLogEntry("INFO", "Message " + i));
+ }
+
+ // Then: Queue size should increase
+ assertTrue(logSseManager.getQueueSize() > 0 || logSseManager.getQueueSize() == 0); // May have been processed
Review Comment:
The assertion `assertTrue(logSseManager.getQueueSize() > 0 ||
logSseManager.getQueueSize() == 0)` is a tautology that always passes. This
test doesn't verify batch processing behavior meaningfully. Consider asserting
that the queue size is within expected bounds or that the batch was processed
within the expected time window using awaitility.
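A sketch of a deterministic version (assumes Awaitility, which this test class already uses above; the 1-second bound is an illustrative choice covering one BATCH_INTERVAL_MS cycle plus slack):
```java
// Broadcast a burst, then wait for the scheduled flush to drain the
// queue instead of asserting a condition that is always true
for (int i = 0; i < 10; i++) {
    logSseManager.broadcast(createLogEntry("INFO", "Message " + i));
}
await().atMost(1, TimeUnit.SECONDS)
        .until(() -> logSseManager.getQueueSize() == 0);
```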
##########
web-app/src/app/routes/log/log-stream/log-stream.component.ts:
##########
@@ -40,8 +51,9 @@ import { LogEntry } from '../../../pojo/LogEntry';
interface ExtendedLogEntry {
original: LogEntry;
Review Comment:
[nitpick] The `timestamp` field is marked as required (non-optional) even
though `logEntry.timeUnixNano` may be undefined. The `new Date()` fallback
applied where entries are created (`queueLogEntry`) guarantees a valid value,
which is correct, but the interface should state that guarantee explicitly,
e.g. with a comment noting that the timestamp is always initialized at
creation time.
```suggestion
original: LogEntry;
/**
 * Always initialized to a valid Date object where the entry is created
 * (queueLogEntry), either from logEntry.timeUnixNano or as a fallback to new Date().
 */
```
##########
hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java:
##########
@@ -19,90 +19,185 @@
package org.apache.hertzbeat.log.notice;
+import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.log.LogEntry;
-import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * SSE manager for log
+ * SSE manager for log with batch processing support for high TPS scenarios
*/
@Component
@Slf4j
@Getter
public class LogSseManager {
+
+ private static final long BATCH_INTERVAL_MS = 200;
+ private static final int MAX_BATCH_SIZE = 1000;
+ private static final int MAX_QUEUE_SIZE = 10000;
+
private final Map<Long, SseSubscriber> emitters = new ConcurrentHashMap<>();
+ private final Queue<LogEntry> logQueue = new ConcurrentLinkedQueue<>();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "sse-batch-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ private final ExecutorService senderPool = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r, "sse-sender");
+ t.setDaemon(true);
+ return t;
+ });
+ private final AtomicLong queueSize = new AtomicLong(0);
+
+ public LogSseManager() {
+ scheduler.scheduleAtFixedRate(this::flushBatch, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ scheduler.shutdown();
+ senderPool.shutdown();
+ try {
+ scheduler.awaitTermination(2, TimeUnit.SECONDS);
+ senderPool.awaitTermination(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ scheduler.shutdownNow();
+ senderPool.shutdownNow();
+ }
/**
* Create a new SSE emitter for a client with specified filters
- * @param clientId The unique identifier for the client
- * @param filters The filters to apply to the log data
- * @return The SSE emitter
*/
public SseEmitter createEmitter(Long clientId, LogSseFilterCriteria filters) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitter.onCompletion(() -> removeEmitter(clientId));
emitter.onTimeout(() -> removeEmitter(clientId));
emitter.onError((ex) -> removeEmitter(clientId));
- SseSubscriber subscriber = new SseSubscriber(emitter, filters);
- emitters.put(clientId, subscriber);
+ emitters.put(clientId, new SseSubscriber(emitter, filters));
return emitter;
}
/**
- * Broadcast log data to all subscribers
- * @param logEntry The log data to broadcast
+ * Queue log entry for batch processing
*/
- @Async
public void broadcast(LogEntry logEntry) {
- emitters.forEach((clientId, subscriber) -> {
- try {
- // Check if the log entry matches the subscriber's filter criteria
- if (subscriber.filters == null || subscriber.filters.matches(logEntry)) {
- subscriber.emitter.send(SseEmitter.event()
- .id(String.valueOf(System.currentTimeMillis()))
- .name("LOG_EVENT")
- .data(logEntry));
+ if (queueSize.get() >= MAX_QUEUE_SIZE) {
+ return;
+ }
+ logQueue.offer(logEntry);
+ queueSize.incrementAndGet();
Review Comment:
The `broadcast` method ignores the boolean return value of `Queue<LogEntry>.offer`.
```suggestion
// Optionally log a warning about dropping the log entry
log.warn("Log queue is full ({} entries), dropping log entry: {}", queueSize.get(), logEntry);
return;
}
boolean offered = logQueue.offer(logEntry);
if (offered) {
    queueSize.incrementAndGet();
} else {
    // This should not happen with ConcurrentLinkedQueue, but log just in case
    log.warn("Failed to enqueue log entry: {}", logEntry);
}
```
##########
web-app/src/app/routes/log/log-stream/log-stream.component.ts:
##########
@@ -198,47 +230,95 @@ export class LogStreamComponent implements OnInit, OnDestroy, AfterViewInit {
return params.toString();
}
- private addLogEntry(logEntry: LogEntry): void {
+ private queueLogEntry(logEntry: LogEntry): void {
+ // Drop logs if buffer is full (backpressure)
+ if (this.pendingLogs.length >= this.MAX_PENDING_LOGS) {
+ return;
+ }
+
+ // Pre-compute everything to minimize work during render
const extendedEntry: ExtendedLogEntry = {
original: logEntry,
- isNew: true,
- timestamp: logEntry.timeUnixNano ? new Date(logEntry.timeUnixNano / 1000000) : new Date()
+ timestamp: logEntry.timeUnixNano ? new Date(logEntry.timeUnixNano / 1000000) : new Date(),
+ displayText: this.formatLogDisplay(logEntry),
+ severityColor: this.computeSeverityColor(logEntry.severityNumber)
};
- this.logEntries.unshift(extendedEntry);
+ this.pendingLogs.push(extendedEntry);
+ this.displayedLogCount++;
- // Limit the number of log entries
- if (this.logEntries.length > this.maxLogEntries) {
- this.logEntries = this.logEntries.slice(0, this.maxLogEntries);
+ // Schedule flush using requestAnimationFrame for smooth rendering
+ if (!this.rafId) {
+ this.rafId = requestAnimationFrame(() => this.flushPendingLogs());
}
+ }
- // Remove new indicator after animation
- setTimeout(() => {
- const index = this.logEntries.findIndex(entry => entry === extendedEntry);
- if (index !== -1) {
- this.logEntries[index].isNew = false;
- }
- }, 1000);
+ private formatLogDisplay(logEntry: LogEntry): string {
+ return JSON.stringify(logEntry);
+ }
+
+ private computeSeverityColor(severityNumber: number | undefined): string {
+ if (!severityNumber) return 'default';
+ if (severityNumber <= 4) return 'default';
+ if (severityNumber <= 8) return 'blue';
+ if (severityNumber <= 12) return 'green';
+ if (severityNumber <= 16) return 'orange';
+ if (severityNumber <= 20) return 'red';
+ if (severityNumber <= 24) return 'volcano';
+ return 'default';
+ }
+
+ private flushPendingLogs(): void {
+ this.rafId = null;
+
+ const now = performance.now();
+ if (now - this.lastFlushTime < this.MIN_FLUSH_INTERVAL) {
+ // Too soon, reschedule
+ this.rafId = requestAnimationFrame(() => this.flushPendingLogs());
+ return;
+ }
- // Auto scroll to top if enabled and user hasn't scrolled away
- if (!this.userScrolled) {
- this.scheduleAutoScroll();
+ if (this.pendingLogs.length === 0) {
+ return;
}
+
+ this.lastFlushTime = now;
+
+ // Get pending logs and clear
+ const newEntries = this.pendingLogs;
+ this.pendingLogs = [];
+
+ // Reverse in place for performance
+ newEntries.reverse();
+
+ // Run inside Angular zone for change detection
+ this.ngZone.run(() => {
+ // Create new array reference for virtual scroll
+ let updated: ExtendedLogEntry[];
+
+ if (this.logEntries.length + newEntries.length <= this.maxLogEntries) {
+ updated = [...newEntries, ...this.logEntries];
+ } else {
+ // Truncate old entries
+ const keepCount = Math.max(0, this.maxLogEntries - newEntries.length);
+ updated = [...newEntries, ...this.logEntries.slice(0, keepCount)];
Review Comment:
The comment "Reverse in place for performance" is slightly misleading:
`reverse()` does mutate the array in place, which is efficient, but at this
point `newEntries` is only a local reference to the old buffer —
`this.pendingLogs` has already been reassigned to a fresh array — so no shared
state is affected. Consider clarifying the comment, or using a non-mutating
`[...newEntries].reverse()` for clarity.
```suggestion
// Reverse log order for correct display (use non-mutating reversal for clarity)
const reversedEntries = [...newEntries].reverse();

// Run inside Angular zone for change detection
this.ngZone.run(() => {
  // Create new array reference for virtual scroll
  let updated: ExtendedLogEntry[];

  if (this.logEntries.length + reversedEntries.length <= this.maxLogEntries) {
    updated = [...reversedEntries, ...this.logEntries];
  } else {
    // Truncate old entries
    const keepCount = Math.max(0, this.maxLogEntries - reversedEntries.length);
    updated = [...reversedEntries, ...this.logEntries.slice(0, keepCount)];
```
##########
web-app/src/app/routes/log/log-stream/log-stream.component.ts:
##########
@@ -283,93 +359,67 @@ export class LogStreamComponent implements OnInit, OnDestroy, AfterViewInit {
}
private performAutoScroll(): void {
- if (!this.logContainerRef?.nativeElement || this.userScrolled) {
+ if (!this.viewport || this.userScrolled) {
return;
}
- const container = this.logContainerRef.nativeElement;
-
- // Use smooth scroll for better UX
- container.scrollTo({
- top: 0,
- behavior: 'smooth'
- });
+ this.viewport.scrollToIndex(0, 'smooth');
Review Comment:
`scrollToIndex` is called with 'smooth' as a second argument. Depending on the
Angular CDK version in use, `scrollToIndex` may not accept a scroll-behavior
parameter (some versions take only the index), so this should be verified
against the project's CDK version. Since the intent here is simply to return
to the top, the `scrollTo()` method with a ScrollToOptions object achieves
smooth scrolling without relying on that overload:
```suggestion
this.viewport.scrollTo({ top: 0, behavior: 'smooth' });
```
##########
web-app/src/app/routes/log/log-stream/log-stream.component.html:
##########
@@ -159,47 +159,49 @@
</div>
</div>
- <div class="log-container" #logContainer [class.paused]="isPaused">
+ <div class="log-container" [class.paused]="isPaused">
<!-- Empty State -->
<div *ngIf="!isConnecting && logEntries.length === 0" class="empty-state">
<nz-empty [nzNotFoundContent]="'log.stream.no-logs' | i18n">
</nz-empty>
</div>
- <!-- Log Entries -->
- <div
- *ngFor="let logEntry of logEntries; trackBy: trackByLogEntry"
- class="log-entry"
- [class.new-entry]="logEntry.isNew"
- (click)="showLogDetails(logEntry)"
- >
- <div class="log-content">
- <div class="log-meta">
- <nz-tag [nzColor]="getSeverityColor(logEntry.original.severityNumber)" class="severity-tag">
- {{ logEntry.original.severityText || ('log.stream.unknown' | i18n) }}
- </nz-tag>
-
- <span class="timestamp">
- {{ formatTimestamp(logEntry.timestamp!) }}
- </span>
- </div>
-
- <div class="log-message">
- {{ getLogEntryJson(logEntry.original) }}
- </div>
-
- <div class="log-actions">
- <button
- nz-button
- nzSize="small"
- nzType="text"
- (click)="$event.stopPropagation(); copyToClipboard(getLogEntryJson(logEntry.original))"
- [nz-tooltip]="'log.stream.copy-message' | i18n"
- >
- <i nz-icon nzType="copy"></i>
- </button>
+ <!-- Log Entries with Virtual Scroll -->
+ <cdk-virtual-scroll-viewport itemSize="40" class="virtual-scroll-viewport">
+ <div
+ *cdkVirtualFor="let logEntry of logEntries; trackBy: trackByLogEntry"
+ class="log-entry"
+ [class.new-entry]="logEntry.isNew"
+ (click)="showLogDetails(logEntry)"
+ >
+ <div class="log-content">
+ <div class="log-meta">
+ <nz-tag [nzColor]="logEntry.severityColor" class="severity-tag">
+ {{ logEntry.original.severityText || 'UNKNOWN' }}
+ </nz-tag>
+
+ <span class="timestamp">
+ {{ logEntry.timestamp | date : 'HH:mm:ss.SSS' }}
Review Comment:
The date format 'HH:mm:ss.SSS' shows only time without date information. For
log streams that run across midnight or need historical review, this could be
confusing. Consider including the date or using a relative timestamp format
(e.g., 'MMM d, HH:mm:ss.SSS') for better context.
```suggestion
{{ logEntry.timestamp | date : 'MMM d, HH:mm:ss.SSS' }}
```
##########
hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java:
##########
@@ -19,90 +19,185 @@
package org.apache.hertzbeat.log.notice;
+import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.log.LogEntry;
-import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * SSE manager for log
+ * SSE manager for log with batch processing support for high TPS scenarios
*/
@Component
@Slf4j
@Getter
public class LogSseManager {
+
+ private static final long BATCH_INTERVAL_MS = 200;
+ private static final int MAX_BATCH_SIZE = 1000;
+ private static final int MAX_QUEUE_SIZE = 10000;
+
private final Map<Long, SseSubscriber> emitters = new ConcurrentHashMap<>();
+ private final Queue<LogEntry> logQueue = new ConcurrentLinkedQueue<>();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "sse-batch-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ private final ExecutorService senderPool = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r, "sse-sender");
+ t.setDaemon(true);
+ return t;
+ });
+ private final AtomicLong queueSize = new AtomicLong(0);
+
+ public LogSseManager() {
+ scheduler.scheduleAtFixedRate(this::flushBatch, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ scheduler.shutdown();
+ senderPool.shutdown();
+ try {
+ scheduler.awaitTermination(2, TimeUnit.SECONDS);
+ senderPool.awaitTermination(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ scheduler.shutdownNow();
+ senderPool.shutdownNow();
+ }
/**
* Create a new SSE emitter for a client with specified filters
- * @param clientId The unique identifier for the client
- * @param filters The filters to apply to the log data
- * @return The SSE emitter
*/
public SseEmitter createEmitter(Long clientId, LogSseFilterCriteria filters) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitter.onCompletion(() -> removeEmitter(clientId));
emitter.onTimeout(() -> removeEmitter(clientId));
emitter.onError((ex) -> removeEmitter(clientId));
- SseSubscriber subscriber = new SseSubscriber(emitter, filters);
- emitters.put(clientId, subscriber);
+ emitters.put(clientId, new SseSubscriber(emitter, filters));
return emitter;
}
/**
- * Broadcast log data to all subscribers
- * @param logEntry The log data to broadcast
+ * Queue log entry for batch processing
*/
- @Async
public void broadcast(LogEntry logEntry) {
- emitters.forEach((clientId, subscriber) -> {
- try {
- // Check if the log entry matches the subscriber's filter criteria
- if (subscriber.filters == null || subscriber.filters.matches(logEntry)) {
- subscriber.emitter.send(SseEmitter.event()
- .id(String.valueOf(System.currentTimeMillis()))
- .name("LOG_EVENT")
- .data(logEntry));
+ if (queueSize.get() >= MAX_QUEUE_SIZE) {
+ return;
+ }
+ logQueue.offer(logEntry);
+ queueSize.incrementAndGet();
+ }
+
+ /**
+ * Flush queued logs to all subscribers in batch
+ */
+ private void flushBatch() {
+ try {
+ if (logQueue.isEmpty() || emitters.isEmpty()) {
+ return;
+ }
+
+ List<LogEntry> batch = new ArrayList<>(MAX_BATCH_SIZE);
+ LogEntry entry;
+ while (batch.size() < MAX_BATCH_SIZE && (entry = logQueue.poll()) != null) {
+ batch.add(entry);
+ queueSize.decrementAndGet();
+ }
+
+ if (batch.isEmpty()) {
+ return;
+ }
+
+ // Send to each subscriber in parallel
+ List<Map.Entry<Long, SseSubscriber>> snapshot = new ArrayList<>(emitters.entrySet());
+ for (Map.Entry<Long, SseSubscriber> e : snapshot) {
+ Long clientId = e.getKey();
+ SseSubscriber subscriber = e.getValue();
+ List<LogEntry> filtered = filterLogs(batch, subscriber.filters);
+ if (!filtered.isEmpty()) {
+ senderPool.submit(() -> sendToSubscriber(clientId, subscriber.emitter, filtered));
}
- } catch (IOException | IllegalStateException e) {
- subscriber.emitter.complete();
- removeEmitter(clientId);
- } catch (Exception exception) {
- log.error("Failed to broadcast log to client: {}", exception.getMessage());
- subscriber.emitter.complete();
- removeEmitter(clientId);
}
- });
+ } catch (Exception e) {
+ log.error("Error in flushBatch: {}", e.getMessage(), e);
+ }
+ }
+
+ private void sendToSubscriber(Long clientId, SseEmitter emitter, List<LogEntry> logs) {
+ try {
+ for (LogEntry logEntry : logs) {
+ emitter.send(SseEmitter.event()
+ .id(String.valueOf(System.currentTimeMillis()))
Review Comment:
Using the same timestamp (`System.currentTimeMillis()`) for all events in a
batch means multiple log entries will have identical event IDs. SSE event IDs
should be unique to allow proper event tracking and recovery. Consider using a
counter or appending a sequence number:
`String.valueOf(System.currentTimeMillis()) + "-" + sequenceNumber`
```suggestion
long batchTimestamp = System.currentTimeMillis();
int sequenceNumber = 0;
for (LogEntry logEntry : logs) {
String eventId = batchTimestamp + "-" + sequenceNumber++;
emitter.send(SseEmitter.event()
.id(eventId)
```
##########
web-app/src/app/routes/log/log-stream/log-stream.component.html:
##########
@@ -159,47 +159,49 @@
</div>
</div>
- <div class="log-container" #logContainer [class.paused]="isPaused">
+ <div class="log-container" [class.paused]="isPaused">
<!-- Empty State -->
<div *ngIf="!isConnecting && logEntries.length === 0" class="empty-state">
<nz-empty [nzNotFoundContent]="'log.stream.no-logs' | i18n">
</nz-empty>
</div>
- <!-- Log Entries -->
- <div
- *ngFor="let logEntry of logEntries; trackBy: trackByLogEntry"
- class="log-entry"
- [class.new-entry]="logEntry.isNew"
- (click)="showLogDetails(logEntry)"
- >
- <div class="log-content">
- <div class="log-meta">
- <nz-tag [nzColor]="getSeverityColor(logEntry.original.severityNumber)" class="severity-tag">
- {{ logEntry.original.severityText || ('log.stream.unknown' | i18n) }}
- </nz-tag>
-
- <span class="timestamp">
- {{ formatTimestamp(logEntry.timestamp!) }}
- </span>
- </div>
-
- <div class="log-message">
- {{ getLogEntryJson(logEntry.original) }}
- </div>
-
- <div class="log-actions">
- <button
- nz-button
- nzSize="small"
- nzType="text"
- (click)="$event.stopPropagation(); copyToClipboard(getLogEntryJson(logEntry.original))"
- [nz-tooltip]="'log.stream.copy-message' | i18n"
- >
- <i nz-icon nzType="copy"></i>
- </button>
+ <!-- Log Entries with Virtual Scroll -->
+ <cdk-virtual-scroll-viewport itemSize="40" class="virtual-scroll-viewport">
+ <div
+ *cdkVirtualFor="let logEntry of logEntries; trackBy: trackByLogEntry"
+ class="log-entry"
+ [class.new-entry]="logEntry.isNew"
+ (click)="showLogDetails(logEntry)"
+ >
+ <div class="log-content">
+ <div class="log-meta">
+ <nz-tag [nzColor]="logEntry.severityColor" class="severity-tag">
+ {{ logEntry.original.severityText || 'UNKNOWN' }}
Review Comment:
[nitpick] Hardcoding 'UNKNOWN' as a fallback breaks i18n. The original
template used `'log.stream.unknown' | i18n` for translation support. Consider
using the same pattern: `{{ logEntry.original.severityText ||
('log.stream.unknown' | i18n) }}` to maintain internationalization.
```suggestion
{{ logEntry.original.severityText || ('log.stream.unknown' | i18n) }}
```
##########
web-app/src/app/routes/log/log-stream/log-stream.component.ts:
##########
@@ -198,47 +230,95 @@ export class LogStreamComponent implements OnInit, OnDestroy, AfterViewInit {
return params.toString();
}
- private addLogEntry(logEntry: LogEntry): void {
+ private queueLogEntry(logEntry: LogEntry): void {
+ // Drop logs if buffer is full (backpressure)
+ if (this.pendingLogs.length >= this.MAX_PENDING_LOGS) {
+ return;
+ }
+
+ // Pre-compute everything to minimize work during render
const extendedEntry: ExtendedLogEntry = {
original: logEntry,
- isNew: true,
- timestamp: logEntry.timeUnixNano ? new Date(logEntry.timeUnixNano / 1000000) : new Date()
+ timestamp: logEntry.timeUnixNano ? new Date(logEntry.timeUnixNano / 1000000) : new Date(),
+ displayText: this.formatLogDisplay(logEntry),
+ severityColor: this.computeSeverityColor(logEntry.severityNumber)
};
- this.logEntries.unshift(extendedEntry);
+ this.pendingLogs.push(extendedEntry);
+ this.displayedLogCount++;
Review Comment:
The `displayedLogCount` is incremented in `queueLogEntry` but never
decremented or used elsewhere in the component. This counter will grow
indefinitely and doesn't reflect the actual number of logs displayed (which is
bounded by `maxLogEntries`). Either remove this unused field or implement
proper logic to track displayed vs total received logs.
```suggestion
```
##########
hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseManager.java:
##########
@@ -19,90 +19,185 @@
package org.apache.hertzbeat.log.notice;
+import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.log.LogEntry;
-import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * SSE manager for log
+ * SSE manager for log with batch processing support for high TPS scenarios
*/
@Component
@Slf4j
@Getter
public class LogSseManager {
+
+ private static final long BATCH_INTERVAL_MS = 200;
+ private static final int MAX_BATCH_SIZE = 1000;
+ private static final int MAX_QUEUE_SIZE = 10000;
+
private final Map<Long, SseSubscriber> emitters = new ConcurrentHashMap<>();
+ private final Queue<LogEntry> logQueue = new ConcurrentLinkedQueue<>();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "sse-batch-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ private final ExecutorService senderPool = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r, "sse-sender");
+ t.setDaemon(true);
+ return t;
+ });
+ private final AtomicLong queueSize = new AtomicLong(0);
+
+ public LogSseManager() {
+ scheduler.scheduleAtFixedRate(this::flushBatch, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ scheduler.shutdown();
+ senderPool.shutdown();
+ try {
+ scheduler.awaitTermination(2, TimeUnit.SECONDS);
+ senderPool.awaitTermination(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ scheduler.shutdownNow();
+ senderPool.shutdownNow();
+ }
/**
* Create a new SSE emitter for a client with specified filters
- * @param clientId The unique identifier for the client
- * @param filters The filters to apply to the log data
- * @return The SSE emitter
*/
public SseEmitter createEmitter(Long clientId, LogSseFilterCriteria filters) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitter.onCompletion(() -> removeEmitter(clientId));
emitter.onTimeout(() -> removeEmitter(clientId));
emitter.onError((ex) -> removeEmitter(clientId));
- SseSubscriber subscriber = new SseSubscriber(emitter, filters);
- emitters.put(clientId, subscriber);
+ emitters.put(clientId, new SseSubscriber(emitter, filters));
return emitter;
}
/**
- * Broadcast log data to all subscribers
- * @param logEntry The log data to broadcast
+ * Queue log entry for batch processing
*/
- @Async
public void broadcast(LogEntry logEntry) {
- emitters.forEach((clientId, subscriber) -> {
- try {
- // Check if the log entry matches the subscriber's filter criteria
- if (subscriber.filters == null || subscriber.filters.matches(logEntry)) {
- subscriber.emitter.send(SseEmitter.event()
- .id(String.valueOf(System.currentTimeMillis()))
- .name("LOG_EVENT")
- .data(logEntry));
+ if (queueSize.get() >= MAX_QUEUE_SIZE) {
+ return;
+ }
+ logQueue.offer(logEntry);
+ queueSize.incrementAndGet();
+ }
+
+ /**
+ * Flush queued logs to all subscribers in batch
+ */
+ private void flushBatch() {
+ try {
+ if (logQueue.isEmpty() || emitters.isEmpty()) {
+ return;
+ }
+
+ List<LogEntry> batch = new ArrayList<>(MAX_BATCH_SIZE);
+ LogEntry entry;
+ while (batch.size() < MAX_BATCH_SIZE && (entry = logQueue.poll()) != null) {
+ batch.add(entry);
+ queueSize.decrementAndGet();
+ }
+
+ if (batch.isEmpty()) {
+ return;
+ }
+
+ // Send to each subscriber in parallel
+ List<Map.Entry<Long, SseSubscriber>> snapshot = new ArrayList<>(emitters.entrySet());
+ for (Map.Entry<Long, SseSubscriber> e : snapshot) {
Review Comment:
The snapshot of `emitters.entrySet()` is not needed for safety here:
`ConcurrentHashMap` iterators are weakly consistent and never throw
`ConcurrentModificationException`. Copying every entry into a new `ArrayList`
only adds allocation overhead when there are many subscribers. Iterating
directly over the map's entry set, which is designed for concurrent reads, is
both safe and cheaper:
```suggestion
for (Map.Entry<Long, SseSubscriber> e : emitters.entrySet()) {
```