Copilot commented on code in PR #830:
URL: 
https://github.com/apache/skywalking-banyandb/pull/830#discussion_r2475995636


##########
banyand/internal/sidx/sidx.go:
##########
@@ -223,28 +226,129 @@ func (s *sidx) loadBlockCursor(bc *blockCursor, tmpBlock 
*block, bs blockScanRes
                return false
        }
 
-       // Copy data to block cursor
-       bc.userKeys = make([]int64, len(tmpBlock.userKeys))
-       copy(bc.userKeys, tmpBlock.userKeys)
+       if metrics != nil {
+               metrics.blockElementsLoaded.Add(int64(len(tmpBlock.userKeys)))
+       }
 
-       bc.data = make([][]byte, len(tmpBlock.data))
-       for i, data := range tmpBlock.data {
-               bc.data[i] = make([]byte, len(data))
-               copy(bc.data[i], data)
+       totalElements := len(tmpBlock.userKeys)
+       if totalElements == 0 {
+               return false
        }
 
-       // Copy tags
+       // Pre-allocate slices for filtered data (optimize for common case 
where most elements match)
+       bc.userKeys = make([]int64, 0, totalElements)
+       bc.data = make([][]byte, 0, totalElements)
+
+       // Pre-allocate tag slices
        bc.tags = make(map[string][]Tag)
-       for tagName, tagData := range tmpBlock.tags {
-               tagSlice := make([]Tag, len(tagData.values))
-               for i, value := range tagData.values {
-                       tagSlice[i] = Tag{
-                               Name:      tagName,
-                               Value:     value,
-                               ValueType: tagData.valueType,
+       for tagName := range tmpBlock.tags {
+               bc.tags[tagName] = make([]Tag, 0, totalElements)
+       }
+
+       // Track seen data for deduplication
+       seenData := make(map[string]struct{})

Review Comment:
   Using `string(tmpBlock.data[i])` as map keys (lines 258 and 317) creates string 
copies of potentially large byte slices. Note that Go elides this copy only in a 
direct map-index expression such as `m[string(b)]`; assigning the conversion to a 
variable first (as done with `dataKey`) always allocates. Consider keying the map 
on a hash of the data, or restructuring to use the direct-index form, to avoid 
unnecessary allocations — especially for large trace data.



##########
banyand/trace/block_writer.go:
##########
@@ -258,30 +282,33 @@ func (bw *blockWriter) mustWriteBlock(tid string, b 
*block) {
 
 func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
        if len(data) > 0 {
-               bw.primaryBlockMetadata.mustWriteBlock(data, bw.traceIDs[0], 
&bw.writers)
+               bw.primaryBlockMetadata.mustWriteBlock(data, bw.tidFirst, 
&bw.writers)
                bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData)
        }
+       bw.hasWrittenBlocks = false
        bw.minTimestamp = 0
        bw.maxTimestamp = 0
+       bw.tidFirst = ""

Review Comment:
   The `tidLast` field is not reset in `mustFlushPrimaryBlock`. Because it retains 
the value from the last block of the previous primary block, any comparison 
against `tidLast` performed after a flush will see stale state once a new 
primary block begins, which can produce incorrect results.
   ```suggestion
        bw.tidFirst = ""
        bw.tidLast = ""
   ```



##########
banyand/internal/sidx/sidx.go:
##########
@@ -364,21 +469,35 @@ func (bch *blockCursorHeap) merge(ctx context.Context, 
batchSize int, resultsCh
 
        // Initialize first batch
        batch := &QueryResponse{
-               Keys: make([]int64, 0),
-               Data: make([][]byte, 0),
-               Tags: make([][]Tag, 0),
-               SIDs: make([]common.SeriesID, 0),
+               Keys:    make([]int64, 0),
+               Data:    make([][]byte, 0),
+               Tags:    make([][]Tag, 0),
+               SIDs:    make([]common.SeriesID, 0),
+               PartIDs: make([]uint64, 0),
        }
 
+       // Track seen data for deduplication across all batches
+       seenData := make(map[string]struct{})
+
        for bch.Len() > 0 {
                topBC := bch.bcc[0]
                if topBC.idx < 0 || topBC.idx >= len(topBC.userKeys) {
                        heap.Pop(bch)
                        continue
                }
 
-               // Copy the element (may be filtered out by key range)
-               topBC.copyTo(batch)
+               // Check for duplicate data before copying
+               currentData := topBC.data[topBC.idx]
+               dataKey := string(currentData)
+
+               // Only copy if this data hasn't been seen across all batches
+               if _, exists := seenData[dataKey]; !exists {
+                       // Copy the element (may be filtered out by key range)
+                       if topBC.copyTo(batch) {
+                               // Mark this data as seen
+                               seenData[dataKey] = struct{}{}
+                       }
+               }
 

Review Comment:
   Deduplication logic appears in three places: lines 258-265, lines 317-321, 
and lines 489-500. Consider extracting this into a helper function to reduce 
code duplication and make the deduplication strategy easier to maintain.
   ```suggestion
                // Deduplicate and copy data
                deduplicateAndCopy(topBC, batch, seenData)
   ```



##########
banyand/trace/streaming_pipeline.go:
##########
@@ -671,23 +703,96 @@ func (t *trace) startBlockScanStage(
                                continue
                        }
 
+                       // Acquire snapshots from all tables
+                       snapshots := make([]*snapshot, 0, len(tables))
+                       for _, table := range tables {
+                               s := table.currentSnapshot()
+                               if s == nil {
+                                       continue
+                               }
+                               snapshots = append(snapshots, s)
+                       }
+
+                       // If no snapshots available, skip this batch
+                       if len(snapshots) == 0 {
+                               select {
+                               case <-ctx.Done():
+                                       return
+                               case out <- &scanBatch{traceBatch: batch}:
+                               }
+                               continue
+                       }
+
+                       // Start part selection span
+                       partSelectionCtx, finishPartSelection := 
startPartSelectionSpan(ctx, &batch, snapshots)
+
+                       parts := make([]*part, 0)
+                       groupedIDs := make([][]string, 0)
+
+                       allTraceIDs := make([]string, 0)
+                       for _, ids := range batch.traceIDs {
+                               allTraceIDs = append(allTraceIDs, ids...)
+                       }
+                       sort.Strings(allTraceIDs)
+
+                       bloomFilteredPartIDs := make([]uint64, 0)
+                       totalGroupedIDs := 0
+
+                       for _, s := range snapshots {
+                               for _, pw := range s.parts {
+                                       p := pw.p
+                                       partID := p.partMetadata.ID
+
+                                       var idsFromSIDX []string
+                                       if traceIDsFromSIDX, exists := 
batch.traceIDs[partID]; exists {
+                                               idsFromSIDX = 
append([]string(nil), traceIDsFromSIDX...)
+                                       }
+                                       var idsForPart []string
+                                       for _, traceID := range allTraceIDs {
+                                               if slices.Contains(idsFromSIDX, 
traceID) || p.traceIDFilter.filter.MightContain(convert.StringToBytes(traceID)) 
{

Review Comment:
   Using `slices.Contains` in a nested loop (inside the `for _, traceID := 
range allTraceIDs` loop) results in O(n*m) complexity. Convert `idsFromSIDX` to 
a map for O(1) lookups to improve performance.
   ```suggestion
                                        // Convert idsFromSIDX to a map for 
O(1) lookups
                                        idsFromSIDXMap := 
make(map[string]struct{}, len(idsFromSIDX))
                                        for _, id := range idsFromSIDX {
                                                idsFromSIDXMap[id] = struct{}{}
                                        }
                                        var idsForPart []string
                                        for _, traceID := range allTraceIDs {
                                                if _, ok := 
idsFromSIDXMap[traceID]; ok || 
p.traceIDFilter.filter.MightContain(convert.StringToBytes(traceID)) {
   ```



##########
pkg/query/logical/trace/trace_plan_tag_filter.go:
##########
@@ -116,14 +127,31 @@ func (uis *unresolvedTraceTagFilter) Analyze(s 
logical.Schema) (logical.Plan, er
                        return nil, errProject
                }
        }
-       plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs, minVal, maxVal)
+       // Build tag filter and create matcher for SIDX
+       var tagFilterMatcher model.TagFilterMatcher
+       var tagFilter logical.TagFilter
        if uis.criteria != nil {
-               spanIDFilter := buildSpanIDFilter(uis.criteria, 
uis.spanIDTagName)
-               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, s, len(traceIDs) > 0, uis.traceIDTagName, uis.spanIDTagName)
-               if errFilter != nil {
-                       return nil, errFilter
+               var orderByTags []string
+               if ok, indexRule := s.IndexRuleDefined(uis.orderByTag); ok {
+                       orderByTags = indexRule.Tags
+               }
+               skippedTagNames := make([]string, 0, len(orderByTags)+2)
+               skippedTagNames = append(skippedTagNames, uis.traceIDTagName, 
uis.spanIDTagName)
+               skippedTagNames = append(skippedTagNames, orderByTags...)
+               tagFilter, err = logical.BuildTagFilter(uis.criteria, 
entityDict, s, s, len(traceIDs) > 0, skippedTagNames...)
+               if err != nil {
+                       return nil, err
                }
-               if tagFilter != logical.DummyFilter || spanIDFilter != nil {
+               // Get the decoder from the execution context (trace module)
+               decoder := 
uis.ec.(model.TagValueDecoderProvider).GetTagValueDecoder()

Review Comment:
   This type assertion could panic if `uis.ec` doesn't implement 
`TagValueDecoderProvider`. Prefer the two-value form — `provider, ok := 
uis.ec.(model.TagValueDecoderProvider)` — and return an explicit error when `ok` 
is false, or else document the invariant that the execution context always 
implements this interface.



##########
banyand/internal/sidx/sidx.go:
##########
@@ -364,21 +469,35 @@ func (bch *blockCursorHeap) merge(ctx context.Context, 
batchSize int, resultsCh
 
        // Initialize first batch
        batch := &QueryResponse{
-               Keys: make([]int64, 0),
-               Data: make([][]byte, 0),
-               Tags: make([][]Tag, 0),
-               SIDs: make([]common.SeriesID, 0),
+               Keys:    make([]int64, 0),
+               Data:    make([][]byte, 0),
+               Tags:    make([][]Tag, 0),
+               SIDs:    make([]common.SeriesID, 0),
+               PartIDs: make([]uint64, 0),
        }
 
+       // Track seen data for deduplication across all batches
+       seenData := make(map[string]struct{})

Review Comment:
   The `seenData` map grows without bound across all batches in the merge 
operation and is never cleared. For long-running queries that return many 
results, this can lead to significant memory consumption. Consider imposing a 
size limit or a periodic cleanup strategy.



##########
banyand/trace/part_iter.go:
##########
@@ -63,6 +63,7 @@ func (pi *partIter) init(bma *blockMetadataArray, p *part, 
tids []string) {
 
        pi.bms = bma.arr
        pi.tids = tids
+       sort.Strings(pi.tids)

Review Comment:
   [nitpick] Sorting trace IDs on every `init` call could be expensive for 
queries with many trace IDs. Note also that `sort.Strings(pi.tids)` sorts the 
caller's slice in place — a side effect callers may not expect. Consider 
requiring callers to provide pre-sorted IDs (optionally verifying order here), 
or documenting that callers should sort before passing to `init`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to