This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit dbd63c5b1268a75e05bdbeea0a7ba2b978988d15
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Aug 24 06:37:53 2025 +0800

    Add part_iter
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/internal/sidx/metadata.go  |  34 ++++-
 banyand/internal/sidx/part.go      |  81 +++---------
 banyand/internal/sidx/part_iter.go | 263 +++++++++++++++++++++++++++++++++++++
 banyand/internal/sidx/part_test.go |  62 +++++++--
 4 files changed, 363 insertions(+), 77 deletions(-)

diff --git a/banyand/internal/sidx/metadata.go 
b/banyand/internal/sidx/metadata.go
index 64c7d1a1..2a5a686d 100644
--- a/banyand/internal/sidx/metadata.go
+++ b/banyand/internal/sidx/metadata.go
@@ -57,9 +57,21 @@ type blockMetadata struct {
        count            uint64
 }
 
+type blockMetadataArray struct {
+       arr []blockMetadata
+}
+
+func (bma *blockMetadataArray) reset() {
+       for i := range bma.arr {
+               bma.arr[i].reset()
+       }
+       bma.arr = bma.arr[:0]
+}
+
 var (
-       partMetadataPool  = pool.Register[*partMetadata]("sidx-partMetadata")
-       blockMetadataPool = pool.Register[*blockMetadata]("sidx-blockMetadata")
+       partMetadataPool         = 
pool.Register[*partMetadata]("sidx-partMetadata")
+       blockMetadataPool        = 
pool.Register[*blockMetadata]("sidx-blockMetadata")
+       blockMetadataArrayPool   = 
pool.Register[*blockMetadataArray]("sidx-blockMetadataArray")
 )
 
 // generatePartMetadata gets partMetadata from pool or creates new.
@@ -100,6 +112,24 @@ func releaseBlockMetadata(bm *blockMetadata) {
        blockMetadataPool.Put(bm)
 }
 
+// generateBlockMetadataArray gets blockMetadataArray from pool or creates new.
+func generateBlockMetadataArray() *blockMetadataArray {
+       v := blockMetadataArrayPool.Get()
+       if v == nil {
+               return &blockMetadataArray{}
+       }
+       return v
+}
+
+// releaseBlockMetadataArray returns blockMetadataArray to pool after reset.
+func releaseBlockMetadataArray(bma *blockMetadataArray) {
+       if bma == nil {
+               return
+       }
+       bma.reset()
+       blockMetadataArrayPool.Put(bma)
+}
+
 // reset clears partMetadata for reuse in object pool.
 func (pm *partMetadata) reset() {
        pm.CompressedSizeBytes = 0
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index f4246e23..fb492bfe 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -55,16 +55,16 @@ const (
 // - <name>.tm: Tag metadata files (one per tag)
 // - <name>.tf: Tag filter files (bloom filters, one per tag).
 type part struct {
-       primary       fs.Reader
-       data          fs.Reader
-       keys          fs.Reader
-       fileSystem    fs.FileSystem
-       tagData       map[string]fs.Reader
-       tagMetadata   map[string]fs.Reader
-       tagFilters    map[string]fs.Reader
-       partMetadata  *partMetadata
-       path          string
-       blockMetadata []blockMetadata
+       primary              fs.Reader
+       data                 fs.Reader
+       keys                 fs.Reader
+       fileSystem           fs.FileSystem
+       tagData              map[string]fs.Reader
+       tagMetadata          map[string]fs.Reader
+       tagFilters           map[string]fs.Reader
+       partMetadata         *partMetadata
+       path                 string
+       primaryBlockMetadata []primaryBlockMetadata
 }
 
 // mustOpenPart opens a part from the specified path using the given file 
system.
@@ -87,10 +87,10 @@ func mustOpenPart(path string, fileSystem fs.FileSystem) 
*part {
                logger.GetLogger().Panic().Err(err).Str("path", 
path).Msg("failed to load part metadata")
        }
 
-       // Load block metadata from primary.bin.
-       if err := p.loadBlockMetadata(); err != nil {
+       // Load primary block metadata from primary.bin.
+       if err := p.loadPrimaryBlockMetadata(); err != nil {
                p.close()
-               logger.GetLogger().Panic().Err(err).Str("path", 
path).Msg("failed to load block metadata")
+               logger.GetLogger().Panic().Err(err).Str("path", 
path).Msg("failed to load primary block metadata")
        }
 
        // Discover and open tag files.
@@ -131,21 +131,10 @@ func (p *part) loadPartMetadata() error {
        return nil
 }
 
-// loadBlockMetadata reads and parses block metadata from primary.bin.
-func (p *part) loadBlockMetadata() error {
-       // Read the entire primary.bin file.
-       _, err := p.fileSystem.Read(filepath.Join(p.path, primaryFilename))
-       if err != nil {
-               return fmt.Errorf("failed to read primary.bin: %w", err)
-       }
-
-       // Parse block metadata (implementation would depend on the exact 
format).
-       // For now, we'll allocate space based on the part metadata.
-       p.blockMetadata = make([]blockMetadata, 0, p.partMetadata.BlocksCount)
-
-       // TODO: Implement actual primary.bin parsing when block format is 
defined.
-       // This is a placeholder for the structure.
-
+// loadPrimaryBlockMetadata reads and parses primary block metadata from primary.bin.
+func (p *part) loadPrimaryBlockMetadata() error {
+       // Load primary block metadata from the primary.bin file.
+       p.primaryBlockMetadata = 
mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], p.primary)
        return nil
 }
 
@@ -254,11 +243,7 @@ func (p *part) close() {
                p.partMetadata = nil
        }
 
-       // Release block metadata.
-       for i := range p.blockMetadata {
-               releaseBlockMetadata(&p.blockMetadata[i])
-       }
-       p.blockMetadata = nil
+       // No block metadata to release; it is now passed in by callers as a parameter.
 }
 
 // mustOpenReader opens a file reader and panics if it fails.
@@ -283,10 +268,6 @@ func (p *part) getPartMetadata() *partMetadata {
        return p.partMetadata
 }
 
-// getBlockMetadata returns the block metadata slice.
-func (p *part) getBlockMetadata() []blockMetadata {
-       return p.blockMetadata
-}
 
 // getTagDataReader returns the tag data reader for the specified tag name.
 func (p *part) getTagDataReader(tagName string) (fs.Reader, bool) {
@@ -527,8 +508,7 @@ func openMemPart(mp *memPart) *part {
                *p.partMetadata = *mp.partMetadata
        }
 
-       // TODO: Read block metadata when blockWriter is implemented
-       // p.blockMetadata = mustReadBlockMetadata(p.blockMetadata[:0], 
&mp.primary)
+       // Block metadata is now handled via blockMetadataArray parameter
 
        // Open data files as readers from memory buffers
        p.primary = &mp.primary
@@ -554,29 +534,6 @@ func openMemPart(mp *memPart) *part {
        return p
 }
 
-// uncompressedElementSizeBytes calculates the uncompressed size of an element.
-// This is a utility function similar to the stream module.
-func uncompressedElementSizeBytes(index int, es *elements) uint64 {
-       // 8 bytes for user key
-       n := uint64(8)
-
-       // Add data payload size
-       if index < len(es.data) && es.data[index] != nil {
-               n += uint64(len(es.data[index]))
-       }
-
-       // Add tag sizes
-       if index < len(es.tags) {
-               for _, tag := range es.tags[index] {
-                       n += uint64(len(tag.name))
-                       if tag.value != nil {
-                               n += uint64(len(tag.value))
-                       }
-               }
-       }
-
-       return n
-}
 
 // partPath returns the path for a part with the given epoch.
 func partPath(root string, epoch uint64) string {
diff --git a/banyand/internal/sidx/part_iter.go 
b/banyand/internal/sidx/part_iter.go
new file mode 100644
index 00000000..abe6f09b
--- /dev/null
+++ b/banyand/internal/sidx/part_iter.go
@@ -0,0 +1,263 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "errors"
+       "fmt"
+       "io"
+       "maps"
+       "sort"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+type partIter struct {
+       err                      error
+       p                        *part
+       curBlock                 *blockMetadata
+       sids                     []common.SeriesID
+       primaryBlockMetadata     []primaryBlockMetadata
+       bms                      []blockMetadata
+       compressedPrimaryBuf     []byte
+       primaryBuf               []byte
+       sidIdx                   int
+       minKey                   int64
+       maxKey                   int64
+}
+
+func (pi *partIter) reset() {
+       pi.curBlock = nil
+       pi.p = nil
+       pi.sids = nil
+       pi.sidIdx = 0
+       pi.primaryBlockMetadata = nil
+       pi.bms = nil
+       pi.compressedPrimaryBuf = pi.compressedPrimaryBuf[:0]
+       pi.primaryBuf = pi.primaryBuf[:0]
+       pi.err = nil
+}
+
+func (pi *partIter) init(bma *blockMetadataArray, p *part, sids 
[]common.SeriesID, minKey, maxKey int64) {
+       pi.reset()
+       pi.curBlock = &blockMetadata{}
+       pi.p = p
+
+       pi.bms = bma.arr
+       pi.sids = sids
+       pi.minKey = minKey
+       pi.maxKey = maxKey
+
+       pi.primaryBlockMetadata = p.primaryBlockMetadata
+
+       pi.nextSeriesID()
+}
+
+func (pi *partIter) nextBlock() bool {
+       for {
+               if pi.err != nil {
+                       return false
+               }
+               if len(pi.bms) == 0 {
+                       if !pi.loadNextBlockMetadata() {
+                               return false
+                       }
+               }
+               if pi.findBlock() {
+                       return true
+               }
+       }
+}
+
+func (pi *partIter) error() error {
+       if errors.Is(pi.err, io.EOF) {
+               return nil
+       }
+       return pi.err
+}
+
+func (pi *partIter) nextSeriesID() bool {
+       if pi.sidIdx >= len(pi.sids) {
+               pi.err = io.EOF
+               return false
+       }
+       pi.curBlock.seriesID = pi.sids[pi.sidIdx]
+       pi.sidIdx++
+       return true
+}
+
+func (pi *partIter) searchTargetSeriesID(sid common.SeriesID) bool {
+       if pi.curBlock.seriesID >= sid {
+               return true
+       }
+       if !pi.nextSeriesID() {
+               return false
+       }
+       if pi.curBlock.seriesID >= sid {
+               return true
+       }
+       sids := pi.sids[pi.sidIdx:]
+       pi.sidIdx += sort.Search(len(sids), func(i int) bool {
+               return sid <= sids[i]
+       })
+       if pi.sidIdx >= len(pi.sids) {
+               pi.sidIdx = len(pi.sids)
+               pi.err = io.EOF
+               return false
+       }
+       pi.curBlock.seriesID = pi.sids[pi.sidIdx]
+       pi.sidIdx++
+       return true
+}
+
+func (pi *partIter) loadNextBlockMetadata() bool {
+       for len(pi.primaryBlockMetadata) > 0 {
+               if 
!pi.searchTargetSeriesID(pi.primaryBlockMetadata[0].seriesID) {
+                       return false
+               }
+               pi.primaryBlockMetadata = searchPBM(pi.primaryBlockMetadata, 
pi.curBlock.seriesID)
+
+               pbm := &pi.primaryBlockMetadata[0]
+               pi.primaryBlockMetadata = pi.primaryBlockMetadata[1:]
+               if pi.curBlock.seriesID < pbm.seriesID {
+                       logger.Panicf("invariant violation: 
pi.curBlock.seriesID cannot be smaller than pbm.seriesID; got %+v vs %+v", 
&pi.curBlock.seriesID, &pbm.seriesID)
+               }
+
+               if pbm.maxKey < pi.minKey || pbm.minKey > pi.maxKey {
+                       continue
+               }
+
+               var err error
+               pi.bms, err = pi.readPrimaryBlock(pi.bms[:0], pbm)
+               if err != nil {
+                       pi.err = fmt.Errorf("cannot read primary block for part 
%q at key range [%d, %d]: %w",
+                               pi.p.String(), pbm.minKey, pbm.maxKey, err)
+                       return false
+               }
+               return true
+       }
+       pi.err = io.EOF
+       return false
+}
+
+func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) 
[]primaryBlockMetadata {
+       if sid < pbmIndex[0].seriesID {
+               logger.Panicf("invariant violation: sid cannot be smaller than 
pbmIndex[0]; got %d vs %d", sid, &pbmIndex[0].seriesID)
+       }
+
+       if sid == pbmIndex[0].seriesID {
+               return pbmIndex
+       }
+
+       n := sort.Search(len(pbmIndex), func(i int) bool {
+               return sid <= pbmIndex[i].seriesID
+       })
+       if n == 0 {
+               logger.Panicf("invariant violation: sort.Search returned 0 for 
sid > pbmIndex[0].seriesID; sid=%+v; pbmIndex[0].seriesID=%+v",
+                       sid, &pbmIndex[0].seriesID)
+       }
+       return pbmIndex[n-1:]
+}
+
+func (pi *partIter) readPrimaryBlock(bms []blockMetadata, pbm 
*primaryBlockMetadata) ([]blockMetadata, error) {
+       pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, 
int(pbm.size))
+       fs.MustReadData(pi.p.primary, int64(pbm.offset), 
pi.compressedPrimaryBuf)
+
+       var err error
+       pi.primaryBuf, err = zstd.Decompress(pi.primaryBuf[:0], 
pi.compressedPrimaryBuf)
+       if err != nil {
+               return nil, fmt.Errorf("cannot decompress index block: %w", err)
+       }
+       bm, err := unmarshalBlockMetadata(pi.primaryBuf)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal index block: %w", err)
+       }
+       bms = append(bms, *bm)
+       return bms, nil
+}
+
+func (pi *partIter) findBlock() bool {
+       bhs := pi.bms
+       for len(bhs) > 0 {
+               sid := pi.curBlock.seriesID
+               if bhs[0].seriesID < sid {
+                       n := sort.Search(len(bhs), func(i int) bool {
+                               return sid <= bhs[i].seriesID
+                       })
+                       if n == len(bhs) {
+                               break
+                       }
+                       bhs = bhs[n:]
+               }
+               bm := &bhs[0]
+
+               if bm.seriesID != sid {
+                       if !pi.searchTargetSeriesID(bm.seriesID) {
+                               return false
+                       }
+                       continue
+               }
+
+               if bm.maxKey < pi.minKey {
+                       bhs = bhs[1:]
+                       continue
+               }
+
+               if bm.minKey > pi.maxKey {
+                       if !pi.nextSeriesID() {
+                               return false
+                       }
+                       continue
+               }
+
+               pi.curBlock.copyFrom(bm)
+
+               pi.bms = bhs[1:]
+               return true
+       }
+       pi.bms = nil
+       return false
+}
+
+
+
+func (bm *blockMetadata) copyFrom(other *blockMetadata) {
+       bm.seriesID = other.seriesID
+       bm.minKey = other.minKey
+       bm.maxKey = other.maxKey
+       bm.count = other.count
+       bm.uncompressedSize = other.uncompressedSize
+       bm.dataBlock = other.dataBlock
+       bm.keysBlock = other.keysBlock
+       
+       // Copy tag blocks
+       if bm.tagsBlocks == nil {
+               bm.tagsBlocks = make(map[string]dataBlock)
+       }
+       clear(bm.tagsBlocks)
+       maps.Copy(bm.tagsBlocks, other.tagsBlocks)
+       
+       // Copy tag projection
+       bm.tagProjection = bm.tagProjection[:0]
+       bm.tagProjection = append(bm.tagProjection, other.tagProjection...)
+}
+
diff --git a/banyand/internal/sidx/part_test.go 
b/banyand/internal/sidx/part_test.go
index f938f4d0..5b955b57 100644
--- a/banyand/internal/sidx/part_test.go
+++ b/banyand/internal/sidx/part_test.go
@@ -25,6 +25,7 @@ import (
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/fs"
 )
 
@@ -114,9 +115,24 @@ func TestPartLifecycleManagement(t *testing.T) {
        metaData, err := pm.marshal()
        require.NoError(t, err)
 
+       // Create valid primary block metadata
+       pbm := primaryBlockMetadata{
+               seriesID: 1,
+               minKey:   100,
+               maxKey:   999,
+               dataBlock: dataBlock{
+                       offset: 0,
+                       size:   1024,
+               },
+       }
+
+       // Marshal and compress primary block metadata
+       primaryData := pbm.marshal(nil)
+       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
+
        // Create required files
        testFiles := map[string][]byte{
-               primaryFilename: []byte("primary data"),
+               primaryFilename: compressedPrimaryData,
                dataFilename:    []byte("data content"),
                keysFilename:    []byte("keys content"),
                metaFilename:    metaData,
@@ -147,18 +163,12 @@ func TestPartLifecycleManagement(t *testing.T) {
        assert.Equal(t, int64(999), part.partMetadata.MaxKey)
        assert.Equal(t, uint64(2), part.partMetadata.BlocksCount)
 
-       // Verify block metadata was initialized
-       assert.NotNil(t, part.blockMetadata)
-       assert.Equal(t, 0, len(part.blockMetadata)) // Empty since we don't 
parse primary.bin yet
-       assert.Equal(t, 2, cap(part.blockMetadata)) // Capacity based on 
BlocksCount
-
        // Verify String method
        expectedString := fmt.Sprintf("sidx part %d at %s", pm.ID, tempDir)
        assert.Equal(t, expectedString, part.String())
 
        // Test accessors
        assert.Equal(t, part.partMetadata, part.getPartMetadata())
-       assert.Equal(t, part.blockMetadata, part.getBlockMetadata())
        assert.Equal(t, tempDir, part.Path())
 
        // Cleanup
@@ -177,8 +187,23 @@ func TestPartWithTagFiles(t *testing.T) {
        metaData, err := pm.marshal()
        require.NoError(t, err)
 
+       // Create valid primary block metadata
+       pbm := primaryBlockMetadata{
+               seriesID: 1,
+               minKey:   0,
+               maxKey:   100,
+               dataBlock: dataBlock{
+                       offset: 0,
+                       size:   512,
+               },
+       }
+
+       // Marshal and compress primary block metadata
+       primaryData := pbm.marshal(nil)
+       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
+
        requiredFiles := map[string][]byte{
-               primaryFilename: []byte("primary"),
+               primaryFilename: compressedPrimaryData,
                dataFilename:    []byte("data"),
                keysFilename:    []byte("keys"),
                metaFilename:    metaData,
@@ -334,8 +359,23 @@ func TestPartClosingBehavior(t *testing.T) {
        metaData, err := pm.marshal()
        require.NoError(t, err)
 
+       // Create valid primary block metadata
+       pbm := primaryBlockMetadata{
+               seriesID: 1,
+               minKey:   0,
+               maxKey:   100,
+               dataBlock: dataBlock{
+                       offset: 0,
+                       size:   256,
+               },
+       }
+
+       // Marshal and compress primary block metadata
+       primaryData := pbm.marshal(nil)
+       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
+
        testFiles := map[string][]byte{
-               primaryFilename: []byte("primary"),
+               primaryFilename: compressedPrimaryData,
                dataFilename:    []byte("data"),
                keysFilename:    []byte("keys"),
                metaFilename:    metaData,
@@ -370,7 +410,6 @@ func TestPartClosingBehavior(t *testing.T) {
        // Note: We can't directly test that files are closed since fs.Reader
        // doesn't expose that state, but we can verify that metadata is 
released
        assert.Nil(t, part.partMetadata)
-       assert.Nil(t, part.blockMetadata)
 
        // Test closing with defensive programming (nil check in close method)
        // The close method should handle nil pointers gracefully
@@ -410,15 +449,12 @@ func TestPartMemoryManagement(t *testing.T) {
 
                // Verify part was created correctly
                assert.NotNil(t, part.partMetadata)
-               assert.Equal(t, 0, len(part.blockMetadata))
-               assert.Equal(t, 3, cap(part.blockMetadata))
 
                // Close immediately
                part.close()
 
                // Verify cleanup
                assert.Nil(t, part.partMetadata)
-               assert.Nil(t, part.blockMetadata)
        }
 
        // Cleanup

Reply via email to