This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5741ebd0 Enhance query access logging across gRPC services (#755)
5741ebd0 is described below

commit 5741ebd00bed3705eb286b0c5d1129a2246b0612
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Sep 5 20:52:23 2025 +0800

    Enhance query access logging across gRPC services (#755)
---
 banyand/liaison/grpc/measure.go              |  27 +++--
 banyand/liaison/grpc/property.go             |  14 +--
 banyand/liaison/grpc/stream.go               |  14 +--
 banyand/liaison/grpc/trace.go                |  14 +--
 banyand/trace/query.go                       |   3 +
 pkg/accesslog/accesslog.go                   |   4 +
 pkg/accesslog/file.go                        |  51 +++++++--
 pkg/accesslog/{accesslog.go => query_log.go} |  30 ++++--
 pkg/accesslog/query_log_test.go              | 148 +++++++++++++++++++++++++++
 9 files changed, 263 insertions(+), 42 deletions(-)

diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index b3d6912b..2f9eafba 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -278,19 +278,21 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest
        }
        start := time.Now()
        defer func() {
+               duration := time.Since(start)
                for _, g := range req.Groups {
                        ms.metrics.totalFinished.Inc(1, g, "measure", "query")
                        if err != nil {
                                ms.metrics.totalErr.Inc(1, g, "measure", "query")
                        }
-                       ms.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "measure", "query")
+                       ms.metrics.totalLatency.Inc(duration.Seconds(), g, "measure", "query")
                }
-       }()
-       if ms.queryAccessLog != nil {
-               if errAccessLog := ms.queryAccessLog.Write(req); errAccessLog != nil {
-                       ms.l.Error().Err(errAccessLog).Msg("query access log error")
+               // Log query with timing information at the end
+               if ms.queryAccessLog != nil {
+                       if errAccessLog := ms.queryAccessLog.WriteQuery("measure", start, duration, req, err); errAccessLog != nil {
+                               ms.l.Error().Err(errAccessLog).Msg("query access log error")
+                       }
                }
-       }
+       }()
        if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err)
        }
@@ -331,11 +333,16 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest
 }
 
 func (ms *measureService) TopN(ctx context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
-       if ms.queryAccessLog != nil {
-               if errAccessLog := ms.queryAccessLog.Write(topNRequest); errAccessLog != nil {
-                       ms.l.Error().Err(errAccessLog).Msg("query access log error")
+       start := time.Now()
+       defer func() {
+               duration := time.Since(start)
+               // Log query with timing information at the end
+               if ms.queryAccessLog != nil {
+                       if errAccessLog := ms.queryAccessLog.WriteQuery("measure", start, duration, topNRequest, err); errAccessLog != nil {
+                               ms.l.Error().Err(errAccessLog).Msg("query access log error")
+                       }
                }
-       }
+       }()
        if err = timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != nil {
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", topNRequest.GetTimeRange(), err)
        }
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index 99880b86..9f68570c 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -415,17 +415,19 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques
        ps.metrics.totalStarted.Inc(1, "", "property", "query")
        start := time.Now()
        defer func() {
+               duration := time.Since(start)
                ps.metrics.totalFinished.Inc(1, "", "property", "query")
-               ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), "", "property", "query")
+               ps.metrics.totalLatency.Inc(duration.Seconds(), "", "property", "query")
                if err != nil {
                        ps.metrics.totalErr.Inc(1, "", "property", "query")
                }
-       }()
-       if ps.queryAccessLog != nil {
-               if errAccessLog := ps.queryAccessLog.Write(req); errAccessLog != nil {
-                       ps.log.Error().Err(errAccessLog).Msg("query access log error")
+               // Log query with timing information at the end
+               if ps.queryAccessLog != nil {
+                       if errAccessLog := ps.queryAccessLog.WriteQuery("property", start, duration, req, err); errAccessLog != nil {
+                               ps.log.Error().Err(errAccessLog).Msg("query access log error")
+                       }
                }
-       }
+       }()
        if len(req.Groups) == 0 {
                return nil, schema.BadRequest("groups", "groups should not be empty")
        }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index e8e65d13..5cc5a46a 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -256,19 +256,21 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) (
        }
        start := time.Now()
        defer func() {
+               duration := time.Since(start)
                for _, g := range req.Groups {
                        s.metrics.totalFinished.Inc(1, g, "stream", "query")
                        if err != nil {
                                s.metrics.totalErr.Inc(1, g, "stream", "query")
                        }
-                       s.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "stream", "query")
+                       s.metrics.totalLatency.Inc(duration.Seconds(), g, "stream", "query")
                }
-       }()
-       if s.queryAccessLog != nil {
-               if errAccessLog := s.queryAccessLog.Write(req); errAccessLog != nil {
-                       s.l.Error().Err(errAccessLog).Msg("query access log error")
+               // Log query with timing information at the end
+               if s.queryAccessLog != nil {
+                       if errAccessLog := s.queryAccessLog.WriteQuery("stream", start, duration, req, err); errAccessLog != nil {
+                               s.l.Error().Err(errAccessLog).Msg("query access log error")
+                       }
                }
-       }
+       }()
        timeRange := req.GetTimeRange()
        if timeRange == nil {
                req.TimeRange = timestamp.DefaultTimeRange
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 7f265c82..b2105ead 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -319,19 +319,21 @@ func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest) (re
        }
        start := time.Now()
        defer func() {
+               duration := time.Since(start)
                for _, g := range req.Groups {
                        s.metrics.totalFinished.Inc(1, g, "trace", "query")
                        if err != nil {
                                s.metrics.totalErr.Inc(1, g, "trace", "query")
                        }
-                       s.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "trace", "query")
+                       s.metrics.totalLatency.Inc(duration.Seconds(), g, "trace", "query")
                }
-       }()
-       if s.queryAccessLog != nil {
-               if errAccessLog := s.queryAccessLog.Write(req); errAccessLog != nil {
-                       s.l.Error().Err(errAccessLog).Msg("query access log error")
+               // Log query with timing information at the end
+               if s.queryAccessLog != nil {
+                       if errAccessLog := s.queryAccessLog.WriteQuery("trace", start, duration, req, err); errAccessLog != nil {
+                               s.l.Error().Err(errAccessLog).Msg("query access log error")
+                       }
                }
-       }
+       }()
        timeRange := req.GetTimeRange()
        if timeRange == nil {
                req.TimeRange = timestamp.DefaultTimeRange
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index ffdcf699..c2c586da 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -52,6 +52,9 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T
        if tqo.TimeRange == nil {
                return nil, errors.New("invalid query options: timeRange are required")
        }
+       if len(tqo.TraceIDs) == 0 && tqo.Order == nil {
+               return nil, errors.New("invalid query options: either traceIDs or order must be specified")
+       }
        var tsdb storage.TSDB[*tsTable, option]
        var err error
        db := t.tsdb.Load()
diff --git a/pkg/accesslog/accesslog.go b/pkg/accesslog/accesslog.go
index 4363fab6..4ac2d90c 100644
--- a/pkg/accesslog/accesslog.go
+++ b/pkg/accesslog/accesslog.go
@@ -19,6 +19,8 @@
 package accesslog
 
 import (
+       "time"
+
        "google.golang.org/protobuf/proto"
 )
 
@@ -26,6 +28,8 @@ import (
 type Log interface {
        // Write writes the access log.
        Write(req proto.Message) error
+       // WriteQuery writes the query access log with timing information.
+       WriteQuery(service string, startTime time.Time, duration time.Duration, req proto.Message, err error) error
        // Close closes the access log.
        Close() error
 }
diff --git a/pkg/accesslog/file.go b/pkg/accesslog/file.go
index 2ef934af..a59ac7cb 100644
--- a/pkg/accesslog/file.go
+++ b/pkg/accesslog/file.go
@@ -19,6 +19,7 @@ package accesslog
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "os"
        "path"
@@ -40,7 +41,7 @@ const (
 
 type fileLog struct {
        file          *os.File // Single file for all goroutines
-       validRequests chan proto.Message
+       validRequests chan interface{}
        closer        *run.Closer
        sampled       bool
 }
@@ -49,12 +50,12 @@ type fileLog struct {
 // sampled: if true (default), requests may be dropped when the channel is full.
 // If false, requests are never dropped but use buffered channel to prevent blocking.
 func NewFileLog(root, template string, interval time.Duration, log *logger.Logger, sampled bool) (Log, error) {
-       var validRequests chan proto.Message
+       var validRequests chan interface{}
        if sampled {
-               validRequests = make(chan proto.Message, 100)
+               validRequests = make(chan interface{}, 100)
        } else {
                // For non-sampled mode, use buffered channel to prevent blocking on writes
-               validRequests = make(chan proto.Message, 1000) // Buffer to handle burst writes
+               validRequests = make(chan interface{}, 1000) // Buffer to handle burst writes
        }
 
        f := &fileLog{
@@ -92,6 +93,27 @@ func (f *fileLog) Write(req proto.Message) error {
        return nil
 }
 
+func (f *fileLog) WriteQuery(service string, startTime time.Time, duration time.Duration, req proto.Message, err error) error {
+       if f == nil {
+               return nil
+       }
+
+       queryEntry := NewQueryLogEntry(service, startTime, duration, req, err)
+
+       if f.sampled {
+               // Sampled mode: may drop requests if channel is full
+               select {
+               case f.validRequests <- queryEntry:
+               default:
+                       return fmt.Errorf("access log is full")
+               }
+       } else {
+               // Non-sampled mode: never drop requests, block until buffer has space
+               f.validRequests <- queryEntry
+       }
+       return nil
+}
+
 func (f *fileLog) Close() error {
        if f == nil {
                return nil
@@ -118,7 +140,7 @@ func startConsumer(f *fileLog, root, template string, interval time.Duration, lo
        flushTicker := time.NewTicker(DefaultFlushInterval)
        defer flushTicker.Stop()
 
-       batch := make([]proto.Message, 0, DefaultBatchSize)
+       batch := make([]interface{}, 0, DefaultBatchSize)
 
        for {
                select {
@@ -151,14 +173,29 @@ func startConsumer(f *fileLog, root, template string, interval time.Duration, lo
 }
 
 // flushBatch marshals and writes a batch of requests to the specified file.
-func flushBatch(file *os.File, batch []proto.Message, log *logger.Logger) {
+func flushBatch(file *os.File, batch []interface{}, log *logger.Logger) {
        if file == nil || len(batch) == 0 {
                return
        }
 
        var buffer bytes.Buffer
        for _, req := range batch {
-               data, err := protojson.Marshal(req)
+               var data []byte
+               var err error
+
+               // Handle different types of messages
+               switch v := req.(type) {
+               case *QueryLogEntry:
+                       // For query log entries, use regular JSON marshaling
+                       data, err = json.Marshal(v)
+               case proto.Message:
+                       // For protobuf messages, use protojson marshaling
+                       data, err = protojson.Marshal(v)
+               default:
+                       // For unknown types, try JSON marshaling
+                       data, err = json.Marshal(v)
+               }
+
                if err != nil {
                        log.Error().Err(err).Msg("failed to marshal request")
                        continue
diff --git a/pkg/accesslog/accesslog.go b/pkg/accesslog/query_log.go
similarity index 54%
copy from pkg/accesslog/accesslog.go
copy to pkg/accesslog/query_log.go
index 4363fab6..e1c54f18 100644
--- a/pkg/accesslog/accesslog.go
+++ b/pkg/accesslog/query_log.go
@@ -15,17 +15,33 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package accesslog provides access log for banyandb.
 package accesslog
 
 import (
+       "time"
+
        "google.golang.org/protobuf/proto"
 )
 
-// Log is the interface for access log.
-type Log interface {
-       // Write writes the access log.
-       Write(req proto.Message) error
-       // Close closes the access log.
-       Close() error
+// QueryLogEntry wraps a query request with execution timing information.
+type QueryLogEntry struct {
+       StartTime time.Time     `json:"start_time"`
+       Request   proto.Message `json:"request"`
+       Service   string        `json:"service"`
+       Error     string        `json:"error,omitempty"`
+       Duration  time.Duration `json:"duration_ms"`
+}
+
+// NewQueryLogEntry creates a new query log entry with timing information.
+func NewQueryLogEntry(service string, startTime time.Time, duration time.Duration, request proto.Message, err error) *QueryLogEntry {
+       entry := &QueryLogEntry{
+               Service:   service,
+               StartTime: startTime,
+               Duration:  duration,
+               Request:   request,
+       }
+       if err != nil {
+               entry.Error = err.Error()
+       }
+       return entry
 }
diff --git a/pkg/accesslog/query_log_test.go b/pkg/accesslog/query_log_test.go
new file mode 100644
index 00000000..4d1310eb
--- /dev/null
+++ b/pkg/accesslog/query_log_test.go
@@ -0,0 +1,148 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package accesslog
+
+import (
+       "bufio"
+       "encoding/json"
+       "fmt"
+       "os"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestQueryLogEntry_WriteQuery(t *testing.T) {
+       dir := t.TempDir()
+       initTestLogger(t)
+       l := logger.GetLogger("test", "accesslog")
+       flog, err := NewFileLog(dir, "query-test-%s.log", 10*time.Second, l, false)
+       require.NoError(t, err)
+       defer flog.Close()
+
+       // Test data
+       startTime := time.Now()
+       duration := 150 * time.Millisecond
+       testErr := fmt.Errorf("test error")
+
+       // Create realistic stream query request
+       streamReq := &streamv1.QueryRequest{
+               Groups: []string{"default"},
+               Name:   "service_traffic",
+               Limit:  100,
+       }
+
+       // Create realistic measure query request
+       measureReq := &measurev1.QueryRequest{
+               Groups: []string{"default"},
+               Name:   "service_cpm",
+               Limit:  50,
+       }
+
+       // Write query log with timing information for stream
+       err = flog.WriteQuery("stream", startTime, duration, streamReq, testErr)
+       require.NoError(t, err)
+
+       // Write query log without error for measure
+       err = flog.WriteQuery("measure", startTime.Add(time.Second), duration*2, measureReq, nil)
+       require.NoError(t, err)
+
+       // Wait for flush
+       time.Sleep(2 * time.Second)
+
+       // Verify log files were created and contain query entries
+       files := listLogFiles(t, dir)
+       require.Len(t, files, 1)
+
+       // Read and verify log content
+       file, err := os.Open(files[0])
+       require.NoError(t, err)
+       defer file.Close()
+
+       scanner := bufio.NewScanner(file)
+       lines := 0
+       for scanner.Scan() {
+               line := scanner.Text()
+               if len(line) == 0 {
+                       continue
+               }
+               lines++
+
+               // Parse the JSON to verify it contains timing information
+               var entry map[string]interface{}
+               err := json.Unmarshal([]byte(line), &entry)
+               require.NoError(t, err)
+
+               // Verify query log entry structure
+               require.Contains(t, entry, "service")
+               require.Contains(t, entry, "start_time")
+               require.Contains(t, entry, "duration_ms")
+               require.Contains(t, entry, "request")
+
+               // Verify service is correct
+               service := entry["service"].(string)
+               require.True(t, service == "stream" || service == "measure")
+
+               // Verify duration is recorded
+               durationMs := entry["duration_ms"]
+               require.NotNil(t, durationMs)
+
+               // Check error field for first entry (stream)
+               if service == "stream" {
+                       require.Contains(t, entry, "error")
+                       require.Equal(t, "test error", entry["error"])
+               }
+       }
+
+       require.NoError(t, scanner.Err())
+       require.Equal(t, 2, lines, "Should have exactly 2 log entries")
+}
+
+func TestQueryLogEntry_Creation(t *testing.T) {
+       startTime := time.Now()
+       duration := 100 * time.Millisecond
+       testErr := fmt.Errorf("test error")
+
+       // Create realistic measure query request
+       req := &measurev1.QueryRequest{
+               Groups: []string{"default", "prod"},
+               Name:   "service_throughput",
+               Limit:  200,
+       }
+
+       // Test with error
+       entry := NewQueryLogEntry("measure", startTime, duration, req, testErr)
+       require.Equal(t, "measure", entry.Service)
+       require.Equal(t, startTime, entry.StartTime)
+       require.Equal(t, duration, entry.Duration)
+       require.Equal(t, req, entry.Request)
+       require.Equal(t, "test error", entry.Error)
+
+       // Test without error
+       entryNoError := NewQueryLogEntry("measure", startTime, duration, req, nil)
+       require.Equal(t, "measure", entryNoError.Service)
+       require.Equal(t, startTime, entryNoError.StartTime)
+       require.Equal(t, duration, entryNoError.Duration)
+       require.Equal(t, req, entryNoError.Request)
+       require.Empty(t, entryNoError.Error)
+}

Reply via email to