This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 54bf20b9 Remove Unnecessary Slice Allocation of Sidx Query (#847)
54bf20b9 is described below
commit 54bf20b960f29f7604f1b50802036b6c224d6ccf
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Nov 17 13:04:24 2025 +0800
Remove Unnecessary Slice Allocation of Sidx Query (#847)
* Remove the Tags field from QueryResponse and related methods to streamline
  the data structure and improve performance.
* Refactor tag value decoding to accept an additional value array parameter
  (sketched below).
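For reference, the decoder callback now receives the raw per-element value
array (valueArr) alongside the packed scalar value, so array tags can be
decoded without re-marshaling them into a single byte slice first. Below is a
minimal, self-contained sketch of that shape; the types and the byte order are
simplified stand-ins for pbv1/modelv1 and convert.BytesToInt64, not the actual
API:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // valueType is a simplified stand-in for pbv1.ValueType.
    type valueType int

    const (
        valueTypeInt64Arr valueType = iota
        valueTypeStrArr
    )

    // decodeTagValue mirrors the new decoder shape: prefer the unpacked
    // valueArr when present, fall back to the packed value otherwise.
    // Big-endian is an assumption here; the real code uses convert.BytesToInt64.
    func decodeTagValue(vt valueType, value []byte, valueArr [][]byte) interface{} {
        switch vt {
        case valueTypeInt64Arr:
            var out []int64
            if valueArr != nil {
                for _, v := range valueArr {
                    out = append(out, int64(binary.BigEndian.Uint64(v)))
                }
                return out
            }
            for i := 0; i+8 <= len(value); i += 8 {
                out = append(out, int64(binary.BigEndian.Uint64(value[i:i+8])))
            }
            return out
        case valueTypeStrArr:
            var out []string
            for _, v := range valueArr {
                out = append(out, string(v))
            }
            return out
        }
        return nil
    }

    func main() {
        fmt.Println(decodeTagValue(valueTypeStrArr, nil, [][]byte{[]byte("a"), []byte("b")}))
        // prints: [a b]
    }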
---
banyand/internal/sidx/block.go | 2 +-
banyand/internal/sidx/interfaces.go | 35 ---------------------
banyand/internal/sidx/query_result.go | 59 -----------------------------------
banyand/internal/sidx/query_test.go | 15 ---------
banyand/internal/sidx/sidx.go | 23 ++------------
banyand/internal/sidx/sidx_test.go | 14 +--------
banyand/internal/sidx/tag.go | 11 +++++--
banyand/trace/query.go | 20 ++++++++++--
pkg/query/model/tag_filter_matcher.go | 2 +-
9 files changed, 32 insertions(+), 149 deletions(-)
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index e5e77d2f..e136524e 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -261,7 +261,7 @@ func (b *block) mustWriteTag(tagName string, td *tagData, bm *blockMetadata, ww
td.tmpBytes = td.tmpBytes[:len(td.values)]
}
for i := range td.values {
- td.tmpBytes[i] = marshalTagRow(&td.values[i], td.valueType)
+ td.tmpBytes[i] = marshalTagRow(td.tmpBytes[i][:0], &td.values[i], td.valueType)
}
// Write tag values to data file
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index f1570e53..cbe1e41b 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -93,7 +93,6 @@ type QueryResponse struct {
Error error
Keys []int64
Data [][]byte
- Tags [][]Tag
SIDs []common.SeriesID
PartIDs []uint64
Metadata ResponseMetadata
@@ -109,7 +108,6 @@ func (qr *QueryResponse) Reset() {
qr.Error = nil
qr.Keys = qr.Keys[:0]
qr.Data = qr.Data[:0]
- qr.Tags = qr.Tags[:0]
qr.SIDs = qr.SIDs[:0]
qr.PartIDs = qr.PartIDs[:0]
qr.Metadata = ResponseMetadata{}
@@ -132,20 +130,6 @@ func (qr *QueryResponse) Validate() error {
return fmt.Errorf("inconsistent array lengths: keys=%d,
partIDs=%d", keysLen, partIDsLen)
}
- // Validate Tags structure if present
- if len(qr.Tags) > 0 {
- if len(qr.Tags) != keysLen {
- return fmt.Errorf("tags length=%d, expected=%d", len(qr.Tags), keysLen)
- }
- for i, tagGroup := range qr.Tags {
- for j, tag := range tagGroup {
- if tag.Name == "" {
- return fmt.Errorf("tags[%d][%d] name cannot be empty", i, j)
- }
- }
- }
- }
-
return nil
}
@@ -168,25 +152,6 @@ func (qr *QueryResponse) CopyFrom(other *QueryResponse) {
qr.Data[i] = append(qr.Data[i][:0], data...)
}
- // Deep copy tags
- if cap(qr.Tags) < len(other.Tags) {
- qr.Tags = make([][]Tag, len(other.Tags))
- } else {
- qr.Tags = qr.Tags[:len(other.Tags)]
- }
- for i, tagGroup := range other.Tags {
- if cap(qr.Tags[i]) < len(tagGroup) {
- qr.Tags[i] = make([]Tag, len(tagGroup))
- } else {
- qr.Tags[i] = qr.Tags[i][:len(tagGroup)]
- }
- for j, tag := range tagGroup {
- qr.Tags[i][j].Name = tag.Name
- qr.Tags[i][j].Value = append(qr.Tags[i][j].Value[:0], tag.Value...)
- qr.Tags[i][j].ValueType = tag.ValueType
- }
- }
-
// Copy metadata
qr.Metadata = other.Metadata
}
diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go
index 3669a291..c483058f 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -213,60 +213,9 @@ func (qr *queryResult) convertBlockToResponse(block *block, seriesID common.Seri
result.Data = append(result.Data, block.data[i])
result.SIDs = append(result.SIDs, seriesID)
result.PartIDs = append(result.PartIDs, partID)
-
- // Convert tag map to tag slice for this element
- elementTags := qr.extractElementTags(block, i)
- result.Tags = append(result.Tags, elementTags)
}
}
-// extractElementTags extracts tags for a specific element with projection support.
-func (qr *queryResult) extractElementTags(block *block, elemIndex int) []Tag {
- var elementTags []Tag
-
- // Apply tag projection from request
- if len(qr.request.TagProjection) > 0 {
- elementTags = make([]Tag, 0, len(qr.request.TagProjection))
- for _, proj := range qr.request.TagProjection {
- for _, tagName := range proj.Names {
- if tagData, exists := block.tags[tagName]; exists && elemIndex < len(tagData.values) {
- row := &tagData.values[elemIndex]
- tag := Tag{
- Name: tagName,
- ValueType: tagData.valueType,
- }
- if len(row.valueArr) > 0 {
- tag.ValueArr = row.valueArr
- } else if len(row.value) > 0 {
- tag.Value = row.value
- }
- elementTags = append(elementTags, tag)
- }
- }
- }
- } else {
- // Include all tags if no projection specified
- elementTags = make([]Tag, 0, len(block.tags))
- for tagName, tagData := range block.tags {
- if elemIndex < len(tagData.values) {
- row := &tagData.values[elemIndex]
- tag := Tag{
- Name: tagName,
- ValueType: tagData.valueType,
- }
- if len(row.valueArr) > 0 {
- tag.ValueArr = row.valueArr
- } else if len(row.value) > 0 {
- tag.Value = row.value
- }
- elementTags = append(elementTags, tag)
- }
- }
- }
-
- return elementTags
-}
-
// mergeQueryResponseShards merges multiple QueryResponse shards.
func mergeQueryResponseShards(shards []*QueryResponse, maxElements int) *QueryResponse {
// Create heap for ascending merge
@@ -286,7 +235,6 @@ func mergeQueryResponseShards(shards []*QueryResponse, maxElements int) *QueryRe
return &QueryResponse{
Keys: make([]int64, 0),
Data: make([][]byte, 0),
- Tags: make([][]Tag, 0),
SIDs: make([]common.SeriesID, 0),
PartIDs: make([]uint64, 0),
}
@@ -323,7 +271,6 @@ func mergeQueryResponseShardsDesc(shards []*QueryResponse, maxElements int) *Que
return &QueryResponse{
Keys: make([]int64, 0),
Data: make([][]byte, 0),
- Tags: make([][]Tag, 0),
SIDs: make([]common.SeriesID, 0),
PartIDs: make([]uint64, 0),
}
@@ -393,7 +340,6 @@ func (qrh *QueryResponseHeap) mergeWithHeap(limit int) *QueryResponse {
result := &QueryResponse{
Keys: make([]int64, 0, limit),
Data: make([][]byte, 0, limit),
- Tags: make([][]Tag, 0, limit),
SIDs: make([]common.SeriesID, 0, limit),
PartIDs: make([]uint64, 0, limit),
}
@@ -419,16 +365,12 @@ func (qrh *QueryResponseHeap) mergeWithHeap(limit int) *QueryResponse {
var (
data []byte
- tags []Tag
sid common.SeriesID
partID uint64
)
if idx < len(resp.Data) {
data = resp.Data[idx]
}
- if idx < len(resp.Tags) {
- tags = resp.Tags[idx]
- }
if idx < len(resp.SIDs) {
sid = resp.SIDs[idx]
}
@@ -439,7 +381,6 @@ func (qrh *QueryResponseHeap) mergeWithHeap(limit int) *QueryResponse {
// Copy element from top cursor
result.Keys = append(result.Keys, resp.Keys[idx])
result.Data = append(result.Data, data)
- result.Tags = append(result.Tags, tags)
result.SIDs = append(result.SIDs, sid)
result.PartIDs = append(result.PartIDs, partID)
diff --git a/banyand/internal/sidx/query_test.go b/banyand/internal/sidx/query_test.go
index 225f2587..09431e03 100644
--- a/banyand/internal/sidx/query_test.go
+++ b/banyand/internal/sidx/query_test.go
@@ -307,25 +307,15 @@ func TestSIDX_Query_WithArrValues(t *testing.T) {
resultsCh, errCh := sidx.StreamingQuery(ctx, queryReq)
var keys []int64
- var tags [][]Tag
for res := range resultsCh {
require.NoError(t, res.Error)
keys = append(keys, res.Keys...)
- tags = append(tags, res.Tags...)
}
if err, ok := <-errCh; ok {
require.NoError(t, err)
}
assert.Equal(t, 3, len(keys))
- for i := 0; i < len(keys); i++ {
- if keys[i] == 100 {
- assert.Equal(t, "arr_tag", tags[i][0].Name)
- assert.Equal(t, 2, len(tags[i][0].ValueArr))
- assert.Equal(t, "a", string(tags[i][0].ValueArr[0]))
- assert.Equal(t, "b", string(tags[i][0].ValueArr[1]))
- }
- }
}
func TestSIDX_Query_Validation(t *testing.T) {
@@ -467,7 +457,6 @@ func TestSIDX_StreamingQuery_MatchesBlockingQuery(t *testing.T) {
var (
expectedKeys []int64
expectedData [][]byte
- expectedTags [][]Tag
expectedSIDs []common.SeriesID
)
@@ -476,7 +465,6 @@ func TestSIDX_StreamingQuery_MatchesBlockingQuery(t *testing.T) {
require.NoError(t, res.Error)
expectedKeys = append(expectedKeys, res.Keys...)
expectedData = append(expectedData, res.Data...)
- expectedTags = append(expectedTags, res.Tags...)
expectedSIDs = append(expectedSIDs, res.SIDs...)
}
if err, ok := <-errCh; ok {
@@ -487,7 +475,6 @@ func TestSIDX_StreamingQuery_MatchesBlockingQuery(t *testing.T) {
var (
gotKeys []int64
gotData [][]byte
- gotTags [][]Tag
gotSIDs []common.SeriesID
)
@@ -496,7 +483,6 @@ func TestSIDX_StreamingQuery_MatchesBlockingQuery(t *testing.T) {
require.NoError(t, res.Error)
gotKeys = append(gotKeys, res.Keys...)
gotData = append(gotData, res.Data...)
- gotTags = append(gotTags, res.Tags...)
gotSIDs = append(gotSIDs, res.SIDs...)
}
if err, ok := <-errCh2; ok {
@@ -506,7 +492,6 @@ func TestSIDX_StreamingQuery_MatchesBlockingQuery(t *testing.T) {
require.Equal(t, expectedKeys, gotKeys)
require.Equal(t, expectedData, gotData)
require.Equal(t, expectedSIDs, gotSIDs)
- require.Equal(t, expectedTags, gotTags)
require.Equal(t, len(expectedKeys), len(gotKeys))
})
}
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 370e4204..7844087b 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -307,23 +307,16 @@ func (b *blockCursorBuilder) processWithoutFilter() {
}
}
-func (b *blockCursorBuilder) collectTagsForFilter(buf []*modelv1.Tag, decoder func(pbv1.ValueType, []byte) *modelv1.TagValue, index int) []*modelv1.Tag {
+func (b *blockCursorBuilder) collectTagsForFilter(buf []*modelv1.Tag, decoder func(pbv1.ValueType, []byte, [][]byte) *modelv1.TagValue, index int) []*modelv1.Tag {
buf = buf[:0]
+
for tagName, tagData := range b.block.tags {
if index >= len(tagData.values) {
continue
}
row := &tagData.values[index]
- var marshaledValue []byte
- if row.valueArr != nil || row.value != nil {
- marshaledValue = marshalTagRow(row, tagData.valueType)
- }
- if marshaledValue == nil {
- continue
- }
-
- tagValue := decoder(tagData.valueType, marshaledValue)
+ tagValue := decoder(tagData.valueType, row.value, row.valueArr)
if tagValue != nil {
buf = append(buf, &modelv1.Tag{
Key: tagName,
@@ -503,14 +496,6 @@ func (bc *blockCursor) copyTo(result *QueryResponse) bool {
result.SIDs = append(result.SIDs, bc.seriesID)
result.PartIDs = append(result.PartIDs, bc.p.ID())
- // Copy tags for this element
- var elementTags []Tag
- for _, tagSlice := range bc.tags {
- if bc.idx < len(tagSlice) {
- elementTags = append(elementTags, tagSlice[bc.idx])
- }
- }
- result.Tags = append(result.Tags, elementTags)
return true
}
@@ -600,7 +585,6 @@ func (bch *blockCursorHeap) merge(ctx context.Context, batchSize int, resultsCh
batch := &QueryResponse{
Keys: make([]int64, 0),
Data: make([][]byte, 0),
- Tags: make([][]Tag, 0),
SIDs: make([]common.SeriesID, 0),
PartIDs: make([]uint64, 0),
}
@@ -668,7 +652,6 @@ func (bch *blockCursorHeap) merge(ctx context.Context, batchSize int, resultsCh
batch = &QueryResponse{
Keys: make([]int64, 0),
Data: make([][]byte, 0),
- Tags: make([][]Tag, 0),
SIDs: make([]common.SeriesID, 0),
PartIDs: make([]uint64, 0),
}
diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go
index ebd9a0c8..f727faa7 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -645,7 +645,6 @@ func TestQueryResult_MaxBatchSize(t *testing.T) {
result := &QueryResponse{
Keys: make([]int64, 0),
Data: make([][]byte, 0),
- Tags: make([][]Tag, 0),
SIDs: make([]common.SeriesID, 0),
PartIDs: make([]uint64, 0),
}
@@ -657,7 +656,6 @@ func TestQueryResult_MaxBatchSize(t *testing.T) {
assert.Equal(t, len(result.Keys), len(result.Data), "Keys and Data arrays should have same length")
assert.Equal(t, len(result.Keys), len(result.SIDs), "Keys and SIDs arrays should have same length")
- assert.Equal(t, len(result.Keys), len(result.Tags), "Keys and Tags arrays should have same length")
assert.Equal(t, len(result.Keys), len(result.PartIDs), "Keys and PartIDs arrays should have same length")
})
}
@@ -674,7 +672,6 @@ func TestQueryResult_ConvertBlockToResponse_RespectsLimitAcrossCalls(t *testing.
result := &QueryResponse{
Keys: make([]int64, 0),
Data: make([][]byte, 0),
- Tags: make([][]Tag, 0),
SIDs: make([]common.SeriesID, 0),
PartIDs: make([]uint64, 0),
}
@@ -729,13 +726,11 @@ func TestQueryResponseHeap_MergeWithHeap(t *testing.T) {
{
Keys: []int64{100, 300},
Data: [][]byte{[]byte("trace1"), []byte("trace3")},
- Tags: [][]Tag{{}, {}},
SIDs: []common.SeriesID{1, 3},
},
{
Keys: []int64{200, 400},
Data: [][]byte{[]byte("trace2"), []byte("trace4")},
- Tags: [][]Tag{{}, {}},
SIDs: []common.SeriesID{2, 4},
},
},
@@ -749,13 +744,11 @@ func TestQueryResponseHeap_MergeWithHeap(t *testing.T) {
{
Keys: []int64{100, 300},
Data: [][]byte{[]byte("trace1"), []byte("trace3")},
- Tags: [][]Tag{{}, {}},
SIDs: []common.SeriesID{1, 3},
},
{
Keys: []int64{200, 400},
Data: [][]byte{[]byte("trace2"), []byte("trace4")},
- Tags: [][]Tag{{}, {}},
SIDs: []common.SeriesID{2, 4},
},
},
@@ -777,7 +770,6 @@ func TestQueryResponseHeap_MergeWithHeap(t *testing.T) {
assert.Equal(t, len(result.Keys), len(result.Data), "Keys and Data arrays should have same length")
assert.Equal(t, len(result.Keys), len(result.SIDs), "Keys and SIDs arrays should have same length")
- assert.Equal(t, len(result.Keys), len(result.Tags), "Keys and Tags arrays should have same length")
})
}
}
@@ -788,13 +780,11 @@ func TestQueryResponseHeap_MergeDescending(t *testing.T) {
{
Keys: []int64{200, 400}, // stored ascending
Data: [][]byte{[]byte("trace2"), []byte("trace4")},
- Tags: [][]Tag{{}, {}},
SIDs: []common.SeriesID{2, 4},
},
{
Keys: []int64{150, 300}, // stored ascending
Data: [][]byte{[]byte("trace1"), []byte("trace3")},
- Tags: [][]Tag{{}, {}},
SIDs: []common.SeriesID{1, 3},
},
}
@@ -815,7 +805,6 @@ func TestQueryResponse_Reset(t *testing.T) {
result := &QueryResponse{
Keys: []int64{100, 200},
Data: [][]byte{[]byte("trace1"), []byte("trace2")},
- Tags: [][]Tag{{}, {}},
SIDs: []common.SeriesID{1, 2},
Metadata: ResponseMetadata{
Warnings: []string{"warn"},
@@ -829,7 +818,6 @@ func TestQueryResponse_Reset(t *testing.T) {
assert.Nil(t, result.Error)
assert.Equal(t, 0, len(result.Keys))
assert.Equal(t, 0, len(result.Data))
- assert.Equal(t, 0, len(result.Tags))
assert.Equal(t, 0, len(result.SIDs))
assert.Equal(t, ResponseMetadata{}, result.Metadata)
}
@@ -850,7 +838,7 @@ func (m *mockTagFilterMatcher) GetDecoder() model.TagValueDecoder {
return m.decoder
}
-func testTagValueDecoder(valueType pbv1.ValueType, value []byte) *modelv1.TagValue {
+func testTagValueDecoder(valueType pbv1.ValueType, value []byte, _ [][]byte) *modelv1.TagValue {
if value == nil {
return pbv1.NullTagValue
}
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index 0314500f..b4c1d510 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -60,6 +60,11 @@ type tagRow struct {
func (tr *tagRow) reset() {
tr.value = nil
+ if tr.valueArr != nil {
+ for i := range tr.valueArr {
+ tr.valueArr[i] = nil
+ }
+ }
tr.valueArr = tr.valueArr[:0]
}
@@ -190,9 +195,8 @@ func decodeBloomFilter(src []byte) (*filter.BloomFilter, error) {
}
// marshalTagRow marshals the tagRow value to a byte slice.
-func marshalTagRow(tr *tagRow, valueType pbv1.ValueType) []byte {
+func marshalTagRow(dst []byte, tr *tagRow, valueType pbv1.ValueType) []byte {
if tr.valueArr != nil {
- var dst []byte
for i := range tr.valueArr {
if valueType == pbv1.ValueTypeInt64Arr {
dst = append(dst, tr.valueArr[i]...)
@@ -202,7 +206,8 @@ func marshalTagRow(tr *tagRow, valueType pbv1.ValueType) []byte {
}
return dst
}
- return tr.value
+ dst = append(dst, tr.value...)
+ return dst
}
// decodeAndConvertTagValues decodes encoded tag values and converts them to tagRow format.
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 6fa20fae..8aecc80f 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -134,7 +134,7 @@ func validateTraceQueryOptions(tqo model.TraceQueryOptions) error {
}
func (t *trace) GetTagValueDecoder() model.TagValueDecoder {
- return mustDecodeTagValue
+ return mustDecodeTagValueAndArray
}
func (t *trace) ensureTSDB() (storage.TSDB[*tsTable, option], error) {
@@ -501,7 +501,11 @@ func (qr *queryResult) Release() {
}
func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue {
- if value == nil {
+ return mustDecodeTagValueAndArray(valueType, value, nil)
+}
+
+func mustDecodeTagValueAndArray(valueType pbv1.ValueType, value []byte, valueArr [][]byte) *modelv1.TagValue {
+ if value == nil && valueArr == nil {
return pbv1.NullTagValue
}
switch valueType {
@@ -513,12 +517,24 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValu
return binaryDataTagValue(value)
case pbv1.ValueTypeInt64Arr:
var values []int64
+ if valueArr != nil {
+ for _, v := range valueArr {
+ values = append(values, convert.BytesToInt64(v))
+ }
+ return int64ArrTagValue(values)
+ }
for i := 0; i < len(value); i += 8 {
values = append(values, convert.BytesToInt64(value[i:i+8]))
}
return int64ArrTagValue(values)
case pbv1.ValueTypeStrArr:
var values []string
+ if valueArr != nil {
+ for _, v := range valueArr {
+ values = append(values, string(v))
+ }
+ return strArrTagValue(values)
+ }
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
var err error
diff --git a/pkg/query/model/tag_filter_matcher.go b/pkg/query/model/tag_filter_matcher.go
index 993285a0..aae6e1f0 100644
--- a/pkg/query/model/tag_filter_matcher.go
+++ b/pkg/query/model/tag_filter_matcher.go
@@ -23,7 +23,7 @@ import (
)
// TagValueDecoder decodes a byte slice to a TagValue based on the value type.
-type TagValueDecoder func(valueType pbv1.ValueType, value []byte) *modelv1.TagValue
+type TagValueDecoder func(valueType pbv1.ValueType, value []byte, valueArr [][]byte) *modelv1.TagValue
// TagValueDecoderProvider provides a decoder for tag values.
type TagValueDecoderProvider interface {
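A closing note on the marshalTagRow change in banyand/internal/sidx/tag.go:
the function now appends into a caller-supplied buffer and returns the grown
slice, which lets callers reset and reuse a scratch buffer (td.tmpBytes[i][:0]
in block.go) instead of allocating a fresh slice per row. A rough sketch of
that append-into-dst pattern, using simplified stand-in types rather than the
sidx implementation:

    package main

    import "fmt"

    // tagRow is a simplified stand-in for sidx's tagRow.
    type tagRow struct {
        value    []byte
        valueArr [][]byte
    }

    // marshalTagRow appends the row's encoded bytes to dst and returns the
    // grown slice; array values are flattened here purely for illustration.
    func marshalTagRow(dst []byte, tr *tagRow) []byte {
        if tr.valueArr != nil {
            for _, v := range tr.valueArr {
                dst = append(dst, v...)
            }
            return dst
        }
        return append(dst, tr.value...)
    }

    func main() {
        buf := make([]byte, 0, 64)
        rows := []*tagRow{
            {value: []byte("x")},
            {valueArr: [][]byte{[]byte("a"), []byte("b")}},
        }
        for _, r := range rows {
            buf = marshalTagRow(buf[:0], r) // reset and reuse the same backing buffer
            fmt.Printf("%q\n", buf)
        }
    }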