This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit df4798f209084b068140c7a4cc8436bbc8ade288
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Aug 29 23:04:43 2025 +0800

    Refactor trace query execution and filtering to use iterators for better
    performance and memory efficiency. TraceExecutable.Execute now returns an
    iter.Iterator[model.TraceResult] instead of a single model.TraceResult, so
    the local scan, tag filter, and limit plans evaluate trace results lazily,
    one trace at a time. Offset and limit are now applied to the number of
    trace results rather than to the spans within a single trace.
---
 banyand/query/processor.go                       | 57 ++++++++++----
 pkg/query/executor/interface.go                  |  3 +-
 pkg/query/logical/trace/trace_analyzer.go        | 71 ++++++++++-------
 pkg/query/logical/trace/trace_plan_local.go      | 80 ++++++++++++-------
 pkg/query/logical/trace/trace_plan_tag_filter.go | 98 ++++++++++++++++--------
 5 files changed, 205 insertions(+), 104 deletions(-)

diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 18f8be84..b711ec20 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -524,32 +524,55 @@ func (p *traceQueryProcessor) executeQuery(ctx 
context.Context, queryCriteria *t
 
        te := plan.(executor.TraceExecutable)
        defer te.Close()
-       result, err := te.Execute(ctx)
+       resultIterator, err := te.Execute(ctx)
        if err != nil {
                p.log.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to execute the trace query plan")
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("execute the query plan for trace %s: %v", 
queryCriteria.GetName(), err))
                return
        }
 
-       // Convert model.TraceResult to tracev1.QueryResponse format
-       spans := make([]*tracev1.Span, len(result.Spans))
-       for i, spanBytes := range result.Spans {
-               // Create trace tags from the result
-               var traceTags []*modelv1.Tag
-               if result.Tags != nil {
-                       for _, tag := range result.Tags {
-                               if len(tag.Values) > 0 {
-                                       traceTags = append(traceTags, 
&modelv1.Tag{
-                                               Key:   tag.Name,
-                                               Value: tag.Values[0],
-                                       })
+       // Convert model.TraceResult iterator to tracev1.QueryResponse format
+       var spans []*tracev1.Span
+
+       for {
+               result, hasNext := resultIterator.Next()
+               if !hasNext {
+                       break
+               }
+
+               // Convert each span in the trace result
+               for i, spanBytes := range result.Spans {
+                       // Create trace tags from the result
+                       var traceTags []*modelv1.Tag
+                       if result.Tags != nil {
+                               for _, tag := range result.Tags {
+                                       if i < len(tag.Values) {
+                                               traceTags = append(traceTags, 
&modelv1.Tag{
+                                                       Key:   tag.Name,
+                                                       Value: tag.Values[i],
+                                               })
+                                       }
                                }
                        }
-               }
 
-               spans[i] = &tracev1.Span{
-                       Tags: traceTags,
-                       Span: spanBytes,
+                       // Add trace ID tag to each span
+                       if traceIDTagName != "" && result.TID != "" {
+                               traceTags = append(traceTags, &modelv1.Tag{
+                                       Key: traceIDTagName,
+                                       Value: &modelv1.TagValue{
+                                               Value: &modelv1.TagValue_Str{
+                                                       Str: &modelv1.Str{
+                                                               Value: 
result.TID,
+                                                       },
+                                               },
+                                       },
+                               })
+                       }
+
+                       spans = append(spans, &tracev1.Span{
+                               Tags: traceTags,
+                               Span: spanBytes,
+                       })
                }
        }
 
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index f9ce9717..9953f538 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -25,6 +25,7 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/iter"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
@@ -87,6 +88,6 @@ type TraceExecutionContext interface {
 
 // TraceExecutable allows querying in the trace schema.
 type TraceExecutable interface {
-       Execute(context.Context) (model.TraceResult, error)
+       Execute(context.Context) (iter.Iterator[model.TraceResult], error)
        Close()
 }
diff --git a/pkg/query/logical/trace/trace_analyzer.go 
b/pkg/query/logical/trace/trace_analyzer.go
index 5ac4504d..7830a104 100644
--- a/pkg/query/logical/trace/trace_analyzer.go
+++ b/pkg/query/logical/trace/trace_analyzer.go
@@ -23,6 +23,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/pkg/iter"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -123,42 +124,56 @@ func (l *traceLimit) Close() {
        l.Parent.Input.(executor.TraceExecutable).Close()
 }
 
-func (l *traceLimit) Execute(ctx context.Context) (model.TraceResult, error) {
-       // For trace queries, we typically get one result per execution
-       // The limit and offset are handled at the span level within each trace
-       result, err := l.Parent.Input.(executor.TraceExecutable).Execute(ctx)
+func (l *traceLimit) Execute(ctx context.Context) 
(iter.Iterator[model.TraceResult], error) {
+       // Apply offset and limit to trace results (not spans within each trace)
+       resultIterator, err := 
l.Parent.Input.(executor.TraceExecutable).Execute(ctx)
        if err != nil {
-               return model.TraceResult{}, err
+               return iter.Empty[model.TraceResult](), err
        }
 
-       // Apply offset and limit to spans within the trace
-       if len(result.Spans) == 0 {
-               return result, nil
-       }
+       // Return a lazy iterator that handles offset and limit at the result 
level
+       return &traceLimitIterator{
+               sourceIterator: resultIterator,
+               offset:         int(l.offsetNum),
+               limit:          int(l.limitNum),
+               currentIndex:   0,
+               returned:       0,
+       }, nil
+}
 
-       offset := int(l.offsetNum)
-       limit := int(l.limitNum)
+// traceLimitIterator implements iter.Iterator[model.TraceResult] by applying
+// offset and limit to the number of trace results (not spans within results).
+type traceLimitIterator struct {
+       sourceIterator iter.Iterator[model.TraceResult]
+       offset         int
+       limit          int
+       currentIndex   int
+       returned       int
+}
 
-       if offset >= len(result.Spans) {
-               return model.TraceResult{
-                       Error: result.Error,
-                       Spans: [][]byte{},
-                       TID:   result.TID,
-                       Tags:  result.Tags,
-               }, nil
+func (tli *traceLimitIterator) Next() (model.TraceResult, bool) {
+       // If we've already returned the maximum number of results, stop
+       if tli.limit > 0 && tli.returned >= tli.limit {
+               return model.TraceResult{}, false
        }
 
-       endIndex := offset + limit
-       if endIndex > len(result.Spans) {
-               endIndex = len(result.Spans)
-       }
+       for {
+               result, hasNext := tli.sourceIterator.Next()
+               if !hasNext {
+                       return model.TraceResult{}, false
+               }
 
-       return model.TraceResult{
-               Error: result.Error,
-               Spans: result.Spans[offset:endIndex],
-               TID:   result.TID,
-               Tags:  result.Tags,
-       }, nil
+               // Skip results until we reach the offset
+               if tli.currentIndex < tli.offset {
+                       tli.currentIndex++
+                       continue
+               }
+
+               // We're past the offset, return this result and increment 
counters
+               tli.currentIndex++
+               tli.returned++
+               return result, true
+       }
 }
 
 func (l *traceLimit) Analyze(s logical.Schema) (logical.Plan, error) {
diff --git a/pkg/query/logical/trace/trace_plan_local.go 
b/pkg/query/logical/trace/trace_plan_local.go
index eaa1785d..d73af754 100644
--- a/pkg/query/logical/trace/trace_plan_local.go
+++ b/pkg/query/logical/trace/trace_plan_local.go
@@ -23,6 +23,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/iter"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
@@ -66,42 +67,67 @@ func (i *localScan) Sort(order *logical.OrderBy) {
        i.order = order
 }
 
-func (i *localScan) Execute(ctx context.Context) (model.TraceResult, error) {
+func (i *localScan) Execute(ctx context.Context) 
(iter.Iterator[model.TraceResult], error) {
        select {
        case <-ctx.Done():
-               return model.TraceResult{}, ctx.Err()
+               return iter.Empty[model.TraceResult](), ctx.Err()
        default:
        }
-       if i.result != nil {
-               return *i.result.Pull(), nil
-       }
-       var orderBy *index.OrderBy
-       if i.order != nil {
-               orderBy = &index.OrderBy{
-                       Index: i.order.Index,
-                       Sort:  i.order.Sort,
+
+       // If we don't have a result yet, execute the query
+       if i.result == nil {
+               var orderBy *index.OrderBy
+               if i.order != nil {
+                       orderBy = &index.OrderBy{
+                               Index: i.order.Index,
+                               Sort:  i.order.Sort,
+                       }
+               }
+               var err error
+               if i.result, err = i.ec.Query(ctx, model.TraceQueryOptions{
+                       Name:           i.metadata.GetName(),
+                       TimeRange:      &i.timeRange,
+                       SkippingFilter: i.skippingFilter,
+                       Order:          orderBy,
+                       TagProjection:  i.projectionTags,
+                       MaxTraceSize:   i.maxTraceSize,
+                       TraceIDs:       i.traceIDs,
+               }); err != nil {
+                       return iter.Empty[model.TraceResult](), err
+               }
+               if i.result == nil {
+                       return iter.Empty[model.TraceResult](), nil
                }
        }
-       var err error
-       if i.result, err = i.ec.Query(ctx, model.TraceQueryOptions{
-               Name:           i.metadata.GetName(),
-               TimeRange:      &i.timeRange,
-               SkippingFilter: i.skippingFilter,
-               Order:          orderBy,
-               TagProjection:  i.projectionTags,
-               MaxTraceSize:   i.maxTraceSize,
-               TraceIDs:       i.traceIDs,
-       }); err != nil {
-               return model.TraceResult{}, err
-       }
-       if i.result == nil {
-               return model.TraceResult{}, nil
+
+       // Return a custom iterator that continuously pulls from i.result
+       return &traceResultIterator{result: i.result}, nil
+}
+
+// traceResultIterator implements iter.Iterator[model.TraceResult] by 
continuously
+// calling Pull() on the TraceQueryResult until it returns nil or encounters 
an error.
+type traceResultIterator struct {
+       result model.TraceQueryResult
+       err    error
+}
+
+func (tri *traceResultIterator) Next() (model.TraceResult, bool) {
+       if tri.err != nil || tri.result == nil {
+               return model.TraceResult{}, false
        }
-       traceResult := i.result.Pull()
+
+       traceResult := tri.result.Pull()
        if traceResult == nil {
-               return model.TraceResult{}, nil
+               return model.TraceResult{}, false
+       }
+
+       // Check if the result contains an error
+       if traceResult.Error != nil {
+               tri.err = traceResult.Error
+               return *traceResult, false
        }
-       return *traceResult, nil
+
+       return *traceResult, true
 }
 
 func (i *localScan) String() string {
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go 
b/pkg/query/logical/trace/trace_plan_tag_filter.go
index 9addb100..f868d49a 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -25,6 +25,7 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/iter"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
@@ -188,51 +189,86 @@ func newTraceTagFilter(s logical.Schema, parent 
logical.Plan, tagFilter logical.
        }
 }
 
-func (t *traceTagFilterPlan) Execute(ctx context.Context) (model.TraceResult, 
error) {
-       result, err := t.parent.(executor.TraceExecutable).Execute(ctx)
+func (t *traceTagFilterPlan) Execute(ctx context.Context) 
(iter.Iterator[model.TraceResult], error) {
+       resultIterator, err := t.parent.(executor.TraceExecutable).Execute(ctx)
        if err != nil {
-               return model.TraceResult{}, err
+               return iter.Empty[model.TraceResult](), err
        }
 
-       // If there are no spans, no need to check the filter
-       if len(result.Spans) == 0 {
-               return model.TraceResult{}, nil
+       // Return a lazy filtering iterator that processes results on demand
+       return &traceTagFilterIterator{
+               sourceIterator: resultIterator,
+               tagFilter:      t.tagFilter,
+               schema:         t.s,
+       }, nil
+}
+
+// traceTagFilterIterator implements iter.Iterator[model.TraceResult] by lazily
+// filtering results from the source iterator using the tag filter.
+type traceTagFilterIterator struct {
+       sourceIterator iter.Iterator[model.TraceResult]
+       tagFilter      logical.TagFilter
+       schema         logical.Schema
+       err            error
+}
+
+func (tfti *traceTagFilterIterator) Next() (model.TraceResult, bool) {
+       if tfti.err != nil {
+               return model.TraceResult{}, false
        }
 
-       maxRows := len(result.Spans)
+       for {
+               result, hasNext := tfti.sourceIterator.Next()
+               if !hasNext {
+                       return model.TraceResult{}, false
+               }
 
-       // Check each row to see if any matches the filter
-       for rowIdx := 0; rowIdx < maxRows; rowIdx++ {
-               // Create TagFamilies for this specific row
-               family := &modelv1.TagFamily{
-                       Name: "",
-                       Tags: make([]*modelv1.Tag, 0, len(result.Tags)),
+               // If there are no spans, skip this result
+               if len(result.Spans) == 0 {
+                       continue
                }
 
-               // Build the row by taking the value at rowIdx from each tag 
(if it exists)
-               for _, tag := range result.Tags {
-                       if rowIdx < len(tag.Values) {
-                               family.Tags = append(family.Tags, &modelv1.Tag{
-                                       Key:   tag.Name,
-                                       Value: tag.Values[rowIdx],
-                               })
+               maxRows := len(result.Spans)
+               matched := false
+
+               // Check each row to see if any matches the filter
+               for rowIdx := 0; rowIdx < maxRows; rowIdx++ {
+                       // Create TagFamilies for this specific row
+                       family := &modelv1.TagFamily{
+                               Name: "",
+                               Tags: make([]*modelv1.Tag, 0, len(result.Tags)),
                        }
-               }
 
-               tagFamilies := []*modelv1.TagFamily{family}
-               ok, err := t.tagFilter.Match(logical.TagFamilies(tagFamilies), 
t.s)
-               if err != nil {
-                       return model.TraceResult{}, err
+                       // Build the row by taking the value at rowIdx from 
each tag (if it exists)
+                       for _, tag := range result.Tags {
+                               if rowIdx < len(tag.Values) {
+                                       family.Tags = append(family.Tags, 
&modelv1.Tag{
+                                               Key:   tag.Name,
+                                               Value: tag.Values[rowIdx],
+                                       })
+                               }
+                       }
+
+                       tagFamilies := []*modelv1.TagFamily{family}
+                       ok, err := 
tfti.tagFilter.Match(logical.TagFamilies(tagFamilies), tfti.schema)
+                       if err != nil {
+                               tfti.err = err
+                               return model.TraceResult{}, false
+                       }
+
+                       // If ANY row matches, return this result
+                       if ok {
+                               matched = true
+                               break
+                       }
                }
 
-               // If ANY row matches, return the entire result
-               if ok {
-                       return result, nil
+               if matched {
+                       return result, true
                }
-       }
 
-       // No rows matched the filter
-       return model.TraceResult{}, nil
+               // If no match, continue to the next result
+       }
 }
 
 func (t *traceTagFilterPlan) String() string {

Reply via email to