This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 688464dcb02a4a1cbd63c8d041ea8ab725ac2181
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Aug 24 16:07:02 2025 +0800

    Add block filtering to the part iterator: introduce a block filter
    mechanism that lets the iterator skip blocks based on user-defined
    criteria. Update tests to validate the iterator with nil filters, mock
    filters that allow or skip blocks, and a filter that returns an error,
    verifying that the error is propagated and that the expected blocks are
    returned.
---
 banyand/internal/sidx/part_iter.go      | 103 ++++++++++++++-
 banyand/internal/sidx/part_iter_test.go | 221 +++++++++++++++++++++++++++++++-
 2 files changed, 319 insertions(+), 5 deletions(-)

diff --git a/banyand/internal/sidx/part_iter.go 
b/banyand/internal/sidx/part_iter.go
index a090bae5..46285bcf 100644
--- a/banyand/internal/sidx/part_iter.go
+++ b/banyand/internal/sidx/part_iter.go
@@ -28,7 +28,9 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type partIter struct {
@@ -36,6 +38,7 @@ type partIter struct {
        p                    *part
        curBlock             *blockMetadata
        sids                 []common.SeriesID
+       blockFilter          index.Filter
        primaryBlockMetadata []primaryBlockMetadata
        bms                  []blockMetadata
        compressedPrimaryBuf []byte
@@ -49,6 +52,7 @@ func (pi *partIter) reset() {
        pi.curBlock = nil
        pi.p = nil
        pi.sids = nil
+       pi.blockFilter = nil
        pi.sidIdx = 0
        pi.primaryBlockMetadata = nil
        pi.bms = nil
@@ -57,13 +61,14 @@ func (pi *partIter) reset() {
        pi.err = nil
 }
 
-func (pi *partIter) init(bma *blockMetadataArray, p *part, sids 
[]common.SeriesID, minKey, maxKey int64) {
+func (pi *partIter) init(bma *blockMetadataArray, p *part, sids 
[]common.SeriesID, minKey, maxKey int64, blockFilter index.Filter) {
        pi.reset()
        pi.curBlock = &blockMetadata{}
        pi.p = p
 
        pi.bms = bma.arr
        pi.sids = sids
+       pi.blockFilter = blockFilter
        pi.minKey = minKey
        pi.maxKey = maxKey
 
@@ -229,6 +234,24 @@ func (pi *partIter) findBlock() bool {
                        continue
                }
 
+               if pi.blockFilter != nil {
+                       shouldSkip, err := func() (bool, error) {
+                               tfo := generateTagFilterOp(bm, pi.p)
+                               defer releaseTagFilterOp(tfo)
+                               return pi.blockFilter.ShouldSkip(tfo)
+                       }()
+                       if err != nil {
+                               pi.err = err
+                               return false
+                       }
+                       if shouldSkip {
+                               if !pi.nextSeriesID() {
+                                       return false
+                               }
+                               continue
+                       }
+               }
+
                pi.curBlock.copyFrom(bm)
 
                pi.bms = bhs[1:]
@@ -258,3 +281,81 @@ func (bm *blockMetadata) copyFrom(other *blockMetadata) {
        bm.tagProjection = bm.tagProjection[:0]
        bm.tagProjection = append(bm.tagProjection, other.tagProjection...)
 }
+
+// tagFilterOp provides a FilterOp implementation for tag filtering in sidx.
+type tagFilterOp struct {
+       blockMetadata *blockMetadata
+       part          *part
+       tagCache      map[string][]byte
+}
+
+// Eq checks if a tag equals a specific value.
+func (tfo *tagFilterOp) Eq(tagName string, tagValue string) bool {
+       // For sidx, we need to check if the tag value exists in the block's 
tags
+       // This is a simplified implementation that can be enhanced
+       if tfo.blockMetadata == nil || tfo.part == nil {
+               return false
+       }
+
+       // Check if the tag exists in the block
+       if _, exists := tfo.blockMetadata.tagsBlocks[tagName]; !exists {
+               return false
+       }
+
+       // For now, return true indicating the tag exists
+       // In a full implementation, this would check the actual tag values
+       // against bloom filters or other indexing structures
+       return true
+}
+
+// Range checks if a tag is within a specific range.
+func (tfo *tagFilterOp) Range(tagName string, rangeOpts index.RangeOpts) 
(bool, error) {
+       // For sidx, we need to check if the tag values fall within the range
+       // This is a simplified implementation that can be enhanced
+       if tfo.blockMetadata == nil || tfo.part == nil {
+               return false, nil
+       }
+
+       // Check if the tag exists in the block
+       if _, exists := tfo.blockMetadata.tagsBlocks[tagName]; !exists {
+               return false, nil
+       }
+
+       // For now, return true indicating the tag exists within range
+       // In a full implementation, this would check the actual tag value 
ranges
+       // against the min/max metadata stored in the block
+       return true, nil
+}
+
+// reset resets the tagFilterOp for reuse.
+func (tfo *tagFilterOp) reset() {
+       tfo.blockMetadata = nil
+       tfo.part = nil
+       for k := range tfo.tagCache {
+               delete(tfo.tagCache, k)
+       }
+}
+
+// generateTagFilterOp gets a tagFilterOp from pool or creates new.
+func generateTagFilterOp(bm *blockMetadata, p *part) *tagFilterOp {
+       v := tagFilterOpPool.Get()
+       if v == nil {
+               return &tagFilterOp{
+                       blockMetadata: bm,
+                       part:          p,
+                       tagCache:      make(map[string][]byte),
+               }
+       }
+       tfo := v
+       tfo.blockMetadata = bm
+       tfo.part = p
+       return tfo
+}
+
+// releaseTagFilterOp returns tagFilterOp to pool after reset.
+func releaseTagFilterOp(tfo *tagFilterOp) {
+       tfo.reset()
+       tagFilterOpPool.Put(tfo)
+}
+
+var tagFilterOpPool = pool.Register[*tagFilterOp]("sidx-tagFilterOp")
diff --git a/banyand/internal/sidx/part_iter_test.go 
b/banyand/internal/sidx/part_iter_test.go
index 95cdb9a7..0f5f6677 100644
--- a/banyand/internal/sidx/part_iter_test.go
+++ b/banyand/internal/sidx/part_iter_test.go
@@ -27,6 +27,8 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/posting"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
@@ -261,7 +263,7 @@ func TestPartIterVerification(t *testing.T) {
 
                // Initialize partIter with clean blockMetadataArray
                bma.reset() // Keep blockMetadataArray clean before passing to 
init
-               pi.init(bma, part, tt.querySids, tt.minKey, tt.maxKey)
+               pi.init(bma, part, tt.querySids, tt.minKey, tt.maxKey, nil)
 
                // Iterate through blocks and collect results
                var foundElements []testElement
@@ -396,7 +398,7 @@ func TestPartIterEdgeCases(t *testing.T) {
                pi := &partIter{}
 
                bma.reset()
-               pi.init(bma, part, []common.SeriesID{}, 0, 1000)
+               pi.init(bma, part, []common.SeriesID{}, 0, 1000, nil)
 
                // Should not find any blocks with empty series list
                foundAny := pi.nextBlock()
@@ -438,7 +440,7 @@ func TestPartIterEdgeCases(t *testing.T) {
                pi := &partIter{}
 
                bma.reset()
-               pi.init(bma, part, []common.SeriesID{1}, 200, 300) // No 
overlap with key 100
+               pi.init(bma, part, []common.SeriesID{1}, 200, 300, nil) // No 
overlap with key 100
 
                // Should not find any blocks
                foundAny := pi.nextBlock()
@@ -480,10 +482,221 @@ func TestPartIterEdgeCases(t *testing.T) {
                pi := &partIter{}
 
                bma.reset()
-               pi.init(bma, part, []common.SeriesID{2}, 0, 200) // Different 
series ID
+               pi.init(bma, part, []common.SeriesID{2}, 0, 200, nil) // 
Different series ID
 
                // Should not find any blocks
                foundAny := pi.nextBlock()
                assert.False(t, foundAny, "should not find any blocks with 
non-matching series ID")
        })
 }
+
+func TestPartIterBlockFilter(t *testing.T) {
+       tempDir := t.TempDir()
+       testFS := fs.NewLocalFileSystem()
+
+       t.Run("blockFilter nil should not filter blocks", func(t *testing.T) {
+               // Create test elements
+               elements := createTestElements([]testElement{
+                       {
+                               seriesID: 1,
+                               userKey:  100,
+                               data:     []byte("data1"),
+                               tags: []tag{
+                                       {
+                                               name:      "service",
+                                               value:     
[]byte("test-service"),
+                                               valueType: pbv1.ValueTypeStr,
+                                               indexed:   true,
+                                       },
+                               },
+                       },
+               })
+               defer releaseElements(elements)
+
+               mp := generateMemPart()
+               defer releaseMemPart(mp)
+               mp.mustInitFromElements(elements)
+
+               partDir := filepath.Join(tempDir, "nil_filter")
+               mp.mustFlush(testFS, partDir)
+
+               part := mustOpenPart(partDir, testFS)
+               defer part.close()
+
+               // Test with nil blockFilter
+               bma := &blockMetadataArray{}
+               defer bma.reset()
+               pi := &partIter{}
+
+               bma.reset()
+               pi.init(bma, part, []common.SeriesID{1}, 0, 200, nil) // nil 
blockFilter
+
+               // Should find the block
+               foundAny := pi.nextBlock()
+               assert.True(t, foundAny, "should find blocks when blockFilter 
is nil")
+               assert.NoError(t, pi.error())
+       })
+
+       t.Run("blockFilter with mock filter that allows all", func(t 
*testing.T) {
+               // Create test elements
+               elements := createTestElements([]testElement{
+                       {
+                               seriesID: 1,
+                               userKey:  100,
+                               data:     []byte("data1"),
+                               tags: []tag{
+                                       {
+                                               name:      "service",
+                                               value:     
[]byte("test-service"),
+                                               valueType: pbv1.ValueTypeStr,
+                                               indexed:   true,
+                                       },
+                               },
+                       },
+               })
+               defer releaseElements(elements)
+
+               mp := generateMemPart()
+               defer releaseMemPart(mp)
+               mp.mustInitFromElements(elements)
+
+               partDir := filepath.Join(tempDir, "allow_all_filter")
+               mp.mustFlush(testFS, partDir)
+
+               part := mustOpenPart(partDir, testFS)
+               defer part.close()
+
+               // Create a mock filter that allows all blocks
+               mockFilter := &mockBlockFilter{shouldSkip: false}
+
+               // Test with blockFilter that allows all
+               bma := &blockMetadataArray{}
+               defer bma.reset()
+               pi := &partIter{}
+
+               bma.reset()
+               pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter)
+
+               // Should find the block
+               foundAny := pi.nextBlock()
+               assert.True(t, foundAny, "should find blocks when blockFilter 
allows all")
+               assert.NoError(t, pi.error())
+       })
+
+       t.Run("blockFilter with mock filter that skips all", func(t *testing.T) 
{
+               // Create test elements
+               elements := createTestElements([]testElement{
+                       {
+                               seriesID: 1,
+                               userKey:  100,
+                               data:     []byte("data1"),
+                               tags: []tag{
+                                       {
+                                               name:      "service",
+                                               value:     
[]byte("test-service"),
+                                               valueType: pbv1.ValueTypeStr,
+                                               indexed:   true,
+                                       },
+                               },
+                       },
+               })
+               defer releaseElements(elements)
+
+               mp := generateMemPart()
+               defer releaseMemPart(mp)
+               mp.mustInitFromElements(elements)
+
+               partDir := filepath.Join(tempDir, "skip_all_filter")
+               mp.mustFlush(testFS, partDir)
+
+               part := mustOpenPart(partDir, testFS)
+               defer part.close()
+
+               // Create a mock filter that skips all blocks
+               mockFilter := &mockBlockFilter{shouldSkip: true}
+
+               // Test with blockFilter that skips all
+               bma := &blockMetadataArray{}
+               defer bma.reset()
+               pi := &partIter{}
+
+               bma.reset()
+               pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter)
+
+               // Should not find any blocks
+               foundAny := pi.nextBlock()
+               assert.False(t, foundAny, "should not find blocks when 
blockFilter skips all")
+               assert.NoError(t, pi.error())
+       })
+
+       t.Run("blockFilter with error should propagate error", func(t 
*testing.T) {
+               // Create test elements
+               elements := createTestElements([]testElement{
+                       {
+                               seriesID: 1,
+                               userKey:  100,
+                               data:     []byte("data1"),
+                               tags: []tag{
+                                       {
+                                               name:      "service",
+                                               value:     
[]byte("test-service"),
+                                               valueType: pbv1.ValueTypeStr,
+                                               indexed:   true,
+                                       },
+                               },
+                       },
+               })
+               defer releaseElements(elements)
+
+               mp := generateMemPart()
+               defer releaseMemPart(mp)
+               mp.mustInitFromElements(elements)
+
+               partDir := filepath.Join(tempDir, "error_filter")
+               mp.mustFlush(testFS, partDir)
+
+               part := mustOpenPart(partDir, testFS)
+               defer part.close()
+
+               // Create a mock filter that returns an error
+               expectedErr := fmt.Errorf("test filter error")
+               mockFilter := &mockBlockFilter{shouldSkip: false, err: 
expectedErr}
+
+               // Test with blockFilter that returns an error
+               bma := &blockMetadataArray{}
+               defer bma.reset()
+               pi := &partIter{}
+
+               bma.reset()
+               pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter)
+
+               // Should not find any blocks and should have error
+               foundAny := pi.nextBlock()
+               assert.False(t, foundAny, "should not find blocks when 
blockFilter returns error")
+               assert.Error(t, pi.error())
+               assert.Contains(t, pi.error().Error(), "test filter error")
+       })
+}
+
// mockBlockFilter is a test double for index.Filter: its ShouldSkip returns a
// preconfigured decision, or a preconfigured error.
type mockBlockFilter struct {
	// err, when non-nil, is returned by ShouldSkip in place of a decision.
	err error
	// shouldSkip is the decision ShouldSkip reports when err is nil.
	shouldSkip bool
}
+
+func (mbf *mockBlockFilter) ShouldSkip(filterOp index.FilterOp) (bool, error) {
+       if mbf.err != nil {
+               return false, mbf.err
+       }
+       return mbf.shouldSkip, nil
+}
+
+// These methods are required to satisfy the index.Filter interface.
+func (mbf *mockBlockFilter) String() string {
+       return "mockBlockFilter"
+}
+
+func (mbf *mockBlockFilter) Execute(getSearcher index.GetSearcher, seriesID 
common.SeriesID, timeRange *index.RangeOpts) (posting.List, posting.List, 
error) {
+       // Not used in our tests, return empty implementation
+       return nil, nil, nil
+}

Reply via email to