This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 688464dcb02a4a1cbd63c8d041ea8ab725ac2181 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Aug 24 16:07:02 2025 +0800 Add block filtering capabilities to part iterator: Enhance the part iterator by introducing a block filter mechanism, allowing for conditional block processing based on user-defined criteria. Update tests to validate the behavior of the iterator with various block filter scenarios, including nil filters and mock filters that allow or skip blocks, ensuring robust error handling and expected outcomes. --- banyand/internal/sidx/part_iter.go | 103 ++++++++++++++- banyand/internal/sidx/part_iter_test.go | 221 +++++++++++++++++++++++++++++++- 2 files changed, 319 insertions(+), 5 deletions(-) diff --git a/banyand/internal/sidx/part_iter.go b/banyand/internal/sidx/part_iter.go index a090bae5..46285bcf 100644 --- a/banyand/internal/sidx/part_iter.go +++ b/banyand/internal/sidx/part_iter.go @@ -28,7 +28,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type partIter struct { @@ -36,6 +38,7 @@ type partIter struct { p *part curBlock *blockMetadata sids []common.SeriesID + blockFilter index.Filter primaryBlockMetadata []primaryBlockMetadata bms []blockMetadata compressedPrimaryBuf []byte @@ -49,6 +52,7 @@ func (pi *partIter) reset() { pi.curBlock = nil pi.p = nil pi.sids = nil + pi.blockFilter = nil pi.sidIdx = 0 pi.primaryBlockMetadata = nil pi.bms = nil @@ -57,13 +61,14 @@ func (pi *partIter) reset() { pi.err = nil } -func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minKey, maxKey int64) { +func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minKey, maxKey int64, blockFilter index.Filter) { pi.reset() pi.curBlock = &blockMetadata{} pi.p = p pi.bms = bma.arr pi.sids = sids + pi.blockFilter = blockFilter pi.minKey = minKey pi.maxKey = maxKey @@ -229,6 +234,24 @@ func (pi *partIter) findBlock() bool { continue } + if pi.blockFilter != nil { + shouldSkip, err := func() (bool, error) { + tfo := generateTagFilterOp(bm, pi.p) + defer releaseTagFilterOp(tfo) + return pi.blockFilter.ShouldSkip(tfo) + }() + if err != nil { + pi.err = err + return false + } + if shouldSkip { + if !pi.nextSeriesID() { + return false + } + continue + } + } + pi.curBlock.copyFrom(bm) pi.bms = bhs[1:] @@ -258,3 +281,81 @@ func (bm *blockMetadata) copyFrom(other *blockMetadata) { bm.tagProjection = bm.tagProjection[:0] bm.tagProjection = append(bm.tagProjection, other.tagProjection...) } + +// tagFilterOp provides a FilterOp implementation for tag filtering in sidx. +type tagFilterOp struct { + blockMetadata *blockMetadata + part *part + tagCache map[string][]byte +} + +// Eq checks if a tag equals a specific value. +func (tfo *tagFilterOp) Eq(tagName string, tagValue string) bool { + // For sidx, we need to check if the tag value exists in the block's tags + // This is a simplified implementation that can be enhanced + if tfo.blockMetadata == nil || tfo.part == nil { + return false + } + + // Check if the tag exists in the block + if _, exists := tfo.blockMetadata.tagsBlocks[tagName]; !exists { + return false + } + + // For now, return true indicating the tag exists + // In a full implementation, this would check the actual tag values + // against bloom filters or other indexing structures + return true +} + +// Range checks if a tag is within a specific range. +func (tfo *tagFilterOp) Range(tagName string, rangeOpts index.RangeOpts) (bool, error) { + // For sidx, we need to check if the tag values fall within the range + // This is a simplified implementation that can be enhanced + if tfo.blockMetadata == nil || tfo.part == nil { + return false, nil + } + + // Check if the tag exists in the block + if _, exists := tfo.blockMetadata.tagsBlocks[tagName]; !exists { + return false, nil + } + + // For now, return true indicating the tag exists within range + // In a full implementation, this would check the actual tag value ranges + // against the min/max metadata stored in the block + return true, nil +} + +// reset resets the tagFilterOp for reuse. +func (tfo *tagFilterOp) reset() { + tfo.blockMetadata = nil + tfo.part = nil + for k := range tfo.tagCache { + delete(tfo.tagCache, k) + } +} + +// generateTagFilterOp gets a tagFilterOp from pool or creates new. +func generateTagFilterOp(bm *blockMetadata, p *part) *tagFilterOp { + v := tagFilterOpPool.Get() + if v == nil { + return &tagFilterOp{ + blockMetadata: bm, + part: p, + tagCache: make(map[string][]byte), + } + } + tfo := v + tfo.blockMetadata = bm + tfo.part = p + return tfo +} + +// releaseTagFilterOp returns tagFilterOp to pool after reset. +func releaseTagFilterOp(tfo *tagFilterOp) { + tfo.reset() + tagFilterOpPool.Put(tfo) +} + +var tagFilterOpPool = pool.Register[*tagFilterOp]("sidx-tagFilterOp") diff --git a/banyand/internal/sidx/part_iter_test.go b/banyand/internal/sidx/part_iter_test.go index 95cdb9a7..0f5f6677 100644 --- a/banyand/internal/sidx/part_iter_test.go +++ b/banyand/internal/sidx/part_iter_test.go @@ -27,6 +27,8 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/posting" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) @@ -261,7 +263,7 @@ func TestPartIterVerification(t *testing.T) { // Initialize partIter with clean blockMetadataArray bma.reset() // Keep blockMetadataArray clean before passing to init - pi.init(bma, part, tt.querySids, tt.minKey, tt.maxKey) + pi.init(bma, part, tt.querySids, tt.minKey, tt.maxKey, nil) // Iterate through blocks and collect results var foundElements []testElement @@ -396,7 +398,7 @@ func TestPartIterEdgeCases(t *testing.T) { pi := &partIter{} bma.reset() - pi.init(bma, part, []common.SeriesID{}, 0, 1000) + pi.init(bma, part, []common.SeriesID{}, 0, 1000, nil) // Should not find any blocks with empty series list foundAny := pi.nextBlock() @@ -438,7 +440,7 @@ func TestPartIterEdgeCases(t *testing.T) { pi := &partIter{} bma.reset() - pi.init(bma, part, []common.SeriesID{1}, 200, 300) // No overlap with key 100 + pi.init(bma, part, []common.SeriesID{1}, 200, 300, nil) // No overlap with key 100 // Should not find any blocks foundAny := pi.nextBlock() @@ -480,10 +482,221 @@ func TestPartIterEdgeCases(t *testing.T) { pi := &partIter{} bma.reset() - pi.init(bma, part, []common.SeriesID{2}, 0, 200) // Different series ID + pi.init(bma, part, []common.SeriesID{2}, 0, 200, nil) // Different series ID // Should not find any blocks foundAny := pi.nextBlock() assert.False(t, foundAny, "should not find any blocks with non-matching series ID") }) } + +func TestPartIterBlockFilter(t *testing.T) { + tempDir := t.TempDir() + testFS := fs.NewLocalFileSystem() + + t.Run("blockFilter nil should not filter blocks", func(t *testing.T) { + // Create test elements + elements := createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("test-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + }, + }, + }) + defer releaseElements(elements) + + mp := generateMemPart() + defer releaseMemPart(mp) + mp.mustInitFromElements(elements) + + partDir := filepath.Join(tempDir, "nil_filter") + mp.mustFlush(testFS, partDir) + + part := mustOpenPart(partDir, testFS) + defer part.close() + + // Test with nil blockFilter + bma := &blockMetadataArray{} + defer bma.reset() + pi := &partIter{} + + bma.reset() + pi.init(bma, part, []common.SeriesID{1}, 0, 200, nil) // nil blockFilter + + // Should find the block + foundAny := pi.nextBlock() + assert.True(t, foundAny, "should find blocks when blockFilter is nil") + assert.NoError(t, pi.error()) + }) + + t.Run("blockFilter with mock filter that allows all", func(t *testing.T) { + // Create test elements + elements := createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("test-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + }, + }, + }) + defer releaseElements(elements) + + mp := generateMemPart() + defer releaseMemPart(mp) + mp.mustInitFromElements(elements) + + partDir := filepath.Join(tempDir, "allow_all_filter") + mp.mustFlush(testFS, partDir) + + part := mustOpenPart(partDir, testFS) + defer part.close() + + // Create a mock filter that allows all blocks + mockFilter := &mockBlockFilter{shouldSkip: false} + + // Test with blockFilter that allows all + bma := &blockMetadataArray{} + defer bma.reset() + pi := &partIter{} + + bma.reset() + pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter) + + // Should find the block + foundAny := pi.nextBlock() + assert.True(t, foundAny, "should find blocks when blockFilter allows all") + assert.NoError(t, pi.error()) + }) + + t.Run("blockFilter with mock filter that skips all", func(t *testing.T) { + // Create test elements + elements := createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("test-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + }, + }, + }) + defer releaseElements(elements) + + mp := generateMemPart() + defer releaseMemPart(mp) + mp.mustInitFromElements(elements) + + partDir := filepath.Join(tempDir, "skip_all_filter") + mp.mustFlush(testFS, partDir) + + part := mustOpenPart(partDir, testFS) + defer part.close() + + // Create a mock filter that skips all blocks + mockFilter := &mockBlockFilter{shouldSkip: true} + + // Test with blockFilter that skips all + bma := &blockMetadataArray{} + defer bma.reset() + pi := &partIter{} + + bma.reset() + pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter) + + // Should not find any blocks + foundAny := pi.nextBlock() + assert.False(t, foundAny, "should not find blocks when blockFilter skips all") + assert.NoError(t, pi.error()) + }) + + t.Run("blockFilter with error should propagate error", func(t *testing.T) { + // Create test elements + elements := createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("test-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + }, + }, + }) + defer releaseElements(elements) + + mp := generateMemPart() + defer releaseMemPart(mp) + mp.mustInitFromElements(elements) + + partDir := filepath.Join(tempDir, "error_filter") + mp.mustFlush(testFS, partDir) + + part := mustOpenPart(partDir, testFS) + defer part.close() + + // Create a mock filter that returns an error + expectedErr := fmt.Errorf("test filter error") + mockFilter := &mockBlockFilter{shouldSkip: false, err: expectedErr} + + // Test with blockFilter that returns an error + bma := &blockMetadataArray{} + defer bma.reset() + pi := &partIter{} + + bma.reset() + pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter) + + // Should not find any blocks and should have error + foundAny := pi.nextBlock() + assert.False(t, foundAny, "should not find blocks when blockFilter returns error") + assert.Error(t, pi.error()) + assert.Contains(t, pi.error().Error(), "test filter error") + }) +} + +// mockBlockFilter is a mock implementation of index.Filter for testing. +type mockBlockFilter struct { + err error + shouldSkip bool +} + +func (mbf *mockBlockFilter) ShouldSkip(filterOp index.FilterOp) (bool, error) { + if mbf.err != nil { + return false, mbf.err + } + return mbf.shouldSkip, nil +} + +// These methods are required to satisfy the index.Filter interface. +func (mbf *mockBlockFilter) String() string { + return "mockBlockFilter" +} + +func (mbf *mockBlockFilter) Execute(getSearcher index.GetSearcher, seriesID common.SeriesID, timeRange *index.RangeOpts) (posting.List, posting.List, error) { + // Not used in our tests, return empty implementation + return nil, nil, nil +}