This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch perf/trace-vs-stream
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d7ad0a8fb4069e2e9b212fa8ad8f2e8e371eb67f
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Sep 7 00:15:37 2025 +0000

    feat: implement performance benchmarking for Stream vs Trace models
    
    - Added a comprehensive benchmarking suite to compare performance between 
Stream and Trace models.
    - Introduced new utility files for data generation, metrics collection, and 
distribution handling.
    - Implemented a benchmark runner to execute write and query performance 
tests with configurable parameters.
    - Enhanced test suite to validate schema setup and benchmark execution, 
ensuring accurate performance comparisons.
---
 .../lifecycle/segment_boundary_utils_test.go       |   2 +-
 test/stress/stream-vs-trace/benchmark_runner.go    | 444 ++++++++++++
 test/stress/stream-vs-trace/data_generator.go      | 784 +++++++++++++++++++++
 test/stress/stream-vs-trace/distribution.go        | 183 +++++
 test/stress/stream-vs-trace/metrics.go             | 350 +++++++++
 .../stream-vs-trace/stream_vs_trace_suite_test.go  |  28 +-
 6 files changed, 1781 insertions(+), 10 deletions(-)

diff --git a/banyand/backup/lifecycle/segment_boundary_utils_test.go 
b/banyand/backup/lifecycle/segment_boundary_utils_test.go
index b6ae9f0f..00355866 100644
--- a/banyand/backup/lifecycle/segment_boundary_utils_test.go
+++ b/banyand/backup/lifecycle/segment_boundary_utils_test.go
@@ -29,8 +29,8 @@ import (
 func TestCalculateTargetSegments(t *testing.T) {
        //nolint:govet // fieldalignment: test struct optimization not critical
        tests := []struct {
-               expected       []time.Time
                name           string
+               expected       []time.Time
                targetInterval storage.IntervalRule
                partMinTS      int64
                partMaxTS      int64
diff --git a/test/stress/stream-vs-trace/benchmark_runner.go 
b/test/stress/stream-vs-trace/benchmark_runner.go
new file mode 100644
index 00000000..51c6cf46
--- /dev/null
+++ b/test/stress/stream-vs-trace/benchmark_runner.go
@@ -0,0 +1,444 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package streamvstrace
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+)
+
+// BenchmarkConfig represents configuration for running benchmarks.
+type BenchmarkConfig struct {
+       // Scale names the data-set size; NewBenchmarkRunner passes it to NewSpanGenerator.
+       Scale         Scale
+       // Concurrency is the number of write worker goroutines started per model.
+       Concurrency   int
+       // BatchSize is the number of spans handed to a worker in one batch.
+       BatchSize     int
+       // TestDuration is the timeout applied to the write benchmark context.
+       TestDuration  time.Duration
+       // EnableQueries gates RunQueryBenchmark, which returns immediately when it is false.
+       EnableQueries bool
+       // NOTE(review): not referenced by the runner methods in this file; presumably the
+       // pause between query rounds once query benchmarks are implemented — confirm.
+       QueryInterval time.Duration
+}
+
+// DefaultBenchmarkConfig returns a default benchmark configuration.
+func DefaultBenchmarkConfig(scale Scale) BenchmarkConfig {
+       return BenchmarkConfig{
+               Scale:         scale,
+               Concurrency:   10,
+               BatchSize:     100,
+               TestDuration:  5 * time.Minute,
+               EnableQueries: true,
+               QueryInterval: 30 * time.Second,
+       }
+}
+
+// BenchmarkRunner coordinates the execution of performance benchmarks.
+type BenchmarkRunner struct {
+       // streamClient performs the stream writes (WriteStreamData).
+       streamClient  *StreamClient
+       // traceClient performs the trace writes (WriteTraceData).
+       traceClient   *TraceClient
+       // streamMetrics collects write durations, payload sizes and errors of the stream model.
+       streamMetrics *PerformanceMetrics
+       // traceMetrics collects write durations, payload sizes and errors of the trace model.
+       traceMetrics  *PerformanceMetrics
+       // generator produces the traces written by both write benchmarks.
+       generator     *SpanGenerator
+       // config holds the settings the runner was created with.
+       config        BenchmarkConfig
+}
+
+// NewBenchmarkRunner wires a runner around the two clients. It allocates a
+// separate metrics collector for each model and a span generator configured
+// for config.Scale.
+func NewBenchmarkRunner(config BenchmarkConfig, streamClient *StreamClient, traceClient *TraceClient) *BenchmarkRunner {
+       runner := new(BenchmarkRunner)
+       runner.config = config
+       runner.streamClient = streamClient
+       runner.traceClient = traceClient
+       runner.streamMetrics = NewPerformanceMetrics()
+       runner.traceMetrics = NewPerformanceMetrics()
+       runner.generator = NewSpanGenerator(config.Scale)
+       return runner
+}
+
+// RunWriteBenchmark drives the stream and the trace write benchmark side by
+// side. Both share one context that expires after config.TestDuration. The
+// first error reported by either model is returned; otherwise a write report
+// is printed for each model and nil is returned.
+func (r *BenchmarkRunner) RunWriteBenchmark(ctx context.Context) error {
+       fmt.Printf("Starting write benchmark for scale: %s\n", r.config.Scale)
+       fmt.Printf("Concurrency: %d, Batch Size: %d, Duration: %v\n",
+               r.config.Concurrency, r.config.BatchSize, r.config.TestDuration)
+
+       timedCtx, stop := context.WithTimeout(ctx, r.config.TestDuration)
+       defer stop()
+
+       // One buffered slot per model so a failing goroutine never blocks on send.
+       failures := make(chan error, 2)
+       var pending sync.WaitGroup
+
+       launch := func(model string, run func(context.Context) error) {
+               pending.Add(1)
+               go func() {
+                       defer pending.Done()
+                       if runErr := run(timedCtx); runErr != nil {
+                               failures <- fmt.Errorf("%s write benchmark failed: %w", model, runErr)
+                       }
+               }()
+       }
+       launch("stream", r.runStreamWriteBenchmark)
+       launch("trace", r.runTraceWriteBenchmark)
+
+       pending.Wait()
+       close(failures)
+
+       // Only non-nil errors are ever sent, so the first value received is the result.
+       if firstErr, ok := <-failures; ok {
+               return firstErr
+       }
+
+       fmt.Println("\n=== Stream Model Write Performance ===")
+       streamSummary := r.streamMetrics.GenerateReport("Stream Write", r.config.Scale)
+       streamSummary.PrintReport()
+
+       fmt.Println("\n=== Trace Model Write Performance ===")
+       traceSummary := r.traceMetrics.GenerateReport("Trace Write", r.config.Scale)
+       traceSummary.PrintReport()
+
+       return nil
+}
+
+// runStreamWriteBenchmark runs the stream write benchmark.
+//
+// It generates traces with the runner's span generator, flattens them into a
+// single span slice, cuts that slice into batches of config.BatchSize spans and
+// hands the batches to config.Concurrency goroutines running streamWriteWorker.
+// It returns once every worker has stopped, which happens when all batches
+// were consumed or ctx is done.
+//
+// An error is returned when BatchSize or Concurrency is not positive: a zero
+// batch size would never advance the batching loop, a negative one would slice
+// with a negative index, and without workers nothing drains the batch channel.
+// Individual write failures are recorded by the workers and do not fail the run.
+func (r *BenchmarkRunner) runStreamWriteBenchmark(ctx context.Context) error {
+       if r.config.BatchSize <= 0 {
+               return fmt.Errorf("invalid batch size %d: must be positive", r.config.BatchSize)
+       }
+       if r.config.Concurrency <= 0 {
+               return fmt.Errorf("invalid concurrency %d: must be positive", r.config.Concurrency)
+       }
+
+       // Generate test data
+       traces := r.generator.GenerateTraces()
+       totalSpans := 0
+       for _, trace := range traces {
+               totalSpans += len(trace.Spans)
+       }
+       // Size the flattened slice once instead of growing it trace by trace.
+       allSpans := make([]*SpanData, 0, totalSpans)
+       for _, trace := range traces {
+               allSpans = append(allSpans, trace.Spans...)
+       }
+
+       fmt.Printf("Generated %d traces with %d total spans for stream benchmark\n",
+               len(traces), len(allSpans))
+
+       // Process spans in batches with concurrency
+       spanChan := make(chan []*SpanData, r.config.Concurrency*2)
+
+       // Start workers
+       var wg sync.WaitGroup
+       for i := 0; i < r.config.Concurrency; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       r.streamWriteWorker(ctx, spanChan)
+               }()
+       }
+
+       // Send batches to workers; closing the channel tells them the data is exhausted.
+       go func() {
+               defer close(spanChan)
+               for i := 0; i < len(allSpans); i += r.config.BatchSize {
+                       end := i + r.config.BatchSize
+                       if end > len(allSpans) {
+                               end = len(allSpans)
+                       }
+
+                       select {
+                       case spanChan <- allSpans[i:end]:
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+
+       // Wait for workers to complete
+       wg.Wait()
+
+       return nil
+}
+
+// runTraceWriteBenchmark runs the trace write benchmark.
+//
+// It generates traces with the runner's span generator, flattens them into a
+// single span slice, cuts that slice into batches of config.BatchSize spans and
+// hands the batches to config.Concurrency goroutines running traceWriteWorker.
+// It returns once every worker has stopped, which happens when all batches
+// were consumed or ctx is done.
+//
+// An error is returned when BatchSize or Concurrency is not positive: a zero
+// batch size would never advance the batching loop, a negative one would slice
+// with a negative index, and without workers nothing drains the batch channel.
+// Individual write failures are recorded by the workers and do not fail the run.
+func (r *BenchmarkRunner) runTraceWriteBenchmark(ctx context.Context) error {
+       if r.config.BatchSize <= 0 {
+               return fmt.Errorf("invalid batch size %d: must be positive", r.config.BatchSize)
+       }
+       if r.config.Concurrency <= 0 {
+               return fmt.Errorf("invalid concurrency %d: must be positive", r.config.Concurrency)
+       }
+
+       // Generate test data with the same generator and scale as the stream
+       // benchmark. This is a separate GenerateTraces call, so the spans are a
+       // freshly generated set rather than the very spans the stream side writes.
+       traces := r.generator.GenerateTraces()
+       totalSpans := 0
+       for _, trace := range traces {
+               totalSpans += len(trace.Spans)
+       }
+       // Size the flattened slice once instead of growing it trace by trace.
+       allSpans := make([]*SpanData, 0, totalSpans)
+       for _, trace := range traces {
+               allSpans = append(allSpans, trace.Spans...)
+       }
+
+       fmt.Printf("Generated %d traces with %d total spans for trace benchmark\n",
+               len(traces), len(allSpans))
+
+       // Process spans in batches with concurrency
+       spanChan := make(chan []*SpanData, r.config.Concurrency*2)
+
+       // Start workers
+       var wg sync.WaitGroup
+       for i := 0; i < r.config.Concurrency; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       r.traceWriteWorker(ctx, spanChan)
+               }()
+       }
+
+       // Send batches to workers; closing the channel tells them the data is exhausted.
+       go func() {
+               defer close(spanChan)
+               for i := 0; i < len(allSpans); i += r.config.BatchSize {
+                       end := i + r.config.BatchSize
+                       if end > len(allSpans) {
+                               end = len(allSpans)
+                       }
+
+                       select {
+                       case spanChan <- allSpans[i:end]:
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+
+       // Wait for workers to complete
+       wg.Wait()
+
+       return nil
+}
+
+// streamWriteWorker drains spanChan, writing every batch through the stream
+// client and timing the call. A successful write is recorded together with the
+// summed DataBinary length of the batch; a failed write is counted and printed.
+// The worker stops when spanChan is closed or ctx is done.
+func (r *BenchmarkRunner) streamWriteWorker(ctx context.Context, spanChan <-chan []*SpanData) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+
+               case batch, open := <-spanChan:
+                       if !open {
+                               return
+                       }
+
+                       begin := time.Now()
+                       writeErr := r.streamClient.WriteStreamData(ctx, batch)
+                       elapsed := time.Since(begin)
+
+                       // Payload size of the batch, measured on the raw span binaries.
+                       var payloadBytes int64
+                       for i := range batch {
+                               payloadBytes += int64(len(batch[i].DataBinary))
+                       }
+
+                       if writeErr != nil {
+                               r.streamMetrics.RecordError()
+                               fmt.Printf("Stream write error: %v\n", writeErr)
+                               continue
+                       }
+                       r.streamMetrics.RecordWrite(elapsed, payloadBytes)
+               }
+       }
+}
+
+// traceWriteWorker drains spanChan, writing every batch through the trace
+// client and timing the call. A successful write is recorded together with the
+// summed DataBinary length of the batch; a failed write is counted and printed.
+// The worker stops when spanChan is closed or ctx is done.
+func (r *BenchmarkRunner) traceWriteWorker(ctx context.Context, spanChan <-chan []*SpanData) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+
+               case batch, open := <-spanChan:
+                       if !open {
+                               return
+                       }
+
+                       begin := time.Now()
+                       writeErr := r.traceClient.WriteTraceData(ctx, batch)
+                       elapsed := time.Since(begin)
+
+                       // Payload size of the batch, measured on the raw span binaries.
+                       var payloadBytes int64
+                       for i := range batch {
+                               payloadBytes += int64(len(batch[i].DataBinary))
+                       }
+
+                       if writeErr != nil {
+                               r.traceMetrics.RecordError()
+                               fmt.Printf("Trace write error: %v\n", writeErr)
+                               continue
+                       }
+                       r.traceMetrics.RecordWrite(elapsed, payloadBytes)
+               }
+       }
+}
+
+// RunQueryBenchmark drives the stream and the trace query benchmark side by
+// side. It is a no-op when config.EnableQueries is false. The first error
+// reported by either model is returned; otherwise a report is printed for each
+// model from the runner's metrics collectors and nil is returned.
+func (r *BenchmarkRunner) RunQueryBenchmark(ctx context.Context) error {
+       if !r.config.EnableQueries {
+               fmt.Println("Query benchmarks disabled")
+               return nil
+       }
+
+       fmt.Printf("Starting query benchmark for scale: %s\n", r.config.Scale)
+
+       // One buffered slot per model so a failing goroutine never blocks on send.
+       failures := make(chan error, 2)
+       var pending sync.WaitGroup
+
+       launch := func(model string, run func(context.Context) error) {
+               pending.Add(1)
+               go func() {
+                       defer pending.Done()
+                       if runErr := run(ctx); runErr != nil {
+                               failures <- fmt.Errorf("%s query benchmark failed: %w", model, runErr)
+                       }
+               }()
+       }
+       launch("stream", r.runStreamQueryBenchmark)
+       launch("trace", r.runTraceQueryBenchmark)
+
+       pending.Wait()
+       close(failures)
+
+       // Only non-nil errors are ever sent, so the first value received is the result.
+       if firstErr, ok := <-failures; ok {
+               return firstErr
+       }
+
+       fmt.Println("\n=== Stream Model Query Performance ===")
+       streamSummary := r.streamMetrics.GenerateReport("Stream Query", r.config.Scale)
+       streamSummary.PrintReport()
+
+       fmt.Println("\n=== Trace Model Query Performance ===")
+       traceSummary := r.traceMetrics.GenerateReport("Trace Query", r.config.Scale)
+       traceSummary.PrintReport()
+
+       return nil
+}
+
+// runStreamQueryBenchmark runs stream query benchmarks.
+//
+// Placeholder: it currently only prints a notice and returns nil; ctx is not
+// used yet.
+func (r *BenchmarkRunner) runStreamQueryBenchmark(ctx context.Context) error {
+       // TODO: Implement stream query benchmarks
+       // This would include various query patterns like:
+       // - Time range queries
+       // - Service-based queries
+       // - Error queries
+       // - Latency-based queries
+       fmt.Println("Stream query benchmark not yet implemented")
+       return nil
+}
+
+// runTraceQueryBenchmark runs trace query benchmarks.
+//
+// Placeholder: it currently only prints a notice and returns nil; ctx is not
+// used yet.
+func (r *BenchmarkRunner) runTraceQueryBenchmark(ctx context.Context) error {
+       // TODO: Implement trace query benchmarks
+       // This would include various query patterns like:
+       // - Trace reconstruction queries
+       // - Time range queries
+       // - Service-based queries
+       // - Error queries
+       fmt.Println("Trace query benchmark not yet implemented")
+       return nil
+}
+
+// RunFullBenchmark runs the write benchmark followed by the query benchmark,
+// pausing five seconds between the two phases.
+//
+// It returns the wrapped error of the first phase that fails. If ctx is
+// canceled during the pause, it stops waiting and returns the wrapped context
+// error instead of starting the query phase.
+func (r *BenchmarkRunner) RunFullBenchmark(ctx context.Context) error {
+       fmt.Println("Starting full benchmark suite...")
+
+       // Run write benchmarks
+       if err := r.RunWriteBenchmark(ctx); err != nil {
+               return fmt.Errorf("write benchmark failed: %w", err)
+       }
+
+       // Wait a bit between benchmarks, but give up as soon as the caller cancels:
+       // a plain time.Sleep would keep a canceled run alive for the full pause.
+       pause := time.NewTimer(5 * time.Second)
+       defer pause.Stop()
+       select {
+       case <-pause.C:
+       case <-ctx.Done():
+               return fmt.Errorf("benchmark interrupted before query phase: %w", ctx.Err())
+       }
+
+       // Run query benchmarks
+       if err := r.RunQueryBenchmark(ctx); err != nil {
+               return fmt.Errorf("query benchmark failed: %w", err)
+       }
+
+       fmt.Println("Full benchmark suite completed successfully!")
+       return nil
+}
+
+// CompareResults prints a side-by-side comparison of the two models' write
+// metrics: operation throughput, P95 latency and data throughput. For each
+// metric it names the better model and the ratio between the two values.
+func (r *BenchmarkRunner) CompareResults() {
+       streamReport := r.streamMetrics.GenerateReport("Stream", r.config.Scale)
+       traceReport := r.traceMetrics.GenerateReport("Trace", r.config.Scale)
+
+       streamOps, traceOps := streamReport.WriteMetrics.Throughput, traceReport.WriteMetrics.Throughput
+       streamP95, traceP95 := streamReport.WriteMetrics.LatencyP95, traceReport.WriteMetrics.LatencyP95
+       streamMB, traceMB := streamReport.WriteMetrics.DataThroughput, traceReport.WriteMetrics.DataThroughput
+
+       fmt.Println("\n=== Performance Comparison ===")
+
+       // Higher throughput wins.
+       fmt.Printf("Write Throughput Comparison:\n")
+       fmt.Printf("  Stream: %.2f ops/sec\n", streamOps)
+       fmt.Printf("  Trace:  %.2f ops/sec\n", traceOps)
+       fmt.Printf("  Winner: %s (%.1fx faster)\n",
+               r.getWinner(streamOps, traceOps),
+               r.getSpeedup(streamOps, traceOps))
+
+       // Lower latency wins.
+       fmt.Printf("\nWrite Latency Comparison (P95):\n")
+       fmt.Printf("  Stream: %v\n", streamP95)
+       fmt.Printf("  Trace:  %v\n", traceP95)
+       fmt.Printf("  Winner: %s (%.1fx faster)\n",
+               r.getLatencyWinner(streamP95, traceP95),
+               r.getLatencySpeedup(streamP95, traceP95))
+
+       // Higher data throughput wins.
+       fmt.Printf("\nData Throughput Comparison:\n")
+       fmt.Printf("  Stream: %.2f MB/sec\n", streamMB)
+       fmt.Printf("  Trace:  %.2f MB/sec\n", traceMB)
+       fmt.Printf("  Winner: %s (%.1fx faster)\n",
+               r.getWinner(streamMB, traceMB),
+               r.getSpeedup(streamMB, traceMB))
+}
+
+// getWinner names the model with the higher value, for metrics where larger is
+// better. "Trace" is reported unless stream is strictly greater, so ties go to
+// "Trace".
+func (r *BenchmarkRunner) getWinner(stream, trace float64) string {
+       winner := "Trace"
+       if stream > trace {
+               winner = "Stream"
+       }
+       return winner
+}
+
+// getSpeedup returns how many times larger the higher of the two values is
+// compared with the lower one.
+//
+// Equal inputs yield 1. That also covers the case where both sides are zero,
+// for which a plain division would produce NaN. When exactly one side is zero
+// the result is +Inf.
+func (r *BenchmarkRunner) getSpeedup(stream, trace float64) float64 {
+       if stream == trace {
+               return 1
+       }
+       if stream > trace {
+               return stream / trace
+       }
+       return trace / stream
+}
+
+// getLatencyWinner names the model with the lower latency. "Trace" is reported
+// unless stream is strictly lower, so ties go to "Trace".
+func (r *BenchmarkRunner) getLatencyWinner(stream, trace time.Duration) string {
+       if trace <= stream {
+               return "Trace"
+       }
+       return "Stream"
+}
+
+// getLatencySpeedup returns how many times longer the slower of the two
+// latencies is compared with the faster one.
+//
+// Equal inputs yield 1. That also covers the case where both latencies are
+// zero, for which a plain division would produce NaN. When exactly one side is
+// zero the result is +Inf.
+func (r *BenchmarkRunner) getLatencySpeedup(stream, trace time.Duration) float64 {
+       if stream == trace {
+               return 1
+       }
+       if stream < trace {
+               return float64(trace) / float64(stream)
+       }
+       return float64(stream) / float64(trace)
+}
diff --git a/test/stress/stream-vs-trace/data_generator.go 
b/test/stress/stream-vs-trace/data_generator.go
new file mode 100644
index 00000000..24a95c65
--- /dev/null
+++ b/test/stress/stream-vs-trace/data_generator.go
@@ -0,0 +1,784 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package streamvstrace
+
+import (
+       "context"
+       "crypto/rand"
+       "fmt"
+       "math"
+       "math/big"
+       "strings"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+)
+
+// Scale represents the test scale configuration.
+// It is a string-typed name that GetScaleConfig maps to concrete data-set sizes.
+type Scale string
+
+const (
+       // SmallScale represents a small scale test
+       // (GetScaleConfig: 10,000 traces spread over one hour).
+       SmallScale Scale = "small"
+       // MediumScale represents a medium scale test
+       // (GetScaleConfig: 1,000,000 traces spread over 24 hours).
+       MediumScale Scale = "medium"
+       // LargeScale represents a large scale test
+       // (GetScaleConfig: 100,000,000 traces spread over 7 days).
+       LargeScale Scale = "large"
+)
+
+// ScaleConfig defines the configuration for each scale.
+type ScaleConfig struct {
+       // NOTE(review): TotalSpans is not read by the generator code visible here;
+       // the span count per trace comes from generateSpanCount — confirm intended use.
+       TotalSpans       int
+       // TotalTraces is the number of traces GenerateTraces produces.
+       TotalTraces      int
+       // Services is the number of service names NewSpanGenerator creates.
+       Services         int
+       // ServiceInstances is the number of instance names NewSpanGenerator creates.
+       ServiceInstances int
+       // TimeRange is the window before generator creation in which span start times fall.
+       TimeRange        time.Duration
+       // ErrorRate is the threshold below which a random draw marks a span as an error.
+       ErrorRate        float64
+       // OperationNames is the number of operation names NewSpanGenerator creates.
+       OperationNames   int
+       // Components is the number of component names NewSpanGenerator creates.
+       Components       int
+}
+
+// GetScaleConfig resolves a scale name to its data-set dimensions. Any scale
+// other than MediumScale or LargeScale, including unknown values, gets the
+// SmallScale dimensions.
+func GetScaleConfig(scale Scale) ScaleConfig {
+       // Every scale shares the same error rate and name cardinalities.
+       cfg := ScaleConfig{
+               ErrorRate:      0.05,
+               OperationNames: 1000,
+               Components:     50,
+       }
+
+       switch scale {
+       case MediumScale:
+               cfg.TotalSpans = 10000000
+               cfg.TotalTraces = 1000000
+               cfg.Services = 1000
+               cfg.ServiceInstances = 5000
+               cfg.TimeRange = 24 * time.Hour
+       case LargeScale:
+               cfg.TotalSpans = 1000000000
+               cfg.TotalTraces = 100000000
+               cfg.Services = 10000
+               cfg.ServiceInstances = 50000
+               cfg.TimeRange = 7 * 24 * time.Hour
+       default:
+               // SmallScale, and the fallback for unrecognized scales.
+               cfg.TotalSpans = 100000
+               cfg.TotalTraces = 10000
+               cfg.Services = 100
+               cfg.ServiceInstances = 500
+               cfg.TimeRange = time.Hour
+       }
+
+       return cfg
+}
+
+// SpanData represents a single span with all its attributes.
+type SpanData struct {
+       // StartTime is the span start, truncated to millisecond precision by the generator.
+       StartTime         time.Time
+       // ServiceID is the name of the service the span belongs to.
+       ServiceID         string
+       // ServiceInstanceID is the name of the service instance.
+       ServiceInstanceID string
+       // TraceID identifies the trace this span is part of.
+       TraceID           string
+       // SpanID identifies the span itself.
+       SpanID            string
+       // ParentSpanID is the SpanID of the parent span; empty for the first span of a trace.
+       ParentSpanID      string
+       // OperationName is the endpoint-style operation name.
+       OperationName     string
+       // Component is the component name, e.g. "http_0".
+       Component         string
+       // DataBinary is the serialized payload; the write workers sum its length as data size.
+       DataBinary        []byte
+       // Latency is the generated span duration.
+       Latency           time.Duration
+       // IsError marks the span as an error span.
+       IsError           bool
+}
+
+// TraceData represents a complete trace with multiple spans.
+type TraceData struct {
+       // TraceID is shared by every span in Spans.
+       TraceID string
+       // Spans holds the spans in generation order; the first one has no parent.
+       Spans   []*SpanData
+}
+
+// SpanGenerator generates realistic span data for performance testing.
+type SpanGenerator struct {
+       // baseTime is the earliest possible span start: creation time minus config.TimeRange.
+       baseTime     time.Time
+       // NOTE(review): rand is initialized to big.NewInt(0) and not read by the
+       // methods visible here — confirm whether it is still needed.
+       rand         *big.Int
+       // zipfServices picks indexes into services.
+       zipfServices *ZipfGenerator
+       // zipfOps picks indexes into operations.
+       zipfOps      *ZipfGenerator
+       // zipfComps picks indexes into components.
+       zipfComps    *ZipfGenerator
+       // services holds the generated service names ("service_<n>").
+       services     []string
+       // instances holds the generated instance names ("instance_<n>").
+       instances    []string
+       // operations holds the generated operation names ("/api/v1/endpoint_<n>").
+       operations   []string
+       // components holds the generated component names ("<type>_<n>").
+       components   []string
+       // config is the scale configuration the generator was built from.
+       config       ScaleConfig
+}
+
+// NewSpanGenerator builds a generator for the given scale. It pre-computes the
+// pools of service, instance, operation and component names sized by the scale
+// configuration, creates one Zipf generator per skewed pool, and anchors
+// baseTime at the current time minus the configured time range.
+func NewSpanGenerator(scale Scale) *SpanGenerator {
+       config := GetScaleConfig(scale)
+
+       // numbered returns count names of the form <prefix><index>.
+       numbered := func(prefix string, count int) []string {
+               names := make([]string, count)
+               for i := range names {
+                       names[i] = fmt.Sprintf("%s%d", prefix, i)
+               }
+               return names
+       }
+
+       gen := &SpanGenerator{config: config}
+       gen.services = numbered("service_", config.Services)
+       gen.instances = numbered("instance_", config.ServiceInstances)
+       gen.operations = numbered("/api/v1/endpoint_", config.OperationNames)
+
+       // Component names cycle through the known types; the suffix counts the cycles.
+       kinds := []string{"http", "mysql", "redis", "kafka", "grpc", "dubbo", "spring", "tomcat", "nginx", "postgresql"}
+       gen.components = make([]string, config.Components)
+       for i := range gen.components {
+               gen.components[i] = fmt.Sprintf("%s_%d", kinds[i%len(kinds)], i/len(kinds))
+       }
+
+       // Zipf parameters: operations use the largest value (1.5), components the smallest (1.1).
+       gen.zipfServices = NewZipfGenerator(len(gen.services), 1.2)
+       gen.zipfOps = NewZipfGenerator(len(gen.operations), 1.5)
+       gen.zipfComps = NewZipfGenerator(len(gen.components), 1.1)
+
+       gen.baseTime = time.Now().Add(-config.TimeRange)
+       gen.rand = big.NewInt(0)
+       return gen
+}
+
+// GenerateTrace generates a complete trace with the specified number of spans.
+func (g *SpanGenerator) GenerateTrace(spanCount int) *TraceData {
+       traceID := g.generateTraceID()
+       spans := make([]*SpanData, spanCount)
+
+       // Generate spans with parent-child relationships
+       for i := 0; i < spanCount; i++ {
+               var parentSpanID string
+               if i > 0 {
+                       // Randomly select a parent span (simplified tree 
structure)
+                       parentIdx := g.randomInt(i)
+                       parentSpanID = spans[parentIdx].SpanID
+               }
+
+               spans[i] = g.GenerateSpan(traceID, parentSpanID)
+       }
+
+       return &TraceData{
+               TraceID: traceID,
+               Spans:   spans,
+       }
+}
+
+// GenerateSpan builds one span for the given trace and parent. Service,
+// operation and component are picked through the Zipf generators, the instance
+// through randomInt. Start time, latency, error flag and binary payload are
+// generated per span.
+func (g *SpanGenerator) GenerateSpan(traceID, parentSpanID string) *SpanData {
+       span := &SpanData{
+               TraceID:      traceID,
+               ParentSpanID: parentSpanID,
+       }
+       span.SpanID = g.generateSpanID()
+
+       // Pick the service and instance, then the operation and component.
+       span.ServiceID = g.services[g.zipfServices.Next()]
+       span.ServiceInstanceID = g.instances[g.randomInt(len(g.instances))]
+       span.OperationName = g.operations[g.zipfOps.Next()]
+       span.Component = g.components[g.zipfComps.Next()]
+
+       // Timing information.
+       span.StartTime = g.generateStartTime()
+       span.Latency = g.generateLatency()
+
+       // A draw below the configured error rate marks the span as failed.
+       span.IsError = g.randomFloat() < g.config.ErrorRate
+
+       // Simulated serialized span payload.
+       span.DataBinary = g.generateDataBinary(span.SpanID, span.IsError)
+
+       return span
+}
+
+// GenerateTraces builds config.TotalTraces traces, drawing the span count of
+// each trace from generateSpanCount.
+func (g *SpanGenerator) GenerateTraces() []*TraceData {
+       out := make([]*TraceData, g.config.TotalTraces)
+       for idx := range out {
+               out[idx] = g.GenerateTrace(g.generateSpanCount())
+       }
+       return out
+}
+
+// generateSpanCount draws a span count from a three-tier complexity mix keyed
+// on one random draw: below 0.2 a simple trace, below 0.8 a medium trace,
+// otherwise a complex trace. The ranges in the comments assume randomInt(n)
+// yields values in [0, n).
+func (g *SpanGenerator) generateSpanCount() int {
+       roll := g.randomFloat()
+
+       switch {
+       case roll < 0.2:
+               // Simple traces: 1-5 spans.
+               return g.randomInt(5) + 1
+       case roll < 0.8:
+               // Medium traces: 6-20 spans.
+               return g.randomInt(15) + 6
+       default:
+               // Complex traces: 21-100 spans.
+               return g.randomInt(80) + 21
+       }
+}
+
+// generateStartTime picks a start time by adding a random nanosecond offset,
+// bounded by config.TimeRange, to baseTime.
+func (g *SpanGenerator) generateStartTime() time.Time {
+       offsetNanos := g.randomInt(int(g.config.TimeRange))
+       started := g.baseTime.Add(time.Duration(offsetNanos))
+       // Truncate to millisecond precision for BanyanDB compatibility.
+       return started.Truncate(time.Millisecond)
+}
+
+// generateLatency generates a realistic latency using exponential 
distribution.
+func (g *SpanGenerator) generateLatency() time.Duration {
+       // Use exponential distribution with mean of 100ms
+       lambda := 0.01 // 1/100ms
+       latencyMs := -math.Log(1-g.randomFloat()) / lambda
+
+       // Cap at 30 seconds
+       if latencyMs > 30000 {
+               latencyMs = 30000
+       }
+
+       return time.Duration(latencyMs) * time.Millisecond
+}
+
+// generateTraceID builds a trace ID of the form "trace_<unix-nanos>_<n>",
+// combining the current time with the result of randomInt(1000000).
+func (g *SpanGenerator) generateTraceID() string {
+       nanos := time.Now().UnixNano()
+       suffix := g.randomInt(1000000)
+       return fmt.Sprintf("trace_%d_%d", nanos, suffix)
+}
+
+// generateSpanID builds a span ID of the form "span_<unix-nanos>_<n>",
+// combining the current time with the result of randomInt(1000000).
+func (g *SpanGenerator) generateSpanID() string {
+       nanos := time.Now().UnixNano()
+       suffix := g.randomInt(1000000)
+       return fmt.Sprintf("span_%d_%d", nanos, suffix)
+}
+
+// generateDataBinary generates simulated binary data for the span.
+// In SkyWalking, a span represents a collection of all local function invocations in a process.
+//
+// The result is the bytes of a JSON-like segment document whose "spans" array
+// comes from generateRealisticSpanData. spanID is used as the segment ID.
+// The serviceId, serviceInstanceId and globalTraceIds values are drawn or
+// generated here, independently of the span that carries the payload.
+func (g *SpanGenerator) generateDataBinary(spanID string, isError bool) []byte 
{
+       // Generate a realistic span structure with multiple local function calls
+       spanData := g.generateRealisticSpanData(spanID, isError)
+
+       // Serialize to JSON-like binary format (simulating SkyWalking's segment format).
+       // segmentSize reports the length of the spans array text, not of the whole document.
+       data := fmt.Sprintf(`{
+               "segmentId": "%s",
+               "serviceId": "%s",
+               "serviceInstanceId": "%s",
+               "spans": %s,
+               "globalTraceIds": ["%s"],
+               "isSizeLimited": false,
+               "segmentSize": %d
+       }`,
+               spanID,
+               g.services[g.randomInt(len(g.services))],
+               g.instances[g.randomInt(len(g.instances))],
+               spanData,
+               g.generateTraceID(),
+               len(spanData))
+
+       return []byte(data)
+}
+
+// generateRealisticSpanData creates a comprehensive span structure with local 
function calls.
+func (g *SpanGenerator) generateRealisticSpanData(_ string, isError bool) 
string {
+       // Generate 3-8 local spans representing function calls within the 
process
+       numLocalSpans := g.randomInt(6) + 3 // 3-8 local spans
+
+       var spans []string
+       spanStartTime := time.Now().UnixNano()
+
+       // Generate the main entry span
+       entrySpan := g.generateEntrySpan(spanStartTime, isError)
+       spans = append(spans, entrySpan)
+
+       // Generate local spans representing function calls
+       for i := 0; i < numLocalSpans-1; i++ {
+               localSpan := g.generateLocalSpan(spanStartTime, i, isError)
+               spans = append(spans, localSpan)
+       }
+
+       return fmt.Sprintf("[%s]", strings.Join(spans, ","))
+}
+
+// generateEntrySpan renders the entry span of a segment as JSON-like text.
+//
+// startTime is the span start in Unix nanoseconds; the end time is startTime
+// plus a latency drawn from generateLatency. The operation name and component
+// are picked from g.operations and g.components using the Zipf generators.
+// isError is written to the "isError" field and forwarded to the tag, log and
+// dataBinary generators. spanId, componentId and endpointId are random numbers.
+//
+// NOTE(review): "dataBinary" embeds the multi-line, quote-containing output of
+// generateDataBinaryContent inside a quoted string without escaping, so the
+// rendered text is not valid JSON - confirm that is acceptable.
+func (g *SpanGenerator) generateEntrySpan(startTime int64, isError bool) 
string {
+       operationName := g.operations[g.zipfOps.Next()]
+       component := g.components[g.zipfComps.Next()]
+
+       // Derive the end time from a randomly drawn latency.
+       latency := g.generateLatency()
+       endTime := startTime + latency.Nanoseconds()
+
+       // Generate tags for the entry span
+       tags := g.generateSpanTags(operationName, component, isError)
+
+       // Generate logs for the entry span
+       logs := g.generateSpanLogs(startTime, endTime, isError)
+
+       // The argument list below must stay in the same order as the verbs in
+       // the template; operationName is used for both "operationName" and
+       // "endpointName".
+       return fmt.Sprintf(`{
+               "spanId": %d,
+               "parentSpanId": -1,
+               "segmentSpanId": 0,
+               "refs": [],
+               "operationName": "%s",
+               "peer": "localhost:8080",
+               "spanType": "Entry",
+               "spanLayer": "Http",
+               "componentId": %d,
+               "component": "%s",
+               "isError": %t,
+               "tags": %s,
+               "logs": %s,
+               "startTime": %d,
+               "endTime": %d,
+               "endpointName": "%s",
+               "endpointId": %d,
+               "dataBinary": "%s"
+       }`,
+               g.randomInt(1000000),
+               operationName,
+               g.randomInt(100),
+               component,
+               isError,
+               tags,
+               logs,
+               startTime,
+               endTime,
+               operationName,
+               g.randomInt(1000000),
+               g.generateDataBinaryContent(operationName, isError))
+}
+
+// generateLocalSpan renders one local (in-process) span as JSON-like text.
+//
+// traceStartTime is the segment's base start in Unix nanoseconds; the span
+// starts 0-99ms after it and ends after an independently drawn latency, so it
+// is not guaranteed to finish before the entry span does. index is the
+// zero-based position among the local spans and is written as index+1 to
+// "segmentSpanId". isError is written to "isError" and forwarded to the tag,
+// log and dataBinary generators.
+//
+// NOTE(review): "dataBinary" embeds unescaped, quote-containing text inside a
+// quoted string, so the rendered text is not valid JSON - confirm that is
+// acceptable.
+func (g *SpanGenerator) generateLocalSpan(traceStartTime int64, index int, 
isError bool) string {
+       // Candidate function names; one is picked uniformly at random.
+       functionNames := []string{
+               "processRequest", "validateInput", "parseData", "transformData",
+               "executeBusinessLogic", "saveToDatabase", "sendResponse", 
"cleanup",
+               "authenticateUser", "authorizeRequest", "logActivity", 
"updateCache",
+               "calculateMetrics", "formatOutput", "handleException", 
"retryOperation",
+       }
+
+       functionName := functionNames[g.randomInt(len(functionNames))]
+       component := g.components[g.zipfComps.Next()]
+
+       // Start at a random 0-99ms offset from the base time and run for a
+       // randomly drawn latency.
+       spanOffset := time.Duration(g.randomInt(100)) * time.Millisecond
+       startTime := traceStartTime + spanOffset.Nanoseconds()
+       latency := g.generateLatency()
+       endTime := startTime + latency.Nanoseconds()
+
+       // Generate tags for the local span
+       tags := g.generateLocalSpanTags(functionName, component, isError)
+
+       // Generate logs for the local span
+       logs := g.generateSpanLogs(startTime, endTime, isError)
+
+       // The argument list below must stay in the same order as the verbs in
+       // the template; functionName is used for both "operationName" and
+       // "endpointName".
+       return fmt.Sprintf(`{
+               "spanId": %d,
+               "parentSpanId": 0,
+               "segmentSpanId": %d,
+               "refs": [],
+               "operationName": "%s",
+               "peer": "",
+               "spanType": "Local",
+               "spanLayer": "Unknown",
+               "componentId": %d,
+               "component": "%s",
+               "isError": %t,
+               "tags": %s,
+               "logs": %s,
+               "startTime": %d,
+               "endTime": %d,
+               "endpointName": "%s",
+               "endpointId": %d,
+               "dataBinary": "%s"
+       }`,
+               g.randomInt(1000000),
+               index+1,
+               functionName,
+               g.randomInt(100),
+               component,
+               isError,
+               tags,
+               logs,
+               startTime,
+               endTime,
+               functionName,
+               g.randomInt(1000000),
+               g.generateDataBinaryContent(functionName, isError))
+}
+
+// generateSpanTags renders the tag list of an entry span as a bracketed,
+// comma-separated list of {"key", "value"} objects. It always emits
+// http.method, http.url (the operation name), component and layer, adds the
+// two error tags when isError is set, and finishes with the first 2-4 entries
+// of a fixed list of business tags.
+func (g *SpanGenerator) generateSpanTags(operationName, component string, isError bool) string {
+       tagList := make([]string, 0, 10)
+       tagList = append(tagList,
+               fmt.Sprintf(`{"key": "http.method", "value": "%s"}`, g.getRandomHTTPMethod()),
+               fmt.Sprintf(`{"key": "http.url", "value": "%s"}`, operationName),
+               fmt.Sprintf(`{"key": "component", "value": "%s"}`, component),
+               fmt.Sprintf(`{"key": "layer", "value": "%s"}`, g.getRandomLayer()),
+       )
+
+       if isError {
+               errMsg := fmt.Sprintf(`{"key": "error.message", "value": "%s"}`, g.getRandomErrorMessage())
+               tagList = append(tagList, `{"key": "error", "value": "true"}`, errMsg)
+       }
+
+       // Fixed business tags; only how many of them are emitted is random.
+       businessTags := [...]string{
+               `{"key": "user.id", "value": "12345"}`,
+               `{"key": "request.id", "value": "req-12345"}`,
+               `{"key": "session.id", "value": "sess-67890"}`,
+               `{"key": "tenant.id", "value": "tenant-001"}`,
+       }
+
+       // Keep a prefix of 2-4 business tags.
+       keep := g.randomInt(3) + 2
+       if keep > len(businessTags) {
+               keep = len(businessTags)
+       }
+       tagList = append(tagList, businessTags[:keep]...)
+
+       return "[" + strings.Join(tagList, ",") + "]"
+}
+
+// generateLocalSpanTags renders the tag list of a local span as a bracketed,
+// comma-separated list of {"key", "value"} objects. It always emits
+// function.name, component, span.type and thread.name, adds the two error tags
+// when isError is set, and finishes with the first 1-2 entries of a fixed list
+// of function tags.
+func (g *SpanGenerator) generateLocalSpanTags(functionName, component string, isError bool) string {
+       tagList := make([]string, 0, 8)
+       tagList = append(tagList,
+               fmt.Sprintf(`{"key": "function.name", "value": "%s"}`, functionName),
+               fmt.Sprintf(`{"key": "component", "value": "%s"}`, component),
+               `{"key": "span.type", "value": "Local"}`,
+               `{"key": "thread.name", "value": "main-thread"}`,
+       )
+
+       if isError {
+               errMsg := fmt.Sprintf(`{"key": "error.message", "value": "%s"}`, g.getRandomErrorMessage())
+               tagList = append(tagList, `{"key": "error", "value": "true"}`, errMsg)
+       }
+
+       // Fixed function tags; only how many of them are emitted is random.
+       functionTags := [...]string{
+               `{"key": "function.line", "value": "42"}`,
+               `{"key": "function.class", "value": "com.example.Service"}`,
+               `{"key": "function.package", "value": "com.example.service"}`,
+       }
+
+       // Keep a prefix of 1-2 function tags.
+       keep := g.randomInt(2) + 1
+       if keep > len(functionTags) {
+               keep = len(functionTags)
+       }
+       tagList = append(tagList, functionTags[:keep]...)
+
+       return "[" + strings.Join(tagList, ",") + "]"
+}
+
+// generateSpanLogs renders the log list of a span as a bracketed,
+// comma-separated list of three entries: a "span_start" entry stamped with
+// startTime, a "processing" entry stamped with the midpoint between startTime
+// and endTime, and a final entry stamped with endTime that is an "error" entry
+// when isError is set and a "span_end" entry otherwise. startTime and endTime
+// are written to the "time" fields unchanged.
+func (g *SpanGenerator) generateSpanLogs(startTime, endTime int64, isError 
bool) string {
+       // First entry: the span start.
+       logs := []string{
+               fmt.Sprintf(`{
+                       "time": %d,
+                       "data": [
+                               {"key": "event", "value": "span_start"},
+                               {"key": "message", "value": "Starting 
operation"}
+                       ]
+               }`, startTime),
+       }
+
+       // Second entry: halfway through the span. "%%" renders a literal
+       // percent sign in the progress value.
+       middleTime := startTime + (endTime-startTime)/2
+       logs = append(logs, fmt.Sprintf(`{
+               "time": %d,
+               "data": [
+                       {"key": "event", "value": "processing"},
+                       {"key": "message", "value": "Processing request"},
+                       {"key": "progress", "value": "50%%"}
+               ]
+       }`, middleTime))
+
+       // Last entry: failure or successful completion, depending on isError.
+       if isError {
+               logs = append(logs, fmt.Sprintf(`{
+                       "time": %d,
+                       "data": [
+                               {"key": "event", "value": "error"},
+                               {"key": "message", "value": "Operation failed"},
+                               {"key": "error.code", "value": "500"}
+                       ]
+               }`, endTime))
+       } else {
+               logs = append(logs, fmt.Sprintf(`{
+                       "time": %d,
+                       "data": [
+                               {"key": "event", "value": "span_end"},
+                               {"key": "message", "value": "Operation 
completed successfully"}
+                       ]
+               }`, endTime))
+       }
+
+       return fmt.Sprintf("[%s]", strings.Join(logs, ","))
+}
+
+// generateDataBinaryContent renders the filler text placed in a span's
+// "dataBinary" field. operationName is copied into the text and isError
+// selects the "status" value ("error" or "success").
+//
+// Every other value is synthetic: "timestamp" is the current wall clock in
+// nanoseconds, "duration" and the metric values are random numbers, and the
+// trace/span IDs in "context" are freshly generated rather than taken from the
+// span being rendered.
+func (g *SpanGenerator) generateDataBinaryContent(operationName string, 
isError bool) string {
+       status := "success"
+       if isError {
+               status = "error"
+       }
+
+       // The argument list below must stay in the same order as the verbs in
+       // the template.
+       content := fmt.Sprintf(`{
+               "operationName": "%s",
+               "status": "%s",
+               "timestamp": %d,
+               "duration": %d,
+               "metadata": {
+                       "version": "1.0",
+                       "source": "skywalking-agent",
+                       "collector": "banyandb"
+               },
+               "metrics": {
+                       "cpu_usage": %.2f,
+                       "memory_usage": %d,
+                       "gc_count": %d,
+                       "thread_count": %d
+               },
+               "context": {
+                       "trace_id": "%s",
+                       "span_id": "%s",
+                       "parent_span_id": "%s"
+               }
+       }`,
+               operationName,
+               status,
+               time.Now().UnixNano(),
+               g.randomInt(1000),
+               g.randomFloat()*100,
+               g.randomInt(1000000),
+               g.randomInt(100),
+               g.randomInt(50),
+               g.generateTraceID(),
+               g.generateSpanID(),
+               g.generateSpanID())
+
+       return content
+}
+
+// getRandomHTTPMethod returns one of seven HTTP method names, chosen
+// uniformly at random.
+func (g *SpanGenerator) getRandomHTTPMethod() string {
+       candidates := [...]string{"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"}
+       pick := g.randomInt(len(candidates))
+       return candidates[pick]
+}
+
+// getRandomLayer returns one of six span layer names, chosen uniformly at
+// random.
+func (g *SpanGenerator) getRandomLayer() string {
+       candidates := [...]string{"Http", "Database", "Cache", "MQ", "RPC", "Unknown"}
+       pick := g.randomInt(len(candidates))
+       return candidates[pick]
+}
+
+// getRandomErrorMessage returns one of eight canned error messages, chosen
+// uniformly at random.
+func (g *SpanGenerator) getRandomErrorMessage() string {
+       messages := [...]string{
+               "Database connection failed",
+               "Invalid input parameters",
+               "Service unavailable",
+               "Timeout occurred",
+               "Authentication failed",
+               "Resource not found",
+               "Internal server error",
+               "Network timeout",
+       }
+       pick := g.randomInt(len(messages))
+       return messages[pick]
+}
+
+// randomInt returns a random integer in [0, maxVal) read from crypto/rand.
+// It returns 0 when maxVal is not positive, and also when the random source
+// reports an error, so callers always get a valid index for a non-empty slice.
+func (g *SpanGenerator) randomInt(maxVal int) int {
+       if maxVal <= 0 {
+               return 0
+       }
+
+       n, err := rand.Int(rand.Reader, big.NewInt(int64(maxVal)))
+       if err != nil {
+               // rand.Int returns a nil *big.Int on failure; fall back to 0
+               // instead of dereferencing it.
+               return 0
+       }
+       return int(n.Int64())
+}
+
+// randomFloat generates a random float between 0 and 1.
+func (g *SpanGenerator) randomFloat() float64 {
+       n := g.randomInt(1000000)
+       return float64(n) / 1000000.0
+}
+
+// ToStreamWriteRequest converts a SpanData to a BanyanDB Stream write request.
+func (s *SpanData) ToStreamWriteRequest() *streamv1.WriteRequest {
+       var isError int64
+       if s.IsError {
+               isError = 1
+       }
+
+       return &streamv1.WriteRequest{
+               Metadata: &commonv1.Metadata{
+                       Name:  "segment_stream",
+                       Group: "stream_performance_test",
+               },
+               Element: &streamv1.ElementValue{
+                       ElementId: s.SpanID,
+                       Timestamp: 
timestamppb.New(s.StartTime.Truncate(time.Millisecond)),
+                       TagFamilies: []*modelv1.TagFamilyForWrite{
+                               {
+                                       // Primary tag family
+                                       Tags: []*modelv1.TagValue{
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.ServiceID}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.ServiceInstanceID}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.TraceID}}},
+                                               {Value: 
&modelv1.TagValue_Int{Int: &modelv1.Int{Value: s.StartTime.UnixMilli()}}},
+                                               {Value: 
&modelv1.TagValue_Int{Int: &modelv1.Int{Value: s.Latency.Milliseconds()}}},
+                                               {Value: 
&modelv1.TagValue_Int{Int: &modelv1.Int{Value: isError}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.SpanID}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.ParentSpanID}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.OperationName}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.Component}}},
+                                       },
+                               },
+                               {
+                                       // Data binary tag family
+                                       Tags: []*modelv1.TagValue{
+                                               {Value: 
&modelv1.TagValue_BinaryData{BinaryData: s.DataBinary}},
+                                       },
+                               },
+                       },
+               },
+       }
+}
+
+// ToTraceWriteRequest converts a SpanData to a BanyanDB Trace write request.
+func (s *SpanData) ToTraceWriteRequest() *tracev1.WriteRequest {
+       var isError int64
+       if s.IsError {
+               isError = 1
+       }
+
+       return &tracev1.WriteRequest{
+               Metadata: &commonv1.Metadata{
+                       Name:  "segment_trace",
+                       Group: "trace_performance_test",
+               },
+               Tags: []*modelv1.TagValue{
+                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.ServiceID}}},
+                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.ServiceInstanceID}}},
+                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.TraceID}}},
+                       {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 
s.StartTime.UnixMilli()}}},
+                       {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 
s.Latency.Milliseconds()}}},
+                       {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 
isError}}},
+                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.SpanID}}},
+                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.ParentSpanID}}},
+                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.OperationName}}},
+                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.Component}}},
+                       {Value: &modelv1.TagValue_BinaryData{BinaryData: 
s.DataBinary}},
+               },
+               Span: s.DataBinary, // For trace model, span data goes in the 
span field
+       }
+}
+
+// WriteStreamData sends every span as a Stream write request over a single
+// write stream and waits for the server to finish.
+//
+// Requests are numbered with MessageId starting at 1. After the last request
+// the send side is closed and all responses are read until the stream ends,
+// so the call does not return before the server has acknowledged the whole
+// batch and the stream's resources are released. An error is returned if the
+// stream cannot be opened, a send fails, the send side cannot be closed, or
+// the stream ends with anything other than EOF.
+func (c *StreamClient) WriteStreamData(ctx context.Context, spans []*SpanData) error {
+       stream, err := c.Write(ctx, nil)
+       if err != nil {
+               return fmt.Errorf("failed to create stream write client: %w", err)
+       }
+
+       for i, span := range spans {
+               req := span.ToStreamWriteRequest()
+               req.MessageId = uint64(i + 1)
+
+               if sendErr := stream.Send(req); sendErr != nil {
+                       return fmt.Errorf("failed to send stream write request: %w", sendErr)
+               }
+       }
+
+       // Signal that no more requests will follow.
+       if closeErr := stream.CloseSend(); closeErr != nil {
+               return fmt.Errorf("failed to close stream send: %w", closeErr)
+       }
+
+       // Drain every response. Reading only one would return as soon as the
+       // first acknowledgement arrives and leave the rest unread.
+       var recvErr error
+       for recvErr == nil {
+               _, recvErr = stream.Recv()
+       }
+
+       // A clean end of stream is reported as io.EOF, whose text is "EOF".
+       if recvErr.Error() != "EOF" {
+               return fmt.Errorf("failed to receive final response: %w", recvErr)
+       }
+
+       return nil
+}
+
+// WriteTraceData sends every span as a Trace write request over a single
+// write stream and waits for the server to finish.
+//
+// Requests are numbered with Version starting at 1. After the last request
+// the send side is closed and all responses are read until the stream ends,
+// so the call does not return before the server has acknowledged the whole
+// batch and the stream's resources are released. An error is returned if the
+// stream cannot be opened, a send fails, the send side cannot be closed, or
+// the stream ends with anything other than EOF.
+func (c *TraceClient) WriteTraceData(ctx context.Context, spans []*SpanData) error {
+       stream, err := c.Write(ctx, nil)
+       if err != nil {
+               return fmt.Errorf("failed to create trace write client: %w", err)
+       }
+
+       for i, span := range spans {
+               req := span.ToTraceWriteRequest()
+               req.Version = uint64(i + 1)
+
+               if sendErr := stream.Send(req); sendErr != nil {
+                       return fmt.Errorf("failed to send trace write request: %w", sendErr)
+               }
+       }
+
+       // Signal that no more requests will follow.
+       if closeErr := stream.CloseSend(); closeErr != nil {
+               return fmt.Errorf("failed to close trace send: %w", closeErr)
+       }
+
+       // Drain every response. Reading only one would return as soon as the
+       // first acknowledgement arrives and leave the rest unread.
+       var recvErr error
+       for recvErr == nil {
+               _, recvErr = stream.Recv()
+       }
+
+       // A clean end of stream is reported as io.EOF, whose text is "EOF".
+       if recvErr.Error() != "EOF" {
+               return fmt.Errorf("failed to receive final response: %w", recvErr)
+       }
+
+       return nil
+}
diff --git a/test/stress/stream-vs-trace/distribution.go 
b/test/stress/stream-vs-trace/distribution.go
new file mode 100644
index 00000000..6570b05a
--- /dev/null
+++ b/test/stress/stream-vs-trace/distribution.go
@@ -0,0 +1,183 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package streamvstrace
+
+import (
+       "crypto/rand"
+       "math"
+       "math/big"
+)
+
+// ZipfGenerator generates values following a Zipfian distribution.
+// This is useful for simulating realistic data patterns where some values
+// are much more common than others (e.g., 80/20 rule).
+type ZipfGenerator struct {
+       cdf  []float64 // cdf[i] is the probability of drawing a value <= i
+       n    int       // number of possible values
+       s    float64   // skewness parameter (s > 1, higher = more skewed)
+       zeta float64   // normalization constant
+}
+
+// NewZipfGenerator creates a new Zipfian generator.
+//
+// n is the number of possible values; Next returns values in [0, n-1], or
+// always 0 when n is not positive. s is the skewness parameter; values of 1.0
+// or less are replaced by 1.1. The cumulative probability table is computed
+// once here so that Next does not have to recompute it on every call.
+func NewZipfGenerator(n int, s float64) *ZipfGenerator {
+       if s <= 1.0 {
+               s = 1.1 // minimum valid value
+       }
+
+       // Normalization constant: the sum of 1/i^s for i in 1..n.
+       zeta := 0.0
+       for i := 1; i <= n; i++ {
+               zeta += 1.0 / math.Pow(float64(i), s)
+       }
+
+       // Cumulative distribution, one entry per rank.
+       var cdf []float64
+       if n > 0 {
+               cdf = make([]float64, n)
+               cumulative := 0.0
+               for i := 1; i <= n; i++ {
+                       cumulative += (1.0 / math.Pow(float64(i), s)) / zeta
+                       cdf[i-1] = cumulative
+               }
+       }
+
+       return &ZipfGenerator{
+               cdf:  cdf,
+               n:    n,
+               s:    s,
+               zeta: zeta,
+       }
+}
+
+// Next generates the next Zipfian-distributed value, a 0-based index in
+// [0, n-1]. It returns 0 when the generator has no possible values.
+func (z *ZipfGenerator) Next() int {
+       if z.n <= 0 || len(z.cdf) == 0 {
+               return 0
+       }
+
+       // Inverse transform sampling: the first rank whose cumulative
+       // probability reaches the uniform draw.
+       randVal := z.randomFloat()
+       for i, cumulative := range z.cdf {
+               if randVal <= cumulative {
+                       return i
+               }
+       }
+
+       // Only reachable if rounding leaves the last cumulative value below
+       // the draw.
+       return z.n - 1
+}
+
+// randomFloat returns a random float in [0, 1) with a resolution of one
+// millionth. It returns 0 if the random source reports an error.
+func (z *ZipfGenerator) randomFloat() float64 {
+       n, err := rand.Int(rand.Reader, big.NewInt(1000000))
+       if err != nil {
+               // rand.Int returns a nil *big.Int on failure; do not
+               // dereference it.
+               return 0
+       }
+       return float64(n.Int64()) / 1000000.0
+}
+
+// UniformGenerator generates values following a uniform distribution over
+// the half-open range [min, max).
+type UniformGenerator struct {
+       min int // inclusive lower bound
+       max int // exclusive upper bound
+}
+
+// NewUniformGenerator creates a new uniform generator over [minVal, maxVal).
+// If minVal is not below maxVal, the range collapses to the single value
+// minVal.
+func NewUniformGenerator(minVal, maxVal int) *UniformGenerator {
+       if minVal >= maxVal {
+               maxVal = minVal + 1
+       }
+       return &UniformGenerator{min: minVal, max: maxVal}
+}
+
+// Next generates the next uniformly distributed value in [min, max). It
+// returns min when the range is empty or the random source reports an error.
+func (u *UniformGenerator) Next() int {
+       if u.min >= u.max {
+               return u.min
+       }
+
+       n, err := rand.Int(rand.Reader, big.NewInt(int64(u.max-u.min)))
+       if err != nil {
+               // rand.Int returns a nil *big.Int on failure; do not
+               // dereference it.
+               return u.min
+       }
+       return u.min + int(n.Int64())
+}
+
+// ExponentialGenerator generates values following an exponential distribution.
+// This is useful for generating realistic latency values.
+type ExponentialGenerator struct {
+       lambda float64 // rate parameter (1/mean)
+}
+
+// NewExponentialGenerator creates a new exponential generator whose
+// distribution has the given mean. A mean of zero or less is replaced by 1.0.
+func NewExponentialGenerator(mean float64) *ExponentialGenerator {
+       if mean <= 0 {
+               mean = 1.0
+       }
+       return &ExponentialGenerator{lambda: 1.0 / mean}
+}
+
+// Next generates the next exponentially distributed value using inverse
+// transform sampling. The result is never negative.
+func (e *ExponentialGenerator) Next() float64 {
+       randVal := e.randomFloat()
+       if randVal >= 1.0 {
+               randVal = 0.999999 // Avoid log(0)
+       }
+       return -math.Log(1.0-randVal) / e.lambda
+}
+
+// randomFloat returns a random float in [0, 1) with a resolution of one
+// millionth. It returns 0 if the random source reports an error.
+func (e *ExponentialGenerator) randomFloat() float64 {
+       n, err := rand.Int(rand.Reader, big.NewInt(1000000))
+       if err != nil {
+               // rand.Int returns a nil *big.Int on failure; do not
+               // dereference it.
+               return 0
+       }
+       return float64(n.Int64()) / 1000000.0
+}
+
+// NormalGenerator generates values following a normal (Gaussian) distribution.
+type NormalGenerator struct {
+       mean    float64 // mean of the distribution
+       stddev  float64 // standard deviation of the distribution
+       hasNext bool    // whether nextVal holds an unused sample
+       nextVal float64 // second sample of the last Box-Muller pair
+}
+
+// NewNormalGenerator creates a new normal generator with the given mean and
+// standard deviation.
+func NewNormalGenerator(mean, stddev float64) *NormalGenerator {
+       return &NormalGenerator{
+               mean:    mean,
+               stddev:  stddev,
+               hasNext: false,
+       }
+}
+
+// Next generates the next normally distributed value using the Box-Muller
+// transform. Each transform yields two samples; the second one is cached and
+// returned by the following call. The result is always finite for finite
+// mean and stddev.
+func (n *NormalGenerator) Next() float64 {
+       if n.hasNext {
+               n.hasNext = false
+               return n.nextVal
+       }
+
+       // Two independent uniform draws. randomFloat yields [0, 1), so u1 is
+       // flipped into (0, 1]: a u1 of exactly 0 would make Log return -Inf
+       // and the samples Inf or NaN.
+       u1 := 1.0 - n.randomFloat()
+       u2 := n.randomFloat()
+
+       // Box-Muller transform
+       radius := math.Sqrt(-2.0 * math.Log(u1))
+       angle := 2.0 * math.Pi * u2
+       z0 := radius * math.Cos(angle)
+       z1 := radius * math.Sin(angle)
+
+       // Store the second value for next call
+       n.nextVal = n.mean + n.stddev*z1
+       n.hasNext = true
+
+       return n.mean + n.stddev*z0
+}
+
+// randomFloat returns a random float in [0, 1) with a resolution of one
+// millionth. It returns 0 if the random source reports an error.
+func (n *NormalGenerator) randomFloat() float64 {
+       randVal, err := rand.Int(rand.Reader, big.NewInt(1000000))
+       if err != nil {
+               // rand.Int returns a nil *big.Int on failure; do not
+               // dereference it.
+               return 0
+       }
+       return float64(randVal.Int64()) / 1000000.0
+}
diff --git a/test/stress/stream-vs-trace/metrics.go 
b/test/stress/stream-vs-trace/metrics.go
new file mode 100644
index 00000000..5c61e4b2
--- /dev/null
+++ b/test/stress/stream-vs-trace/metrics.go
@@ -0,0 +1,350 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
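+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.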
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package streamvstrace
+
+import (
+       "fmt"
+       "sort"
+       "sync"
+       "time"
+)
+
+// PerformanceMetrics accumulates counters, durations and per-operation
+// latencies for write and query operations. The methods on this type take mu
+// before reading or updating the other fields.
+type PerformanceMetrics struct {
+       StartTime      time.Time // set to the current time by NewPerformanceMetrics
+       WriteLatencies []time.Duration // one entry per RecordWrite call
+       QueryLatencies []time.Duration // one entry per RecordQuery call
+       WriteCount     int64 // number of RecordWrite calls
+       QueryCount     int64 // number of RecordQuery calls
+       ErrorCount     int64 // number of RecordError calls
+       DataSize       int64 // sum of the dataSize values passed to RecordWrite
+       CompressedSize int64 // sum of the compressedSize values passed to RecordCompression
+       WriteDuration  time.Duration // sum of the durations passed to RecordWrite
+       QueryDuration  time.Duration // sum of the durations passed to RecordQuery
+       mu             sync.RWMutex // taken by the methods before touching the fields above
+}
+
+// NewPerformanceMetrics creates a new metrics collector.
+func NewPerformanceMetrics() *PerformanceMetrics {
+       return &PerformanceMetrics{
+               WriteLatencies: make([]time.Duration, 0, 1000),
+               QueryLatencies: make([]time.Duration, 0, 1000),
+               StartTime:      time.Now(),
+       }
+}
+
+// RecordWrite registers one write operation: it bumps the write counter, adds
+// duration to the total write time and to the latency samples, and adds
+// dataSize to the total data size.
+func (m *PerformanceMetrics) RecordWrite(duration time.Duration, dataSize int64) {
+       m.mu.Lock()
+       m.WriteLatencies = append(m.WriteLatencies, duration)
+       m.WriteDuration += duration
+       m.DataSize += dataSize
+       m.WriteCount++
+       m.mu.Unlock()
+}
+
+// RecordQuery registers one query operation: it bumps the query counter and
+// adds duration to the total query time and to the latency samples.
+func (m *PerformanceMetrics) RecordQuery(duration time.Duration) {
+       m.mu.Lock()
+       m.QueryLatencies = append(m.QueryLatencies, duration)
+       m.QueryDuration += duration
+       m.QueryCount++
+       m.mu.Unlock()
+}
+
+// RecordError increments the error counter by one.
+func (m *PerformanceMetrics) RecordError() {
+       m.mu.Lock()
+       m.ErrorCount++
+       m.mu.Unlock()
+}
+
+// RecordCompression adds compressedSize to the total compressed size.
+// originalSize is accepted but not used; the uncompressed total is the
+// DataSize accumulated by RecordWrite.
+func (m *PerformanceMetrics) RecordCompression(originalSize, compressedSize int64) {
+       _ = originalSize // intentionally ignored, see the comment above
+
+       m.mu.Lock()
+       m.CompressedSize += compressedSize
+       m.mu.Unlock()
+}
+
+// GetWriteThroughput returns the number of recorded writes divided by the
+// summed write duration in seconds, or 0 when no write time has been recorded.
+func (m *PerformanceMetrics) GetWriteThroughput() float64 {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       if m.WriteDuration != 0 {
+               return float64(m.WriteCount) / m.WriteDuration.Seconds()
+       }
+       return 0
+}
+
+// GetQueryThroughput returns the number of recorded queries divided by the
+// summed query duration in seconds, or 0 when no query time has been recorded.
+func (m *PerformanceMetrics) GetQueryThroughput() float64 {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       if m.QueryDuration != 0 {
+               return float64(m.QueryCount) / m.QueryDuration.Seconds()
+       }
+       return 0
+}
+
+// GetDataThroughput returns the recorded data size in MiB (bytes / 1024 /
+// 1024) divided by the summed write duration in seconds, or 0 when no write
+// time has been recorded.
+func (m *PerformanceMetrics) GetDataThroughput() float64 {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       if m.WriteDuration != 0 {
+               megabytes := float64(m.DataSize) / (1024 * 1024)
+               return megabytes / m.WriteDuration.Seconds()
+       }
+       return 0
+}
+
+// GetCompressionRatio returns the compressed size divided by the recorded
+// data size, or 0 when no data size has been recorded.
+func (m *PerformanceMetrics) GetCompressionRatio() float64 {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       if m.DataSize != 0 {
+               return float64(m.CompressedSize) / float64(m.DataSize)
+       }
+       return 0
+}
+
+// GetWriteLatencyPercentiles summarizes the recorded write latencies with
+// calculatePercentiles while holding the read lock.
+func (m *PerformanceMetrics) GetWriteLatencyPercentiles() LatencyPercentiles {
+       m.mu.RLock()
+       summary := calculatePercentiles(m.WriteLatencies)
+       m.mu.RUnlock()
+
+       return summary
+}
+
+// GetQueryLatencyPercentiles summarizes the recorded query latencies with
+// calculatePercentiles while holding the read lock.
+func (m *PerformanceMetrics) GetQueryLatencyPercentiles() LatencyPercentiles {
+       m.mu.RLock()
+       summary := calculatePercentiles(m.QueryLatencies)
+       m.mu.RUnlock()
+
+       return summary
+}
+
+// GetErrorRate returns the error count divided by the combined number of
+// recorded writes and queries, or 0 when nothing has been recorded.
+func (m *PerformanceMetrics) GetErrorRate() float64 {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       if operations := m.WriteCount + m.QueryCount; operations != 0 {
+               return float64(m.ErrorCount) / float64(operations)
+       }
+       return 0
+}
+
+// LatencyPercentiles is a summary of a set of latency samples.
+type LatencyPercentiles struct {
+       P50  time.Duration
+       P95  time.Duration
+       P99  time.Duration
+       P999 time.Duration
+       Min  time.Duration
+       Max  time.Duration
+       Mean time.Duration
+}
+
+// calculatePercentiles summarizes durations without modifying the input
+// slice. A percentile q is the element at index int(len*q) of the sorted
+// samples; Mean is the integer-truncated average. An empty input yields the
+// zero LatencyPercentiles.
+func calculatePercentiles(durations []time.Duration) LatencyPercentiles {
+       count := len(durations)
+       if count == 0 {
+               return LatencyPercentiles{}
+       }
+
+       // Sort a private copy so the caller's slice keeps its order.
+       ordered := append(make([]time.Duration, 0, count), durations...)
+       sort.Slice(ordered, func(a, b int) bool {
+               return ordered[a] < ordered[b]
+       })
+
+       // at picks the sample at the given quantile of the sorted copy.
+       at := func(quantile float64) time.Duration {
+               return ordered[int(float64(count)*quantile)]
+       }
+
+       var total time.Duration
+       for i := range ordered {
+               total += ordered[i]
+       }
+
+       return LatencyPercentiles{
+               P50:  at(0.5),
+               P95:  at(0.95),
+               P99:  at(0.99),
+               P999: at(0.999),
+               Min:  ordered[0],
+               Max:  ordered[count-1],
+               Mean: total / time.Duration(count),
+       }
+}
+
+// PerformanceReport is the snapshot produced by
+// PerformanceMetrics.GenerateReport: the test name and scale it was called
+// with, the time elapsed since the collector's StartTime, and one sub-struct
+// per metric category.
+// NOTE(review): Scale is declared outside this file - confirm its meaning
+// there.
+type PerformanceReport struct {
+       TestName       string // test name passed to GenerateReport
+       Scale          Scale // scale passed to GenerateReport
+       Duration       time.Duration // time since the collector's StartTime
+       WriteMetrics   WriteMetrics // write counters, throughput and latencies
+       QueryMetrics   QueryMetrics // query counters, throughput and latencies
+       StorageMetrics StorageMetrics // size and compression figures
+       ErrorMetrics   ErrorMetrics // error count and rate
+}
+
+// WriteMetrics contains write performance data: the number of writes, the
+// operation and data throughput, and the write latency summary.
+type WriteMetrics struct {
+       Count          int64 // number of recorded writes
+       Throughput     float64 // operations per second
+       DataThroughput float64 // MB per second
+       LatencyP50     time.Duration // 50th percentile write latency
+       LatencyP95     time.Duration // 95th percentile write latency
+       LatencyP99     time.Duration // 99th percentile write latency
+       LatencyP999    time.Duration // 99.9th percentile write latency
+       LatencyMin     time.Duration // smallest write latency
+       LatencyMax     time.Duration // largest write latency
+       LatencyMean    time.Duration // mean write latency
+}
+
+// QueryMetrics contains query performance data.
+//
+// GenerateReport copies Count from PerformanceMetrics.QueryCount, takes
+// Throughput from GetQueryThroughput, and fills the Latency* fields from
+// calculatePercentiles applied to the recorded query latencies.
+type QueryMetrics struct {
+       Count       int64
+       Throughput  float64 // operations per second
+       LatencyP50  time.Duration
+       LatencyP95  time.Duration
+       LatencyP99  time.Duration
+       LatencyP999 time.Duration
+       LatencyMin  time.Duration
+       LatencyMax  time.Duration
+       LatencyMean time.Duration
+}
+
+// StorageMetrics contains storage efficiency data.
+//
+// OriginalSize and CompressedSize are printed by PrintReport as byte counts.
+// CompressionRatio is taken from PerformanceMetrics.GetCompressionRatio and is
+// multiplied by 100 when printed, so it is presumably a fraction rather than
+// a percentage. NOTE(review): which size is the numerator is not visible
+// here; confirm against GetCompressionRatio.
+type StorageMetrics struct {
+       OriginalSize     int64
+       CompressedSize   int64
+       CompressionRatio float64
+}
+
+// ErrorMetrics contains error statistics.
+//
+// GenerateReport copies Count from PerformanceMetrics.ErrorCount and takes
+// ErrorRate from GetErrorRate. PrintReport multiplies ErrorRate by 100 before
+// printing it with a percent sign, so it is presumably a fraction.
+// NOTE(review): the denominator of the rate is not visible here; confirm
+// against GetErrorRate.
+type ErrorMetrics struct {
+       Count     int64
+       ErrorRate float64
+}
+
+// GenerateReport creates a comprehensive performance report.
+func (m *PerformanceMetrics) GenerateReport(testName string, scale Scale) 
PerformanceReport {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       writePercentiles := calculatePercentiles(m.WriteLatencies)
+       queryPercentiles := calculatePercentiles(m.QueryLatencies)
+
+       return PerformanceReport{
+               TestName: testName,
+               Scale:    scale,
+               Duration: time.Since(m.StartTime),
+               WriteMetrics: WriteMetrics{
+                       Count:          m.WriteCount,
+                       Throughput:     m.GetWriteThroughput(),
+                       DataThroughput: m.GetDataThroughput(),
+                       LatencyP50:     writePercentiles.P50,
+                       LatencyP95:     writePercentiles.P95,
+                       LatencyP99:     writePercentiles.P99,
+                       LatencyP999:    writePercentiles.P999,
+                       LatencyMin:     writePercentiles.Min,
+                       LatencyMax:     writePercentiles.Max,
+                       LatencyMean:    writePercentiles.Mean,
+               },
+               QueryMetrics: QueryMetrics{
+                       Count:       m.QueryCount,
+                       Throughput:  m.GetQueryThroughput(),
+                       LatencyP50:  queryPercentiles.P50,
+                       LatencyP95:  queryPercentiles.P95,
+                       LatencyP99:  queryPercentiles.P99,
+                       LatencyP999: queryPercentiles.P999,
+                       LatencyMin:  queryPercentiles.Min,
+                       LatencyMax:  queryPercentiles.Max,
+                       LatencyMean: queryPercentiles.Mean,
+               },
+               StorageMetrics: StorageMetrics{
+                       OriginalSize:     m.DataSize,
+                       CompressedSize:   m.CompressedSize,
+                       CompressionRatio: m.GetCompressionRatio(),
+               },
+               ErrorMetrics: ErrorMetrics{
+                       Count:     m.ErrorCount,
+                       ErrorRate: m.GetErrorRate(),
+               },
+       }
+}
+
+// PrintReport writes the report to standard output as plain text.
+//
+// The output starts with a header naming the test, its scale and duration,
+// followed by four sections (write performance, query performance, storage
+// efficiency, error statistics) and a closing rule. The compression ratio and
+// error rate are multiplied by 100 and shown with a percent sign.
+func (r PerformanceReport) PrintReport() {
+       // Local aliases keep the print statements short.
+       w := r.WriteMetrics
+       q := r.QueryMetrics
+       st := r.StorageMetrics
+       e := r.ErrorMetrics
+
+       fmt.Printf("\n=== Performance Report: %s ===\n", r.TestName)
+       fmt.Printf("Scale: %s\n", r.Scale)
+       fmt.Printf("Duration: %v\n", r.Duration)
+
+       fmt.Print("\n--- Write Performance ---\n")
+       fmt.Printf("Count: %d\n", w.Count)
+       fmt.Printf("Throughput: %.2f ops/sec\n", w.Throughput)
+       fmt.Printf("Data Throughput: %.2f MB/sec\n", w.DataThroughput)
+       fmt.Printf("Latency P50: %v\n", w.LatencyP50)
+       fmt.Printf("Latency P95: %v\n", w.LatencyP95)
+       fmt.Printf("Latency P99: %v\n", w.LatencyP99)
+       fmt.Printf("Latency P999: %v\n", w.LatencyP999)
+       fmt.Printf("Latency Min: %v\n", w.LatencyMin)
+       fmt.Printf("Latency Max: %v\n", w.LatencyMax)
+       fmt.Printf("Latency Mean: %v\n", w.LatencyMean)
+
+       fmt.Print("\n--- Query Performance ---\n")
+       fmt.Printf("Count: %d\n", q.Count)
+       fmt.Printf("Throughput: %.2f ops/sec\n", q.Throughput)
+       fmt.Printf("Latency P50: %v\n", q.LatencyP50)
+       fmt.Printf("Latency P95: %v\n", q.LatencyP95)
+       fmt.Printf("Latency P99: %v\n", q.LatencyP99)
+       fmt.Printf("Latency P999: %v\n", q.LatencyP999)
+       fmt.Printf("Latency Min: %v\n", q.LatencyMin)
+       fmt.Printf("Latency Max: %v\n", q.LatencyMax)
+       fmt.Printf("Latency Mean: %v\n", q.LatencyMean)
+
+       fmt.Print("\n--- Storage Efficiency ---\n")
+       fmt.Printf("Original Size: %d bytes\n", st.OriginalSize)
+       fmt.Printf("Compressed Size: %d bytes\n", st.CompressedSize)
+       fmt.Printf("Compression Ratio: %.2f%%\n", st.CompressionRatio*100)
+
+       fmt.Print("\n--- Error Statistics ---\n")
+       fmt.Printf("Error Count: %d\n", e.Count)
+       fmt.Printf("Error Rate: %.2f%%\n", e.ErrorRate*100)
+
+       fmt.Print("\n================================\n")
+}
diff --git a/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go 
b/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go
index b3f24f68..250beabc 100644
--- a/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go
+++ b/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go
@@ -82,19 +82,29 @@ var _ = g.Describe("Stream vs Trace Performance", func() {
                // Run performance tests
                g.By("Running Stream vs Trace performance comparison")
 
-               // TODO: Implement actual performance tests
-               // This is a placeholder for the performance test implementation
-               fmt.Println("Schema setup completed successfully!")
-               fmt.Println("Stream group: stream_performance_test")
-               fmt.Println("Trace group: trace_performance_test")
-               fmt.Println("Ready to run performance tests...")
+               // Create clients
+               streamClient := NewStreamClient(conn)
+               traceClient := NewTraceClient(conn)
 
-               // Verify schemas are created
+               // Create context for operations
                ctx := context.Background()
 
+               // Create benchmark runner with small scale for testing
+               config := DefaultBenchmarkConfig(SmallScale)
+               config.TestDuration = 2 * time.Minute // Shorter duration for 
testing
+               config.Concurrency = 5                // Lower concurrency for 
testing
+
+               benchmarkRunner := NewBenchmarkRunner(config, streamClient, 
traceClient)
+
+               // Run write benchmark
+               err = benchmarkRunner.RunWriteBenchmark(ctx)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               // Compare results
+               benchmarkRunner.CompareResults()
+
+               // Verify schemas are created
                // Test basic connectivity
-               streamClient := NewStreamClient(conn)
-               traceClient := NewTraceClient(conn)
 
                // Verify stream group
                streamGroupExists, err := streamClient.VerifyGroup(ctx, 
"stream_performance_test")

Reply via email to