Copilot commented on code in PR #897:
URL: 
https://github.com/apache/skywalking-banyandb/pull/897#discussion_r2623612840


##########
fodc/internal/flightrecorder/datasource.go:
##########
@@ -0,0 +1,248 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package flightrecorder implements a flight recorder for metrics data.
+package flightrecorder
+
+import (
+       "fmt"
+       "sync"
+
+       "github.com/apache/skywalking-banyandb/fodc/internal/metrics"
+)
+
// Sizing constants used by ComputeCapacity to estimate the fixed memory
// overhead of a Datasource. The byte sizes are estimates for a 64-bit
// platform, not exact runtime layouts.
const (
	// defaultCapacity is the ring buffer length ComputeCapacity returns while
	// no metric series are known yet.
	defaultCapacity = 1000

	intSize          = 8  // bytes in an int on 64-bit platforms
	sliceHeaderSize  = 24 // pointer + len + cap
	stringHeaderSize = 16 // pointer + len

	mapBaseOverhead      = 48 // estimated fixed cost of one map
	mapEntryOverhead     = 16 // estimated per-entry cost in the metrics map
	descMapEntryOverhead = 24 // estimated per-entry cost in the descriptions map
)
+
+// MetricRingBuffer is a type alias for RingBuffer[float64].
+type MetricRingBuffer = RingBuffer[float64]
+
+// TimestampRingBuffer is a type alias for RingBuffer[int64].
+type TimestampRingBuffer = RingBuffer[int64]
+
+// NewMetricRingBuffer creates a new MetricRingBuffer.
+func NewMetricRingBuffer() *MetricRingBuffer {
+       return NewRingBuffer[float64]()
+}
+
+// NewTimestampRingBuffer creates a new TimestampRingBuffer.
+func NewTimestampRingBuffer() *TimestampRingBuffer {
+       return NewRingBuffer[int64]()
+}
+
+// UpdateMetricRingBuffer adds a metric value to the metric ring buffer.
+func UpdateMetricRingBuffer(mrb *MetricRingBuffer, v float64) {
+       mrb.Add(v)
+}
+
+// UpdateTimestampRingBuffer adds a timestamp value to the timestamp ring 
buffer.
+func UpdateTimestampRingBuffer(trb *TimestampRingBuffer, v int64) {
+       if trb != nil {
+               trb.Add(v)
+       }
+}
+
+// Datasource stores metrics data with ring buffers.
+type Datasource struct {
+       metrics      map[string]*MetricRingBuffer // Map from metric 
name+labels to RingBuffer storing metric values
+       descriptions map[string]string            // Map from metric name to 
HELP content descriptions
+       timestamps   *TimestampRingBuffer         // RingBuffer storing 
timestamps for each polling cycle
+       mu           sync.RWMutex
+       CapacitySize int // Memory limit in bytes
+}
+
+// NewDatasource creates a new Datasource.
+func NewDatasource() *Datasource {
+       return &Datasource{
+               metrics:      make(map[string]*MetricRingBuffer),
+               timestamps:   NewTimestampRingBuffer(),
+               descriptions: make(map[string]string),
+               CapacitySize: 0,
+       }
+}
+
+// Update records a metric in the datasource.
+func (ds *Datasource) Update(m *metrics.RawMetric) error {
+       if m == nil {
+               return fmt.Errorf("metric cannot be nil")
+       }
+
+       ds.mu.Lock()
+       defer ds.mu.Unlock()
+
+       mk := metrics.MetricKey{
+               Name:   m.Name,
+               Labels: m.Labels,
+       }
+       metricKey := mk.String()
+
+       if _, exists := ds.metrics[metricKey]; !exists {
+               ds.metrics[metricKey] = NewMetricRingBuffer()
+       }
+       if m.Desc != "" {
+               ds.descriptions[m.Name] = m.Desc
+       }
+       UpdateMetricRingBuffer(ds.metrics[metricKey], m.Value)
+
+       return nil
+}
+
+// AddTimestamp adds a timestamp for the current polling cycle.
+func (ds *Datasource) AddTimestamp(timestamp int64) {
+       ds.mu.Lock()
+       defer ds.mu.Unlock()
+       UpdateTimestampRingBuffer(ds.timestamps, timestamp)
+}
+
+// UpdateBatch atomically updates a batch of metrics and timestamp.
+func (ds *Datasource) UpdateBatch(rawMetrics []metrics.RawMetric, timestamp 
int64) error {
+       ds.mu.Lock()
+       defer ds.mu.Unlock()
+
+       // Update timestamp first
+       UpdateTimestampRingBuffer(ds.timestamps, timestamp)
+
+       // Update all metrics
+       for idx := range rawMetrics {
+               m := &rawMetrics[idx]
+
+               mk := metrics.MetricKey{
+                       Name:   m.Name,
+                       Labels: m.Labels,
+               }
+               metricKey := mk.String()
+
+               if _, exists := ds.metrics[metricKey]; !exists {
+                       ds.metrics[metricKey] = NewMetricRingBuffer()
+               }
+               if m.Desc != "" {
+                       ds.descriptions[m.Name] = m.Desc
+               }
+               UpdateMetricRingBuffer(ds.metrics[metricKey], m.Value)
+       }
+
+       return nil
+}
+
+// SetCapacity sets the capacity for the datasource and updates all ring 
buffer capacities.
+func (ds *Datasource) SetCapacity(capacity int) {
+       ds.mu.Lock()
+       defer ds.mu.Unlock()
+       ds.CapacitySize = capacity
+
+       computedCapacity := ds.ComputeCapacity(capacity)
+
+       for _, metricBuffer := range ds.metrics {
+               metricBuffer.SetCapacity(computedCapacity)
+       }
+
+       if ds.timestamps != nil {
+               ds.timestamps.SetCapacity(computedCapacity)
+       }
+}
+
+// ComputeCapacity computes the maximum capacity for ring buffers based on 
available memory constraints.
+func (ds *Datasource) ComputeCapacity(capacitySize int) int {
+       if capacitySize <= 0 {
+               return 1
+       }
+       numMetrics := len(ds.metrics)
+       if numMetrics == 0 {
+               return defaultCapacity
+       }
+
+       // Fixed Overheads
+       // Metrics Map Overhead
+       metricsMapOverhead := mapBaseOverhead + (numMetrics * mapEntryOverhead)
+
+       // Descriptions Map Overhead
+       descriptionsMapOverhead := mapBaseOverhead + (numMetrics * 
descMapEntryOverhead)
+
+       // String Storage Overhead
+       stringOverhead := 0
+       for key := range ds.metrics {
+               stringOverhead += stringHeaderSize + len(key)
+       }
+       for _, desc := range ds.descriptions {
+               stringOverhead += stringHeaderSize + len(desc)
+       }
+
+       // For each metric RingBuffer: next field (8 bytes) + size field (8 
bytes) + slice header (24 bytes)
+       metricBufferOverhead := numMetrics * (intSize + intSize + 
sliceHeaderSize)
+       // For timestamp RingBuffer: pointer (8 bytes) + next field (8 bytes) + 
size field (8 bytes) + slice header (24 bytes) + mutex (24 bytes)
+       timestampBufferOverhead := 8 + intSize + intSize + sliceHeaderSize + 24
+
+       totalFixedOverhead := metricsMapOverhead + descriptionsMapOverhead + 
stringOverhead +
+               metricBufferOverhead + timestampBufferOverhead
+
+       if totalFixedOverhead >= capacitySize {
+               return 0

Review Comment:
   The function returns 0 when the fixed overhead meets or exceeds the 
capacity. Yet it returns 1 for a non-positive capacitySize, and line 213 
ensures a minimum of 1 elsewhere. This is inconsistent: SetCapacity would 
then resize every ring buffer to 0. Consider returning 1 at line 200 so the 
function always honors its minimum-capacity guarantee.
   ```suggestion
                return 1
   ```



##########
fodc/internal/integration/metrics_export_test.go:
##########
@@ -0,0 +1,622 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_test
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "net"
+       "net/http"
+       "strings"
+       "sync"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/prometheus/client_golang/prometheus"
+
+       "github.com/apache/skywalking-banyandb/fodc/internal/exporter"
+       "github.com/apache/skywalking-banyandb/fodc/internal/flightrecorder"
+       "github.com/apache/skywalking-banyandb/fodc/internal/metrics"
+       "github.com/apache/skywalking-banyandb/fodc/internal/server"
+       "github.com/apache/skywalking-banyandb/fodc/internal/watchdog"
+)
+
// volatileMetricNames lists metrics whose values are expected to drift between
// the moment the flight recorder buffers them and the moment the exporter is
// scraped. The list covers Go runtime memory statistics, allocation counters
// (mallocs/frees) and process memory usage. Entries are matched as
// case-sensitive substrings of the metric key.
var volatileMetricNames = []string{
	"go_memstats_heap_alloc_bytes",
	"go_memstats_heap_sys_bytes",
	"go_memstats_heap_inuse_bytes",
	"go_memstats_heap_idle_bytes",
	"go_memstats_heap_released_bytes",
	"go_memstats_alloc_bytes",
	"go_memstats_sys_bytes",
	"go_memstats_mallocs_total",
	"go_memstats_frees_total",
	"go_memstats_gc_sys_bytes",
	"go_memstats_other_sys_bytes",
	"go_memstats_next_gc_bytes",
	"go_memstats_last_gc_time_seconds",
	"process_resident_memory_bytes",
	"process_virtual_memory_bytes",
}

// isVolatileMetric reports whether metricKeyStr identifies a metric whose value
// may legitimately change between buffering and export. Latency metrics (see
// isLatencyMetric) are volatile. So is any key containing one of
// volatileMetricNames.
func isVolatileMetric(metricKeyStr string) bool {
	if isLatencyMetric(metricKeyStr) {
		return true
	}
	for _, volatileName := range volatileMetricNames {
		if strings.Contains(metricKeyStr, volatileName) {
			return true
		}
	}
	return false
}

// isLatencyMetric reports whether metricKeyStr contains "latency", ignoring case.
func isLatencyMetric(metricKeyStr string) bool {
	return strings.Contains(strings.ToLower(metricKeyStr), "latency")
}

// calculateTolerance returns the absolute tolerance allowed when comparing the
// exported value of a volatile metric with its buffered value. The tolerance
// is the larger of 10% of |bufferedValue| and a fixed floor. The floor is 0.1
// for latency metrics (100ms, assuming the value is in seconds) and 10MiB for
// every other volatile metric.
//
// The percentage uses the magnitude of bufferedValue, so a negative reading
// does not collapse the relative tolerance to the floor.
func calculateTolerance(metricKeyStr string, bufferedValue float64) float64 {
	floor := 10.0 * 1024 * 1024 // 10MiB
	if isLatencyMetric(metricKeyStr) {
		floor = 0.1 // 100ms
	}
	magnitude := bufferedValue
	if magnitude < 0 {
		magnitude = -magnitude
	}
	if relative := magnitude * 0.10; relative > floor {
		return relative
	}
	return floor
}
+
+var _ = Describe("Test Case 3: Metrics Export to Prometheus", func() {
+       var (
+               metricsEndpoint     string
+               fr                  *flightrecorder.FlightRecorder
+               wd                  *watchdog.Watchdog
+               metricsServer       *server.Server
+               metricsServerAddr   string
+               promReg             *prometheus.Registry
+               datasourceCollector *exporter.DatasourceCollector
+       )
+
+       BeforeEach(func() {
+               // Construct BanyanDB metrics endpoint URL
+               host, _, splitErr := net.SplitHostPort(banyanDBHTTPAddr)
+               if splitErr != nil {
+                       parts := strings.Split(banyanDBHTTPAddr, ":")
+                       if len(parts) > 0 {
+                               host = parts[0]
+                       } else {
+                               host = defaultLocalhost
+                       }
+               }
+               if host == "" {
+                       host = defaultLocalhost
+               }
+               metricsEndpoint = fmt.Sprintf("http://%s:2121/metrics";, host)
+
+               // Create Flight Recorder with reasonable capacity
+               capacitySize := 10 * 1024 * 1024 // 10MB
+               fr = flightrecorder.NewFlightRecorder(capacitySize)
+
+               // Create Prometheus registry and collector
+               promReg = prometheus.NewRegistry()
+               datasourceCollector = exporter.NewDatasourceCollector(fr)
+
+               // Create and start Prometheus metrics server for FODC with 
fixed port for testing
+               // Use a high port number to avoid conflicts
+               metricsServerAddr = defaultLocalhost + ":9091"
+               var serverCreateErr error
+               metricsServer, serverCreateErr = server.NewServer(server.Config{
+                       ListenAddr:        metricsServerAddr,
+                       ReadHeaderTimeout: 3 * time.Second,
+                       ShutdownTimeout:   5 * time.Second,
+               })
+               Expect(serverCreateErr).NotTo(HaveOccurred())
+
+               serverErrCh, serverStartErr := metricsServer.Start(promReg, 
datasourceCollector)
+               Expect(serverStartErr).NotTo(HaveOccurred())
+               Expect(serverErrCh).NotTo(BeNil())
+
+               // Create Watchdog with short polling interval for testing
+               pollInterval := 2 * time.Second
+               wd = watchdog.NewWatchdogWithConfig(fr, metricsEndpoint, 
pollInterval)
+
+               ctx := context.Background()
+               preRunErr := wd.PreRun(ctx)
+               Expect(preRunErr).NotTo(HaveOccurred())
+
+               // Verify BanyanDB metrics endpoint is accessible before 
starting watchdog
+               client := &http.Client{Timeout: 2 * time.Second}
+               resp, healthErr := client.Get(metricsEndpoint)
+               Expect(healthErr).NotTo(HaveOccurred(), "BanyanDB metrics 
endpoint should be accessible")
+               if resp != nil {
+                       resp.Body.Close()
+               }
+
+               // Start watchdog polling
+               stopCh := wd.Serve()
+               Expect(stopCh).NotTo(BeNil())
+
+               // Give watchdog a moment to start
+               time.Sleep(500 * time.Millisecond)
+       })
+
+       AfterEach(func() {
+               // Stop watchdog
+               if wd != nil {
+                       wd.GracefulStop()
+               }
+
+               // Stop metrics server
+               if metricsServer != nil {
+                       stopErr := metricsServer.Stop()
+                       Expect(stopErr).NotTo(HaveOccurred())
+               }
+       })
+
+       It("should export metrics to Prometheus format correctly", func() {
+               // Step 1: Generate metrics and wait for Watchdog to collect 
them
+               client := &http.Client{
+                       Timeout: 5 * time.Second,
+               }
+
+               // Generate metrics by performing operations
+               for i := 0; i < 10; i++ {
+                       req, reqErr := http.NewRequest(http.MethodGet, 
fmt.Sprintf("http://%s/api/v1/health";, banyanDBHTTPAddr), nil)
+                       Expect(reqErr).NotTo(HaveOccurred())
+
+                       resp, respErr := client.Do(req)
+                       if respErr == nil && resp != nil {
+                               resp.Body.Close()
+                       }
+
+                       metricsReq, metricsReqErr := 
http.NewRequest(http.MethodGet, metricsEndpoint, nil)
+                       Expect(metricsReqErr).NotTo(HaveOccurred())
+
+                       metricsResp, metricsRespErr := client.Do(metricsReq)
+                       if metricsRespErr == nil && metricsResp != nil {
+                               metricsResp.Body.Close()
+                       }
+
+                       time.Sleep(200 * time.Millisecond)
+               }
+
+               // Wait for Watchdog to poll metrics
+               Eventually(func() bool {
+                       datasources := fr.GetDatasources()
+                       if len(datasources) == 0 {
+                               return false
+                       }
+                       ds := datasources[0]
+                       metricsMap := ds.GetMetrics()
+                       return len(metricsMap) > 0
+               }, 10*time.Second, 500*time.Millisecond).Should(BeTrue(), 
"Metrics should be buffered after watchdog polls")
+
+               // Step 2: Verify metrics are stored in FlightRecorder 
Datasources
+               datasources := fr.GetDatasources()
+               Expect(datasources).NotTo(BeEmpty(), "FlightRecorder should 
have at least one datasource")
+
+               ds := datasources[0]
+               metricsMap := ds.GetMetrics()
+               Expect(metricsMap).NotTo(BeEmpty(), "Datasource should have 
buffered metrics")
+
+               // Step 3: Scrape `/metrics` endpoint using Prometheus client
+               fodcMetricsURL := fmt.Sprintf("http://%s/metrics";, 
metricsServerAddr)
+               scrapeClient := &http.Client{
+                       Timeout: 5 * time.Second,
+               }
+
+               // Wait for metrics server to be ready
+               Eventually(func() error {
+                       resp, err := scrapeClient.Get(fodcMetricsURL)
+                       if err != nil {
+                               return err
+                       }
+                       resp.Body.Close()
+                       if resp.StatusCode != http.StatusOK {
+                               return fmt.Errorf("unexpected status code: %d", 
resp.StatusCode)
+                       }
+                       return nil
+               }, 5*time.Second, 500*time.Millisecond).Should(Succeed(), "FODC 
metrics endpoint should be accessible")
+
+               // Scrape metrics
+               resp, scrapeErr := scrapeClient.Get(fodcMetricsURL)
+               Expect(scrapeErr).NotTo(HaveOccurred(), "Should be able to 
scrape metrics endpoint")
+               Expect(resp.StatusCode).To(Equal(http.StatusOK), "Metrics 
endpoint should return 200 OK")
+
+               // Verify Content-Type header
+               contentType := resp.Header.Get("Content-Type")
+               Expect(contentType).To(ContainSubstring("text/plain"), 
"Content-Type should be text/plain")
+               Expect(contentType).To(ContainSubstring("version=0.0.4"), 
"Should include Prometheus format version")
+
+               // Read response body
+               bodyBytes, readErr := io.ReadAll(resp.Body)
+               resp.Body.Close()
+               Expect(readErr).NotTo(HaveOccurred(), "Should be able to read 
response body")
+
+               body := string(bodyBytes)
+               Expect(body).NotTo(BeEmpty(), "Metrics response should not be 
empty")
+
+               GinkgoWriter.Printf("Scraped metrics:\n%s\n", body)
+
+               // Step 4: Parse Prometheus format and verify exported metrics 
match buffered metrics
+               // Strip timestamps from metric lines before parsing 
(Prometheus format includes optional timestamps)
+               bodyWithoutTimestamps := 
stripTimestampsFromPrometheusFormat(body)
+               parsedMetrics, parseErr := metrics.Parse(bodyWithoutTimestamps)
+               Expect(parseErr).NotTo(HaveOccurred(), "Should be able to parse 
Prometheus format")
+
+               Expect(len(parsedMetrics)).To(BeNumerically(">", 0), "Should 
have parsed at least one metric")
+
+               // Build a map of buffered metrics for comparison
+               bufferedMetricsMap := make(map[string]float64)
+               descriptions := ds.GetDescriptions()
+
+               for metricKeyStr, metricBuffer := range metricsMap {
+                       if metricBuffer.Len() == 0 {
+                               continue
+                       }
+                       currentValue := metricBuffer.GetCurrentValue()
+                       bufferedMetricsMap[metricKeyStr] = currentValue
+               }
+
+               // Step 5: Verify exported metrics match buffered metrics
+               exportedMetricsMap := make(map[string]float64)
+               for _, parsedMetric := range parsedMetrics {
+                       // Reconstruct metric key from parsed metric
+                       metricKey := metrics.MetricKey{
+                               Name:   parsedMetric.Name,
+                               Labels: parsedMetric.Labels,
+                       }
+                       metricKeyStr := metricKey.String()
+                       exportedMetricsMap[metricKeyStr] = parsedMetric.Value
+               }
+
+               // Verify that exported metrics match buffered metrics
+               matchedCount := 0
+               for metricKeyStr, bufferedValue := range bufferedMetricsMap {
+                       exportedValue, exists := 
exportedMetricsMap[metricKeyStr]
+                       if exists {
+                               matchedCount++
+                               // Check if this is a volatile metric 
(memory-related gauges or latency metrics)
+                               if isVolatileMetric(metricKeyStr) {
+                                       tolerance := 
calculateTolerance(metricKeyStr, bufferedValue)
+                                       
Expect(exportedValue).To(BeNumerically("~", bufferedValue, tolerance),
+                                               fmt.Sprintf("Exported metric %s 
value should be within tolerance of buffered value (volatile metric)", 
metricKeyStr))
+                               } else {
+                                       // Allow small floating point 
differences for non-volatile metrics
+                                       
Expect(exportedValue).To(BeNumerically("~", bufferedValue, 0.0001),
+                                               fmt.Sprintf("Exported metric %s 
value should match buffered value", metricKeyStr))
+                               }
+                       }
+               }
+
+               Expect(matchedCount).To(BeNumerically(">", 0),
+                       "At least some exported metrics should match buffered 
metrics")
+
+               // Step 6: Verify metric labels are preserved correctly
+               for _, parsedMetric := range parsedMetrics {
+                       metricKey := metrics.MetricKey{
+                               Name:   parsedMetric.Name,
+                               Labels: parsedMetric.Labels,
+                       }
+                       metricKeyStr := metricKey.String()
+
+                       // Check if this metric exists in buffered metrics
+                       if _, exists := bufferedMetricsMap[metricKeyStr]; 
exists {
+                               // Verify labels are preserved
+                               bufferedBuffer, bufferedExists := 
metricsMap[metricKeyStr]
+                               Expect(bufferedExists).To(BeTrue(), "Metric 
should exist in buffered metrics")
+
+                               // Verify the metric has the same structure
+                               Expect(bufferedBuffer).NotTo(BeNil(), "Buffered 
metric buffer should not be nil")
+                       }
+               }
+
+               // Step 7: Verify HELP text is included in exported metrics
+               lines := strings.Split(body, "\n")
+               helpLines := make(map[string]string)
+
+               for _, line := range lines {
+                       line = strings.TrimSpace(line)
+                       if strings.HasPrefix(line, "# HELP ") {
+                               // Parse HELP line: # HELP metric_name 
description
+                               // Format: # HELP metric_name description_text

Review Comment:
   The comment describes the format as '# HELP metric_name description_text', 
but the code uses `SplitN(line, " ", 3)`. That splits the line into 3 parts 
("#", "HELP", "metric_name description_text"), which does not match the 
described format. The comment should state that the split yields 
["#", "HELP", "metric_name description_text"], where parts[1] is "HELP" and 
parts[2] holds the metric name followed by the description.
   ```suggestion
                                // SplitN(line, " ", 3) produces ["#", "HELP", 
"metric_name description_text"]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to