This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 5741ebd0 Enhance query access logging across gRPC services (#755) 5741ebd0 is described below commit 5741ebd00bed3705eb286b0c5d1129a2246b0612 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Sep 5 20:52:23 2025 +0800 Enhance query access logging across gRPC services (#755) --- banyand/liaison/grpc/measure.go | 27 +++-- banyand/liaison/grpc/property.go | 14 +-- banyand/liaison/grpc/stream.go | 14 +-- banyand/liaison/grpc/trace.go | 14 +-- banyand/trace/query.go | 3 + pkg/accesslog/accesslog.go | 4 + pkg/accesslog/file.go | 51 +++++++-- pkg/accesslog/{accesslog.go => query_log.go} | 30 ++++-- pkg/accesslog/query_log_test.go | 148 +++++++++++++++++++++++++++ 9 files changed, 263 insertions(+), 42 deletions(-) diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index b3d6912b..2f9eafba 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -278,19 +278,21 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest } start := time.Now() defer func() { + duration := time.Since(start) for _, g := range req.Groups { ms.metrics.totalFinished.Inc(1, g, "measure", "query") if err != nil { ms.metrics.totalErr.Inc(1, g, "measure", "query") } - ms.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "measure", "query") + ms.metrics.totalLatency.Inc(duration.Seconds(), g, "measure", "query") } - }() - if ms.queryAccessLog != nil { - if errAccessLog := ms.queryAccessLog.Write(req); errAccessLog != nil { - ms.l.Error().Err(errAccessLog).Msg("query access log error") + // Log query with timing information at the end + if ms.queryAccessLog != nil { + if errAccessLog := ms.queryAccessLog.WriteQuery("measure", start, duration, req, err); errAccessLog != nil { + ms.l.Error().Err(errAccessLog).Msg("query access log error") + } } - } + }() if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil { return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) } @@ -331,11 +333,16 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest } func (ms *measureService) TopN(ctx context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) { - if ms.queryAccessLog != nil { - if errAccessLog := ms.queryAccessLog.Write(topNRequest); errAccessLog != nil { - ms.l.Error().Err(errAccessLog).Msg("query access log error") + start := time.Now() + defer func() { + duration := time.Since(start) + // Log query with timing information at the end + if ms.queryAccessLog != nil { + if errAccessLog := ms.queryAccessLog.WriteQuery("measure", start, duration, topNRequest, err); errAccessLog != nil { + ms.l.Error().Err(errAccessLog).Msg("query access log error") + } } - } + }() if err = timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != nil { return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", topNRequest.GetTimeRange(), err) } diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index 99880b86..9f68570c 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -415,17 +415,19 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques ps.metrics.totalStarted.Inc(1, "", "property", "query") start := time.Now() defer func() { + duration := time.Since(start) ps.metrics.totalFinished.Inc(1, "", "property", "query") - ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), "", "property", "query") + ps.metrics.totalLatency.Inc(duration.Seconds(), "", "property", "query") if err != nil { ps.metrics.totalErr.Inc(1, "", "property", "query") } - }() - if ps.queryAccessLog != nil { - if errAccessLog := ps.queryAccessLog.Write(req); errAccessLog != nil { - ps.log.Error().Err(errAccessLog).Msg("query access log error") + // Log query with timing information at the end + if ps.queryAccessLog != nil { + if errAccessLog := ps.queryAccessLog.WriteQuery("property", start, duration, req, err); errAccessLog != nil { + ps.log.Error().Err(errAccessLog).Msg("query access log error") + } } - } + }() if len(req.Groups) == 0 { return nil, schema.BadRequest("groups", "groups should not be empty") } diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index e8e65d13..5cc5a46a 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -256,19 +256,21 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) ( } start := time.Now() defer func() { + duration := time.Since(start) for _, g := range req.Groups { s.metrics.totalFinished.Inc(1, g, "stream", "query") if err != nil { s.metrics.totalErr.Inc(1, g, "stream", "query") } - s.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "stream", "query") + s.metrics.totalLatency.Inc(duration.Seconds(), g, "stream", "query") } - }() - if s.queryAccessLog != nil { - if errAccessLog := s.queryAccessLog.Write(req); errAccessLog != nil { - s.l.Error().Err(errAccessLog).Msg("query access log error") + // Log query with timing information at the end + if s.queryAccessLog != nil { + if errAccessLog := s.queryAccessLog.WriteQuery("stream", start, duration, req, err); errAccessLog != nil { + s.l.Error().Err(errAccessLog).Msg("query access log error") + } } - } + }() timeRange := req.GetTimeRange() if timeRange == nil { req.TimeRange = timestamp.DefaultTimeRange diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go index 7f265c82..b2105ead 100644 --- a/banyand/liaison/grpc/trace.go +++ b/banyand/liaison/grpc/trace.go @@ -319,19 +319,21 @@ func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest) (re } start := time.Now() defer func() { + duration := time.Since(start) for _, g := range req.Groups { s.metrics.totalFinished.Inc(1, g, "trace", "query") if err != nil { s.metrics.totalErr.Inc(1, g, "trace", "query") } - s.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "trace", "query") + s.metrics.totalLatency.Inc(duration.Seconds(), g, "trace", "query") } - }() - if s.queryAccessLog != nil { - if errAccessLog := s.queryAccessLog.Write(req); errAccessLog != nil { - s.l.Error().Err(errAccessLog).Msg("query access log error") + // Log query with timing information at the end + if s.queryAccessLog != nil { + if errAccessLog := s.queryAccessLog.WriteQuery("trace", start, duration, req, err); errAccessLog != nil { + s.l.Error().Err(errAccessLog).Msg("query access log error") + } } - } + }() timeRange := req.GetTimeRange() if timeRange == nil { req.TimeRange = timestamp.DefaultTimeRange diff --git a/banyand/trace/query.go b/banyand/trace/query.go index ffdcf699..c2c586da 100644 --- a/banyand/trace/query.go +++ b/banyand/trace/query.go @@ -52,6 +52,9 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T if tqo.TimeRange == nil { return nil, errors.New("invalid query options: timeRange are required") } + if len(tqo.TraceIDs) == 0 && tqo.Order == nil { + return nil, errors.New("invalid query options: either traceIDs or order must be specified") + } var tsdb storage.TSDB[*tsTable, option] var err error db := t.tsdb.Load() diff --git a/pkg/accesslog/accesslog.go b/pkg/accesslog/accesslog.go index 4363fab6..4ac2d90c 100644 --- a/pkg/accesslog/accesslog.go +++ b/pkg/accesslog/accesslog.go @@ -19,6 +19,8 @@ package accesslog import ( + "time" + "google.golang.org/protobuf/proto" ) @@ -26,6 +28,8 @@ import ( type Log interface { // Write writes the access log. Write(req proto.Message) error + // WriteQuery writes the query access log with timing information. + WriteQuery(service string, startTime time.Time, duration time.Duration, req proto.Message, err error) error // Close closes the access log. Close() error } diff --git a/pkg/accesslog/file.go b/pkg/accesslog/file.go index 2ef934af..a59ac7cb 100644 --- a/pkg/accesslog/file.go +++ b/pkg/accesslog/file.go @@ -19,6 +19,7 @@ package accesslog import ( "bytes" + "encoding/json" "fmt" "os" "path" @@ -40,7 +41,7 @@ const ( type fileLog struct { file *os.File // Single file for all goroutines - validRequests chan proto.Message + validRequests chan interface{} closer *run.Closer sampled bool } @@ -49,12 +50,12 @@ type fileLog struct { // sampled: if true (default), requests may be dropped when the channel is full. // If false, requests are never dropped but use buffered channel to prevent blocking. func NewFileLog(root, template string, interval time.Duration, log *logger.Logger, sampled bool) (Log, error) { - var validRequests chan proto.Message + var validRequests chan interface{} if sampled { - validRequests = make(chan proto.Message, 100) + validRequests = make(chan interface{}, 100) } else { // For non-sampled mode, use buffered channel to prevent blocking on writes - validRequests = make(chan proto.Message, 1000) // Buffer to handle burst writes + validRequests = make(chan interface{}, 1000) // Buffer to handle burst writes } f := &fileLog{ @@ -92,6 +93,27 @@ func (f *fileLog) Write(req proto.Message) error { return nil } +func (f *fileLog) WriteQuery(service string, startTime time.Time, duration time.Duration, req proto.Message, err error) error { + if f == nil { + return nil + } + + queryEntry := NewQueryLogEntry(service, startTime, duration, req, err) + + if f.sampled { + // Sampled mode: may drop requests if channel is full + select { + case f.validRequests <- queryEntry: + default: + return fmt.Errorf("access log is full") + } + } else { + // Non-sampled mode: never drop requests, block until buffer has space + f.validRequests <- queryEntry + } + return nil +} + func (f *fileLog) Close() error { if f == nil { return nil @@ -118,7 +140,7 @@ func startConsumer(f *fileLog, root, template string, interval time.Duration, lo flushTicker := time.NewTicker(DefaultFlushInterval) defer flushTicker.Stop() - batch := make([]proto.Message, 0, DefaultBatchSize) + batch := make([]interface{}, 0, DefaultBatchSize) for { select { @@ -151,14 +173,29 @@ func startConsumer(f *fileLog, root, template string, interval time.Duration, lo } // flushBatch marshals and writes a batch of requests to the specified file. -func flushBatch(file *os.File, batch []proto.Message, log *logger.Logger) { +func flushBatch(file *os.File, batch []interface{}, log *logger.Logger) { if file == nil || len(batch) == 0 { return } var buffer bytes.Buffer for _, req := range batch { - data, err := protojson.Marshal(req) + var data []byte + var err error + + // Handle different types of messages + switch v := req.(type) { + case *QueryLogEntry: + // For query log entries, use regular JSON marshaling + data, err = json.Marshal(v) + case proto.Message: + // For protobuf messages, use protojson marshaling + data, err = protojson.Marshal(v) + default: + // For unknown types, try JSON marshaling + data, err = json.Marshal(v) + } + if err != nil { log.Error().Err(err).Msg("failed to marshal request") continue diff --git a/pkg/accesslog/accesslog.go b/pkg/accesslog/query_log.go similarity index 54% copy from pkg/accesslog/accesslog.go copy to pkg/accesslog/query_log.go index 4363fab6..e1c54f18 100644 --- a/pkg/accesslog/accesslog.go +++ b/pkg/accesslog/query_log.go @@ -15,17 +15,33 @@ // specific language governing permissions and limitations // under the License. -// Package accesslog provides access log for banyandb. package accesslog import ( + "time" + "google.golang.org/protobuf/proto" ) -// Log is the interface for access log. -type Log interface { - // Write writes the access log. - Write(req proto.Message) error - // Close closes the access log. - Close() error +// QueryLogEntry wraps a query request with execution timing information. +type QueryLogEntry struct { + StartTime time.Time `json:"start_time"` + Request proto.Message `json:"request"` + Service string `json:"service"` + Error string `json:"error,omitempty"` + Duration time.Duration `json:"duration_ms"` +} + +// NewQueryLogEntry creates a new query log entry with timing information. +func NewQueryLogEntry(service string, startTime time.Time, duration time.Duration, request proto.Message, err error) *QueryLogEntry { + entry := &QueryLogEntry{ + Service: service, + StartTime: startTime, + Duration: duration, + Request: request, + } + if err != nil { + entry.Error = err.Error() + } + return entry } diff --git a/pkg/accesslog/query_log_test.go b/pkg/accesslog/query_log_test.go new file mode 100644 index 00000000..4d1310eb --- /dev/null +++ b/pkg/accesslog/query_log_test.go @@ -0,0 +1,148 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package accesslog + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +func TestQueryLogEntry_WriteQuery(t *testing.T) { + dir := t.TempDir() + initTestLogger(t) + l := logger.GetLogger("test", "accesslog") + flog, err := NewFileLog(dir, "query-test-%s.log", 10*time.Second, l, false) + require.NoError(t, err) + defer flog.Close() + + // Test data + startTime := time.Now() + duration := 150 * time.Millisecond + testErr := fmt.Errorf("test error") + + // Create realistic stream query request + streamReq := &streamv1.QueryRequest{ + Groups: []string{"default"}, + Name: "service_traffic", + Limit: 100, + } + + // Create realistic measure query request + measureReq := &measurev1.QueryRequest{ + Groups: []string{"default"}, + Name: "service_cpm", + Limit: 50, + } + + // Write query log with timing information for stream + err = flog.WriteQuery("stream", startTime, duration, streamReq, testErr) + require.NoError(t, err) + + // Write query log without error for measure + err = flog.WriteQuery("measure", startTime.Add(time.Second), duration*2, measureReq, nil) + require.NoError(t, err) + + // Wait for flush + time.Sleep(2 * time.Second) + + // Verify log files were created and contain query entries + files := listLogFiles(t, dir) + require.Len(t, files, 1) + + // Read and verify log content + file, err := os.Open(files[0]) + require.NoError(t, err) + defer file.Close() + + scanner := bufio.NewScanner(file) + lines := 0 + for scanner.Scan() { + line := scanner.Text() + if len(line) == 0 { + continue + } + lines++ + + // Parse the JSON to verify it contains timing information + var entry map[string]interface{} + err := json.Unmarshal([]byte(line), &entry) + require.NoError(t, err) + + // Verify query log entry structure + require.Contains(t, entry, "service") + require.Contains(t, entry, "start_time") + require.Contains(t, entry, "duration_ms") + require.Contains(t, entry, "request") + + // Verify service is correct + service := entry["service"].(string) + require.True(t, service == "stream" || service == "measure") + + // Verify duration is recorded + durationMs := entry["duration_ms"] + require.NotNil(t, durationMs) + + // Check error field for first entry (stream) + if service == "stream" { + require.Contains(t, entry, "error") + require.Equal(t, "test error", entry["error"]) + } + } + + require.NoError(t, scanner.Err()) + require.Equal(t, 2, lines, "Should have exactly 2 log entries") +} + +func TestQueryLogEntry_Creation(t *testing.T) { + startTime := time.Now() + duration := 100 * time.Millisecond + testErr := fmt.Errorf("test error") + + // Create realistic measure query request + req := &measurev1.QueryRequest{ + Groups: []string{"default", "prod"}, + Name: "service_throughput", + Limit: 200, + } + + // Test with error + entry := NewQueryLogEntry("measure", startTime, duration, req, testErr) + require.Equal(t, "measure", entry.Service) + require.Equal(t, startTime, entry.StartTime) + require.Equal(t, duration, entry.Duration) + require.Equal(t, req, entry.Request) + require.Equal(t, "test error", entry.Error) + + // Test without error + entryNoError := NewQueryLogEntry("measure", startTime, duration, req, nil) + require.Equal(t, "measure", entryNoError.Service) + require.Equal(t, startTime, entryNoError.StartTime) + require.Equal(t, duration, entryNoError.Duration) + require.Equal(t, req, entryNoError.Request) + require.Empty(t, entryNoError.Error) +}