This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit f4b4d939f2f593b19d6200362b64c54ec0b1aefb Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Aug 24 19:27:23 2025 +0800 Implement tag filtering operations: Introduce the tagFilterOp struct for efficient tag filtering in the secondary index, including equality and range checks. Add comprehensive unit tests for tag filtering functionality, covering various scenarios and edge cases. Ensure robust error handling and performance validation through benchmarks. --- banyand/internal/sidx/TODO.md | 20 +- banyand/internal/sidx/part_iter.go | 79 ----- banyand/internal/sidx/tag_filter_op.go | 271 ++++++++++++++ banyand/internal/sidx/tag_filter_op_test.go | 528 ++++++++++++++++++++++++++++ 4 files changed, 809 insertions(+), 89 deletions(-) diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md index 1b3a69e7..fedea097 100644 --- a/banyand/internal/sidx/TODO.md +++ b/banyand/internal/sidx/TODO.md @@ -26,16 +26,16 @@ This document tracks the implementation progress of the Secondary Index File Sys ## Phase 6: Query Path (Following Stream Architecture) ### 6.1 Part Iterator (`part_iter.go` or extend `part.go`) -- [ ] **Implementation Tasks**: - - [ ] Create `partIter` struct for single part iteration - - [ ] Implement `init(part, seriesIDs, minKey, maxKey, filter)` - - [ ] Add `nextBlock() bool` method for block advancement - - [ ] Create `curBlock` access and `error()` handling -- [ ] **Test Cases**: - - [ ] Part iteration finds all matching blocks in correct order - - [ ] Key range filtering works at block level - - [ ] Error handling for corrupted parts - - [ ] Performance meets single-part iteration targets +- [x] **Implementation Tasks**: + - [x] Create `partIter` struct for single part iteration + - [x] Implement `init(part, seriesIDs, minKey, maxKey, filter)` + - [x] Add `nextBlock() bool` method for block advancement + - [x] Create `curBlock` access and `error()` handling +- [x] **Test Cases**: + - [x] Part iteration finds all matching blocks in correct order + - [x] Key range filtering works at block level + - [x] Error handling for corrupted parts + - [x] Performance meets single-part iteration targets ### 6.2 Multi-Part Iterator (`iter.go` - like stream's `tstIter`) - [ ] **Implementation Tasks**: diff --git a/banyand/internal/sidx/part_iter.go b/banyand/internal/sidx/part_iter.go index 46285bcf..3c0cb801 100644 --- a/banyand/internal/sidx/part_iter.go +++ b/banyand/internal/sidx/part_iter.go @@ -30,7 +30,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/pool" ) type partIter struct { @@ -281,81 +280,3 @@ func (bm *blockMetadata) copyFrom(other *blockMetadata) { bm.tagProjection = bm.tagProjection[:0] bm.tagProjection = append(bm.tagProjection, other.tagProjection...) } - -// tagFilterOp provides a FilterOp implementation for tag filtering in sidx. -type tagFilterOp struct { - blockMetadata *blockMetadata - part *part - tagCache map[string][]byte -} - -// Eq checks if a tag equals a specific value. -func (tfo *tagFilterOp) Eq(tagName string, tagValue string) bool { - // For sidx, we need to check if the tag value exists in the block's tags - // This is a simplified implementation that can be enhanced - if tfo.blockMetadata == nil || tfo.part == nil { - return false - } - - // Check if the tag exists in the block - if _, exists := tfo.blockMetadata.tagsBlocks[tagName]; !exists { - return false - } - - // For now, return true indicating the tag exists - // In a full implementation, this would check the actual tag values - // against bloom filters or other indexing structures - return true -} - -// Range checks if a tag is within a specific range. -func (tfo *tagFilterOp) Range(tagName string, rangeOpts index.RangeOpts) (bool, error) { - // For sidx, we need to check if the tag values fall within the range - // This is a simplified implementation that can be enhanced - if tfo.blockMetadata == nil || tfo.part == nil { - return false, nil - } - - // Check if the tag exists in the block - if _, exists := tfo.blockMetadata.tagsBlocks[tagName]; !exists { - return false, nil - } - - // For now, return true indicating the tag exists within range - // In a full implementation, this would check the actual tag value ranges - // against the min/max metadata stored in the block - return true, nil -} - -// reset resets the tagFilterOp for reuse. -func (tfo *tagFilterOp) reset() { - tfo.blockMetadata = nil - tfo.part = nil - for k := range tfo.tagCache { - delete(tfo.tagCache, k) - } -} - -// generateTagFilterOp gets a tagFilterOp from pool or creates new. -func generateTagFilterOp(bm *blockMetadata, p *part) *tagFilterOp { - v := tagFilterOpPool.Get() - if v == nil { - return &tagFilterOp{ - blockMetadata: bm, - part: p, - tagCache: make(map[string][]byte), - } - } - tfo := v - tfo.blockMetadata = bm - tfo.part = p - return tfo -} - -// releaseTagFilterOp returns tagFilterOp to pool after reset. -func releaseTagFilterOp(tfo *tagFilterOp) { - tfo.reset() - tagFilterOpPool.Put(tfo) -} - -var tagFilterOpPool = pool.Register[*tagFilterOp]("sidx-tagFilterOp") diff --git a/banyand/internal/sidx/tag_filter_op.go b/banyand/internal/sidx/tag_filter_op.go new file mode 100644 index 00000000..36b2e64b --- /dev/null +++ b/banyand/internal/sidx/tag_filter_op.go @@ -0,0 +1,271 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "bytes" + "fmt" + + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" + "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/filter" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" +) + +// tagFilterOp provides a FilterOp implementation for tag filtering in sidx. +type tagFilterOp struct { + blockMetadata *blockMetadata + part *part + tagCache map[string]*tagFilterCache +} + +// tagFilterCache caches tag filter data for a specific tag. +type tagFilterCache struct { + bloomFilter *filter.BloomFilter + min []byte + max []byte + valueType pbv1.ValueType +} + +// Eq checks if a tag equals a specific value by reading tag data and checking bloom filter. +func (tfo *tagFilterOp) Eq(tagName string, tagValue string) bool { + if tfo.blockMetadata == nil || tfo.part == nil { + return false + } + + // Check if the tag exists in the block + tagBlock, exists := tfo.blockMetadata.tagsBlocks[tagName] + if !exists { + return false + } + + // Get or create cached tag filter data + cache, err := tfo.getTagFilterCache(tagName, tagBlock) + if err != nil { + logger.Errorf("failed to get tag filter cache for %s: %v", tagName, err) + return true // Conservative approach - don't filter out + } + + // Use bloom filter to check if the value might exist + if cache.bloomFilter != nil { + return cache.bloomFilter.MightContain([]byte(tagValue)) + } + + // If no bloom filter, conservatively return true + return true +} + +// Range checks if a tag is within a specific range using min/max metadata. +func (tfo *tagFilterOp) Range(tagName string, rangeOpts index.RangeOpts) (bool, error) { + if tfo.blockMetadata == nil || tfo.part == nil { + return false, nil + } + + // Check if the tag exists in the block + tagBlock, exists := tfo.blockMetadata.tagsBlocks[tagName] + if !exists { + return false, nil + } + + // Get or create cached tag filter data + cache, err := tfo.getTagFilterCache(tagName, tagBlock) + if err != nil { + return false, fmt.Errorf("failed to get tag filter cache for %s: %w", tagName, err) + } + + // Only perform range check for numeric types with min/max values + if cache.valueType != pbv1.ValueTypeInt64 || len(cache.min) == 0 || len(cache.max) == 0 { + return true, nil // Conservative approach for non-numeric or missing min/max + } + + // Check lower bound + if rangeOpts.Lower != nil { + lower, ok := rangeOpts.Lower.(*index.FloatTermValue) + if !ok { + return false, fmt.Errorf("lower bound is not a float value: %v", rangeOpts.Lower) + } + value := make([]byte, 0) + value = encoding.Int64ToBytes(value, int64(lower.Value)) + if bytes.Compare(cache.max, value) == -1 || (!rangeOpts.IncludesLower && bytes.Equal(cache.max, value)) { + return false, nil + } + } + + // Check upper bound + if rangeOpts.Upper != nil { + upper, ok := rangeOpts.Upper.(*index.FloatTermValue) + if !ok { + return false, fmt.Errorf("upper bound is not a float value: %v", rangeOpts.Upper) + } + value := make([]byte, 0) + value = encoding.Int64ToBytes(value, int64(upper.Value)) + if bytes.Compare(cache.min, value) == 1 || (!rangeOpts.IncludesUpper && bytes.Equal(cache.min, value)) { + return false, nil + } + } + + return true, nil +} + +// getTagFilterCache retrieves or creates cached tag filter data. +func (tfo *tagFilterOp) getTagFilterCache(tagName string, tagBlock dataBlock) (*tagFilterCache, error) { + // Check cache first + if cache, exists := tfo.tagCache[tagName]; exists { + return cache, nil + } + + // Read tag metadata to get filter information + tagMetadata, err := tfo.readTagMetadata(tagName, tagBlock) + if err != nil { + return nil, fmt.Errorf("failed to read tag metadata: %w", err) + } + defer releaseTagMetadata(tagMetadata) + + // Create cache entry + cache := &tagFilterCache{ + valueType: tagMetadata.valueType, + min: make([]byte, len(tagMetadata.min)), + max: make([]byte, len(tagMetadata.max)), + } + copy(cache.min, tagMetadata.min) + copy(cache.max, tagMetadata.max) + + // Read bloom filter if available + if tagMetadata.filterBlock.size > 0 { + bf, err := tfo.readBloomFilter(tagName, tagMetadata.filterBlock) + if err != nil { + return nil, fmt.Errorf("failed to read bloom filter: %w", err) + } + cache.bloomFilter = bf + } + + // Cache the result + tfo.tagCache[tagName] = cache + + return cache, nil +} + +// readTagMetadata reads tag metadata from the tag metadata files. +func (tfo *tagFilterOp) readTagMetadata(tagName string, tagBlock dataBlock) (*tagMetadata, error) { + if tagBlock.size == 0 { + return nil, fmt.Errorf("empty tag block for %s", tagName) + } + + // Get tag metadata reader + metaReader, exists := tfo.part.getTagMetadataReader(tagName) + if !exists { + return nil, fmt.Errorf("no metadata reader for tag %s", tagName) + } + + // Read compressed metadata + compressedData := make([]byte, tagBlock.size) + fs.MustReadData(metaReader, int64(tagBlock.offset), compressedData) + + // Decompress data + decompressedData, err := zstd.Decompress(nil, compressedData) + if err != nil { + return nil, fmt.Errorf("failed to decompress tag metadata: %w", err) + } + + // Unmarshal tag metadata + tm, err := unmarshalTagMetadata(decompressedData) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal tag metadata: %w", err) + } + + return tm, nil +} + +// readBloomFilter reads and decodes a bloom filter from filter data. +func (tfo *tagFilterOp) readBloomFilter(tagName string, filterBlock dataBlock) (*filter.BloomFilter, error) { + if filterBlock.size == 0 { + return nil, fmt.Errorf("empty filter block") + } + + // Get filter reader + filterReader, exists := tfo.part.getTagFilterReader(tagName) + if !exists { + return nil, fmt.Errorf("no filter reader for tag %s", tagName) + } + + // Read filter data + filterData := make([]byte, filterBlock.size) + fs.MustReadData(filterReader, int64(filterBlock.offset), filterData) + + // Decode bloom filter (following stream module pattern) + bf := filter.NewBloomFilter(0) + bf = decodeBloomFilterFromBytes(filterData, bf) + + return bf, nil +} + +// decodeBloomFilterFromBytes decodes bloom filter data (similar to stream module). +func decodeBloomFilterFromBytes(src []byte, bf *filter.BloomFilter) *filter.BloomFilter { + n := encoding.BytesToInt64(src) + bf.SetN(int(n)) + + m := n * filter.B + bits := make([]uint64, 0) + bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64((m+63)/64)) + if err != nil { + logger.Panicf("failed to decode Bloom filter: %v", err) + } + bf.SetBits(bits) + + return bf +} + +// reset resets the tagFilterOp for reuse. +func (tfo *tagFilterOp) reset() { + tfo.blockMetadata = nil + tfo.part = nil + for key, cache := range tfo.tagCache { + if cache.bloomFilter != nil { + cache.bloomFilter.Reset() + } + delete(tfo.tagCache, key) + } +} + +// generateTagFilterOp gets a tagFilterOp from pool or creates new. +func generateTagFilterOp(bm *blockMetadata, p *part) *tagFilterOp { + v := tagFilterOpPool.Get() + if v == nil { + return &tagFilterOp{ + blockMetadata: bm, + part: p, + tagCache: make(map[string]*tagFilterCache), + } + } + tfo := v + tfo.blockMetadata = bm + tfo.part = p + return tfo +} + +// releaseTagFilterOp returns tagFilterOp to pool after reset. +func releaseTagFilterOp(tfo *tagFilterOp) { + tfo.reset() + tagFilterOpPool.Put(tfo) +} + +var tagFilterOpPool = pool.Register[*tagFilterOp]("sidx-tagFilterOp") diff --git a/banyand/internal/sidx/tag_filter_op_test.go b/banyand/internal/sidx/tag_filter_op_test.go new file mode 100644 index 00000000..1f5eff87 --- /dev/null +++ b/banyand/internal/sidx/tag_filter_op_test.go @@ -0,0 +1,528 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/filter" + "github.com/apache/skywalking-banyandb/pkg/index" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" +) + +func TestTagFilterOpPooling(t *testing.T) { + t.Run("pool allocation and release", func(t *testing.T) { + // Create mock block metadata and part + bm := &blockMetadata{ + tagsBlocks: make(map[string]dataBlock), + } + p := &part{} + + // Get from pool + tfo1 := generateTagFilterOp(bm, p) + require.NotNil(t, tfo1) + assert.Equal(t, bm, tfo1.blockMetadata) + assert.Equal(t, p, tfo1.part) + assert.NotNil(t, tfo1.tagCache) + + // Get another from pool + tfo2 := generateTagFilterOp(bm, p) + require.NotNil(t, tfo2) + assert.NotSame(t, tfo1, tfo2, "pool should provide different instances when available") + + // Release back to pool + releaseTagFilterOp(tfo1) + releaseTagFilterOp(tfo2) + + // Verify reset was called + assert.Nil(t, tfo1.blockMetadata) + assert.Nil(t, tfo1.part) + assert.Empty(t, tfo1.tagCache) + }) +} + +func TestTagFilterOpReset(t *testing.T) { + // Create a tagFilterOp with data + tfo := &tagFilterOp{ + blockMetadata: &blockMetadata{}, + part: &part{}, + tagCache: map[string]*tagFilterCache{ + "tag1": { + bloomFilter: filter.NewBloomFilter(100), + min: []byte("min"), + max: []byte("max"), + valueType: pbv1.ValueTypeStr, + }, + }, + } + + // Call reset + tfo.reset() + + // Verify everything is reset + assert.Nil(t, tfo.blockMetadata) + assert.Nil(t, tfo.part) + assert.Empty(t, tfo.tagCache) +} + +func TestTagFilterOpEq(t *testing.T) { + tests := []struct { + blockMetadata *blockMetadata + part *part + name string + tagName string + tagValue string + description string + expectedResult bool + }{ + { + name: "nil block metadata", + blockMetadata: nil, + part: &part{}, + tagName: "service", + tagValue: "order-service", + expectedResult: false, + description: "should return false when blockMetadata is nil", + }, + { + name: "nil part", + blockMetadata: &blockMetadata{tagsBlocks: make(map[string]dataBlock)}, + part: nil, + tagName: "service", + tagValue: "order-service", + expectedResult: false, + description: "should return false when part is nil", + }, + { + name: "tag not in block", + blockMetadata: &blockMetadata{ + tagsBlocks: map[string]dataBlock{ + "other_tag": {offset: 0, size: 100}, + }, + }, + part: &part{}, + tagName: "service", + tagValue: "order-service", + expectedResult: false, + description: "should return false when tag doesn't exist in block", + }, + { + name: "tag exists but no cache data", + blockMetadata: &blockMetadata{ + tagsBlocks: map[string]dataBlock{ + "service": {offset: 0, size: 100}, + }, + }, + part: &part{}, + tagName: "service", + tagValue: "order-service", + expectedResult: true, + description: "should return true (conservative) when tag exists but cache fails", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tfo := &tagFilterOp{ + blockMetadata: tt.blockMetadata, + part: tt.part, + tagCache: make(map[string]*tagFilterCache), + } + + result := tfo.Eq(tt.tagName, tt.tagValue) + assert.Equal(t, tt.expectedResult, result, tt.description) + }) + } +} + +func TestTagFilterOpRange(t *testing.T) { + tests := []struct { + blockMetadata *blockMetadata + part *part + rangeOpts index.RangeOpts + name string + tagName string + description string + expectedResult bool + expectError bool + }{ + { + name: "nil block metadata", + blockMetadata: nil, + part: &part{}, + tagName: "duration", + rangeOpts: index.RangeOpts{}, + expectedResult: false, + expectError: false, + description: "should return false when blockMetadata is nil", + }, + { + name: "nil part", + blockMetadata: &blockMetadata{tagsBlocks: make(map[string]dataBlock)}, + part: nil, + tagName: "duration", + rangeOpts: index.RangeOpts{}, + expectedResult: false, + expectError: false, + description: "should return false when part is nil", + }, + { + name: "tag not in block", + blockMetadata: &blockMetadata{ + tagsBlocks: map[string]dataBlock{ + "other_tag": {offset: 0, size: 100}, + }, + }, + part: &part{}, + tagName: "duration", + rangeOpts: index.RangeOpts{}, + expectedResult: false, + expectError: false, + description: "should return false when tag doesn't exist in block", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tfo := &tagFilterOp{ + blockMetadata: tt.blockMetadata, + part: tt.part, + tagCache: make(map[string]*tagFilterCache), + } + + result, err := tfo.Range(tt.tagName, tt.rangeOpts) + + if tt.expectError { + assert.Error(t, err, tt.description) + } else { + assert.NoError(t, err, tt.description) + } + + assert.Equal(t, tt.expectedResult, result, tt.description) + }) + } +} + +func TestTagFilterOpRangeWithCache(t *testing.T) { + // Create a cache with numeric data + cache := &tagFilterCache{ + valueType: pbv1.ValueTypeInt64, + min: encoding.Int64ToBytes(nil, 100), + max: encoding.Int64ToBytes(nil, 500), + } + + tfo := &tagFilterOp{ + blockMetadata: &blockMetadata{ + tagsBlocks: map[string]dataBlock{ + "duration": {offset: 0, size: 100}, + }, + }, + part: &part{}, + tagCache: map[string]*tagFilterCache{ + "duration": cache, + }, + } + + tests := []struct { + rangeOpts index.RangeOpts + name string + description string + expectedResult bool + expectError bool + }{ + { + name: "range completely below", + rangeOpts: index.RangeOpts{ + Lower: &index.FloatTermValue{Value: 10}, + Upper: &index.FloatTermValue{Value: 50}, + IncludesLower: true, + IncludesUpper: true, + }, + expectedResult: false, + expectError: false, + description: "should return false when range is completely below min", + }, + { + name: "range completely above", + rangeOpts: index.RangeOpts{ + Lower: &index.FloatTermValue{Value: 600}, + Upper: &index.FloatTermValue{Value: 800}, + IncludesLower: true, + IncludesUpper: true, + }, + expectedResult: false, + expectError: false, + description: "should return false when range is completely above max", + }, + { + name: "range overlaps", + rangeOpts: index.RangeOpts{ + Lower: &index.FloatTermValue{Value: 200}, + Upper: &index.FloatTermValue{Value: 300}, + IncludesLower: true, + IncludesUpper: true, + }, + expectedResult: true, + expectError: false, + description: "should return true when range overlaps with min/max", + }, + { + name: "range contains all", + rangeOpts: index.RangeOpts{ + Lower: &index.FloatTermValue{Value: 50}, + Upper: &index.FloatTermValue{Value: 600}, + IncludesLower: true, + IncludesUpper: true, + }, + expectedResult: true, + expectError: false, + description: "should return true when range contains all values", + }, + { + name: "lower boundary exclusive miss", + rangeOpts: index.RangeOpts{ + Lower: &index.FloatTermValue{Value: 500}, + IncludesLower: false, + }, + expectedResult: false, + expectError: false, + description: "should return false when lower boundary is exclusive and equals max", + }, + { + name: "upper boundary exclusive miss", + rangeOpts: index.RangeOpts{ + Upper: &index.FloatTermValue{Value: 100}, + IncludesUpper: false, + }, + expectedResult: false, + expectError: false, + description: "should return false when upper boundary is exclusive and equals min", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := tfo.Range("duration", tt.rangeOpts) + + if tt.expectError { + assert.Error(t, err, tt.description) + } else { + assert.NoError(t, err, tt.description) + } + + assert.Equal(t, tt.expectedResult, result, tt.description) + }) + } +} + +func TestTagFilterOpRangeNonNumeric(t *testing.T) { + // Create a cache with string data (no min/max) + cache := &tagFilterCache{ + valueType: pbv1.ValueTypeStr, + min: nil, + max: nil, + } + + tfo := &tagFilterOp{ + blockMetadata: &blockMetadata{ + tagsBlocks: map[string]dataBlock{ + "service": {offset: 0, size: 100}, + }, + }, + part: &part{}, + tagCache: map[string]*tagFilterCache{ + "service": cache, + }, + } + + // For non-numeric types, should always return true (conservative approach) + result, err := tfo.Range("service", index.RangeOpts{ + Lower: &index.FloatTermValue{Value: 100}, + Upper: &index.FloatTermValue{Value: 200}, + IncludesLower: true, + IncludesUpper: true, + }) + + assert.NoError(t, err) + assert.True(t, result, "should return true for non-numeric types") +} + +func TestDecodeBloomFilterFromBytes(t *testing.T) { + // Create test bloom filter data + originalBF := filter.NewBloomFilter(1000) + originalBF.Add([]byte("test-value-1")) + originalBF.Add([]byte("test-value-2")) + originalBF.Add([]byte("test-value-3")) + + // Encode bloom filter data (similar to stream module) + n := int64(originalBF.N()) + bits := originalBF.Bits() + + var buf bytes.Buffer + buf.Write(encoding.Int64ToBytes(nil, n)) + + encodedBits := encoding.EncodeUint64Block(nil, bits) + buf.Write(encodedBits) + + // Test decoding + decodedBF := filter.NewBloomFilter(0) + result := decodeBloomFilterFromBytes(buf.Bytes(), decodedBF) + + // Verify decoded bloom filter + assert.Equal(t, originalBF.N(), result.N()) + assert.True(t, result.MightContain([]byte("test-value-1"))) + assert.True(t, result.MightContain([]byte("test-value-2"))) + assert.True(t, result.MightContain([]byte("test-value-3"))) + assert.False(t, result.MightContain([]byte("non-existent-value"))) +} + +func TestTagFilterCacheIntegration(t *testing.T) { + // This test would require setting up actual file system and tag metadata + // For now, we test the basic structure and error handling + t.Run("cache creation structure", func(t *testing.T) { + cache := &tagFilterCache{ + bloomFilter: filter.NewBloomFilter(100), + min: []byte("min_value"), + max: []byte("max_value"), + valueType: pbv1.ValueTypeStr, + } + + assert.NotNil(t, cache.bloomFilter) + assert.Equal(t, []byte("min_value"), cache.min) + assert.Equal(t, []byte("max_value"), cache.max) + assert.Equal(t, pbv1.ValueTypeStr, cache.valueType) + }) +} + +// Benchmark tests for performance verification. +func BenchmarkTagFilterOpEq(b *testing.B) { + // Setup + bm := &blockMetadata{ + tagsBlocks: map[string]dataBlock{ + "service": {offset: 0, size: 100}, + }, + } + p := &part{} + + tfo := generateTagFilterOp(bm, p) + defer releaseTagFilterOp(tfo) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + tfo.Eq("service", "order-service") + } +} + +func BenchmarkTagFilterOpRange(b *testing.B) { + // Setup + cache := &tagFilterCache{ + valueType: pbv1.ValueTypeInt64, + min: encoding.Int64ToBytes(nil, 100), + max: encoding.Int64ToBytes(nil, 500), + } + + tfo := &tagFilterOp{ + blockMetadata: &blockMetadata{ + tagsBlocks: map[string]dataBlock{ + "duration": {offset: 0, size: 100}, + }, + }, + part: &part{}, + tagCache: map[string]*tagFilterCache{ + "duration": cache, + }, + } + + rangeOpts := index.RangeOpts{ + Lower: &index.FloatTermValue{Value: 200}, + Upper: &index.FloatTermValue{Value: 300}, + IncludesLower: true, + IncludesUpper: true, + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + tfo.Range("duration", rangeOpts) + } +} + +func BenchmarkDecodeBloomFilterFromBytes(b *testing.B) { + // Setup test data + bf := filter.NewBloomFilter(1000) + bf.Add([]byte("test-value")) + + n := int64(bf.N()) + bits := bf.Bits() + + var buf bytes.Buffer + buf.Write(encoding.Int64ToBytes(nil, n)) + encodedBits := encoding.EncodeUint64Block(nil, bits) + buf.Write(encodedBits) + + testData := buf.Bytes() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + decodedBF := filter.NewBloomFilter(0) + decodeBloomFilterFromBytes(testData, decodedBF) + } +} + +func TestTagFilterOpErrorHandling(t *testing.T) { + t.Run("invalid range bounds", func(t *testing.T) { + cache := &tagFilterCache{ + valueType: pbv1.ValueTypeInt64, + min: encoding.Int64ToBytes(nil, 100), + max: encoding.Int64ToBytes(nil, 500), + } + + tfo := &tagFilterOp{ + blockMetadata: &blockMetadata{ + tagsBlocks: map[string]dataBlock{ + "duration": {offset: 0, size: 100}, + }, + }, + part: &part{}, + tagCache: map[string]*tagFilterCache{ + "duration": cache, + }, + } + + // Test with invalid lower bound type (using BytesTermValue instead of expected FloatTermValue) + _, err := tfo.Range("duration", index.RangeOpts{ + Lower: &index.BytesTermValue{Value: []byte("invalid")}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "lower bound is not a float value") + + // Test with invalid upper bound type (using BytesTermValue instead of expected FloatTermValue) + _, err = tfo.Range("duration", index.RangeOpts{ + Upper: &index.BytesTermValue{Value: []byte("invalid")}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "upper bound is not a float value") + }) +}