Copilot commented on code in PR #830:
URL:
https://github.com/apache/skywalking-banyandb/pull/830#discussion_r2475995636
##########
banyand/internal/sidx/sidx.go:
##########
@@ -223,28 +226,129 @@ func (s *sidx) loadBlockCursor(bc *blockCursor, tmpBlock
*block, bs blockScanRes
return false
}
- // Copy data to block cursor
- bc.userKeys = make([]int64, len(tmpBlock.userKeys))
- copy(bc.userKeys, tmpBlock.userKeys)
+ if metrics != nil {
+ metrics.blockElementsLoaded.Add(int64(len(tmpBlock.userKeys)))
+ }
- bc.data = make([][]byte, len(tmpBlock.data))
- for i, data := range tmpBlock.data {
- bc.data[i] = make([]byte, len(data))
- copy(bc.data[i], data)
+ totalElements := len(tmpBlock.userKeys)
+ if totalElements == 0 {
+ return false
}
- // Copy tags
+ // Pre-allocate slices for filtered data (optimize for common case
where most elements match)
+ bc.userKeys = make([]int64, 0, totalElements)
+ bc.data = make([][]byte, 0, totalElements)
+
+ // Pre-allocate tag slices
bc.tags = make(map[string][]Tag)
- for tagName, tagData := range tmpBlock.tags {
- tagSlice := make([]Tag, len(tagData.values))
- for i, value := range tagData.values {
- tagSlice[i] = Tag{
- Name: tagName,
- Value: value,
- ValueType: tagData.valueType,
+ for tagName := range tmpBlock.tags {
+ bc.tags[tagName] = make([]Tag, 0, totalElements)
+ }
+
+ // Track seen data for deduplication
+ seenData := make(map[string]struct{})
Review Comment:
Using `string(tmpBlock.data[i])` as map keys (lines 258, 317) creates string
copies of potentially large byte slices. Note that the Go compiler elides the
copy for a pure lookup of the form `m[string(b)]`, but *inserting* the key
(`seenData[string(b)] = struct{}{}`) still allocates a full copy of the byte
slice. Consider using a hash function (e.g. FNV or xxhash) or byte slice
comparison instead to avoid these unnecessary allocations, especially for
large trace data.
##########
banyand/trace/block_writer.go:
##########
@@ -258,30 +282,33 @@ func (bw *blockWriter) mustWriteBlock(tid string, b
*block) {
func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
if len(data) > 0 {
- bw.primaryBlockMetadata.mustWriteBlock(data, bw.traceIDs[0],
&bw.writers)
+ bw.primaryBlockMetadata.mustWriteBlock(data, bw.tidFirst,
&bw.writers)
bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData)
}
+ bw.hasWrittenBlocks = false
bw.minTimestamp = 0
bw.maxTimestamp = 0
+ bw.tidFirst = ""
Review Comment:
The `tidLast` field is not reset in `mustFlushPrimaryBlock`. This could
cause issues if blocks from different primary blocks are compared, as `tidLast`
will retain the value from the last block of the previous primary block.
```suggestion
bw.tidFirst = ""
bw.tidLast = ""
```
##########
banyand/internal/sidx/sidx.go:
##########
@@ -364,21 +469,35 @@ func (bch *blockCursorHeap) merge(ctx context.Context,
batchSize int, resultsCh
// Initialize first batch
batch := &QueryResponse{
- Keys: make([]int64, 0),
- Data: make([][]byte, 0),
- Tags: make([][]Tag, 0),
- SIDs: make([]common.SeriesID, 0),
+ Keys: make([]int64, 0),
+ Data: make([][]byte, 0),
+ Tags: make([][]Tag, 0),
+ SIDs: make([]common.SeriesID, 0),
+ PartIDs: make([]uint64, 0),
}
+ // Track seen data for deduplication across all batches
+ seenData := make(map[string]struct{})
+
for bch.Len() > 0 {
topBC := bch.bcc[0]
if topBC.idx < 0 || topBC.idx >= len(topBC.userKeys) {
heap.Pop(bch)
continue
}
- // Copy the element (may be filtered out by key range)
- topBC.copyTo(batch)
+ // Check for duplicate data before copying
+ currentData := topBC.data[topBC.idx]
+ dataKey := string(currentData)
+
+ // Only copy if this data hasn't been seen across all batches
+ if _, exists := seenData[dataKey]; !exists {
+ // Copy the element (may be filtered out by key range)
+ if topBC.copyTo(batch) {
+ // Mark this data as seen
+ seenData[dataKey] = struct{}{}
+ }
+ }
Review Comment:
Deduplication logic appears in three places: lines 258-265, lines 317-321,
and lines 489-500. Consider extracting this into a helper function to reduce
code duplication and make the deduplication strategy easier to maintain.
```suggestion
// Deduplicate and copy data
deduplicateAndCopy(topBC, batch, seenData)
```
##########
banyand/trace/streaming_pipeline.go:
##########
@@ -671,23 +703,96 @@ func (t *trace) startBlockScanStage(
continue
}
+ // Acquire snapshots from all tables
+ snapshots := make([]*snapshot, 0, len(tables))
+ for _, table := range tables {
+ s := table.currentSnapshot()
+ if s == nil {
+ continue
+ }
+ snapshots = append(snapshots, s)
+ }
+
+ // If no snapshots available, skip this batch
+ if len(snapshots) == 0 {
+ select {
+ case <-ctx.Done():
+ return
+ case out <- &scanBatch{traceBatch: batch}:
+ }
+ continue
+ }
+
+ // Start part selection span
+ partSelectionCtx, finishPartSelection :=
startPartSelectionSpan(ctx, &batch, snapshots)
+
+ parts := make([]*part, 0)
+ groupedIDs := make([][]string, 0)
+
+ allTraceIDs := make([]string, 0)
+ for _, ids := range batch.traceIDs {
+ allTraceIDs = append(allTraceIDs, ids...)
+ }
+ sort.Strings(allTraceIDs)
+
+ bloomFilteredPartIDs := make([]uint64, 0)
+ totalGroupedIDs := 0
+
+ for _, s := range snapshots {
+ for _, pw := range s.parts {
+ p := pw.p
+ partID := p.partMetadata.ID
+
+ var idsFromSIDX []string
+ if traceIDsFromSIDX, exists :=
batch.traceIDs[partID]; exists {
+ idsFromSIDX =
append([]string(nil), traceIDsFromSIDX...)
+ }
+ var idsForPart []string
+ for _, traceID := range allTraceIDs {
+ if slices.Contains(idsFromSIDX,
traceID) || p.traceIDFilter.filter.MightContain(convert.StringToBytes(traceID))
{
Review Comment:
Using `slices.Contains` in a nested loop (inside the `for _, traceID :=
range allTraceIDs` loop) results in O(n*m) complexity. Convert `idsFromSIDX` to
a map for O(1) lookups to improve performance.
```suggestion
// Convert idsFromSIDX to a map for
O(1) lookups
idsFromSIDXMap :=
make(map[string]struct{}, len(idsFromSIDX))
for _, id := range idsFromSIDX {
idsFromSIDXMap[id] = struct{}{}
}
var idsForPart []string
for _, traceID := range allTraceIDs {
if _, ok :=
idsFromSIDXMap[traceID]; ok ||
p.traceIDFilter.filter.MightContain(convert.StringToBytes(traceID)) {
```
##########
pkg/query/logical/trace/trace_plan_tag_filter.go:
##########
@@ -116,14 +127,31 @@ func (uis *unresolvedTraceTagFilter) Analyze(s
logical.Schema) (logical.Plan, er
return nil, errProject
}
}
- plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs, minVal, maxVal)
+ // Build tag filter and create matcher for SIDX
+ var tagFilterMatcher model.TagFilterMatcher
+ var tagFilter logical.TagFilter
if uis.criteria != nil {
- spanIDFilter := buildSpanIDFilter(uis.criteria,
uis.spanIDTagName)
- tagFilter, errFilter := logical.BuildTagFilter(uis.criteria,
entityDict, s, s, len(traceIDs) > 0, uis.traceIDTagName, uis.spanIDTagName)
- if errFilter != nil {
- return nil, errFilter
+ var orderByTags []string
+ if ok, indexRule := s.IndexRuleDefined(uis.orderByTag); ok {
+ orderByTags = indexRule.Tags
+ }
+ skippedTagNames := make([]string, 0, len(orderByTags)+2)
+ skippedTagNames = append(skippedTagNames, uis.traceIDTagName,
uis.spanIDTagName)
+ skippedTagNames = append(skippedTagNames, orderByTags...)
+ tagFilter, err = logical.BuildTagFilter(uis.criteria,
entityDict, s, s, len(traceIDs) > 0, skippedTagNames...)
+ if err != nil {
+ return nil, err
}
- if tagFilter != logical.DummyFilter || spanIDFilter != nil {
+ // Get the decoder from the execution context (trace module)
+ decoder :=
uis.ec.(model.TagValueDecoderProvider).GetTagValueDecoder()
Review Comment:
This type assertion could panic if `uis.ec` doesn't implement
`TagValueDecoderProvider`. Add a type check or document the assumption that
execution context always implements this interface.
##########
banyand/internal/sidx/sidx.go:
##########
@@ -364,21 +469,35 @@ func (bch *blockCursorHeap) merge(ctx context.Context,
batchSize int, resultsCh
// Initialize first batch
batch := &QueryResponse{
- Keys: make([]int64, 0),
- Data: make([][]byte, 0),
- Tags: make([][]Tag, 0),
- SIDs: make([]common.SeriesID, 0),
+ Keys: make([]int64, 0),
+ Data: make([][]byte, 0),
+ Tags: make([][]Tag, 0),
+ SIDs: make([]common.SeriesID, 0),
+ PartIDs: make([]uint64, 0),
}
+ // Track seen data for deduplication across all batches
+ seenData := make(map[string]struct{})
Review Comment:
The `seenData` map grows unbounded across all batches in the merge operation
and is never cleared. For long-running queries with many results, this could
lead to significant memory consumption. Consider implementing a size limit or
periodic cleanup strategy.
##########
banyand/trace/part_iter.go:
##########
@@ -63,6 +63,7 @@ func (pi *partIter) init(bma *blockMetadataArray, p *part,
tids []string) {
pi.bms = bma.arr
pi.tids = tids
+ sort.Strings(pi.tids)
Review Comment:
[nitpick] Sorting trace IDs on every `init` call could be expensive for
queries with many trace IDs, and `sort.Strings(pi.tids)` also mutates the
caller's slice in place, which may be a surprising side effect. Consider
requiring the caller to provide pre-sorted IDs (or sorting a copy), or at
least documenting that callers should sort before passing to `init`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]