This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit dbd63c5b1268a75e05bdbeea0a7ba2b978988d15 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Aug 24 06:37:53 2025 +0800 Add part_iter Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/internal/sidx/metadata.go | 34 ++++- banyand/internal/sidx/part.go | 81 +++--------- banyand/internal/sidx/part_iter.go | 263 +++++++++++++++++++++++++++++++++++++ banyand/internal/sidx/part_test.go | 62 +++++++-- 4 files changed, 363 insertions(+), 77 deletions(-) diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go index 64c7d1a1..2a5a686d 100644 --- a/banyand/internal/sidx/metadata.go +++ b/banyand/internal/sidx/metadata.go @@ -57,9 +57,21 @@ type blockMetadata struct { count uint64 } +type blockMetadataArray struct { + arr []blockMetadata +} + +func (bma *blockMetadataArray) reset() { + for i := range bma.arr { + bma.arr[i].reset() + } + bma.arr = bma.arr[:0] +} + var ( - partMetadataPool = pool.Register[*partMetadata]("sidx-partMetadata") - blockMetadataPool = pool.Register[*blockMetadata]("sidx-blockMetadata") + partMetadataPool = pool.Register[*partMetadata]("sidx-partMetadata") + blockMetadataPool = pool.Register[*blockMetadata]("sidx-blockMetadata") + blockMetadataArrayPool = pool.Register[*blockMetadataArray]("sidx-blockMetadataArray") ) // generatePartMetadata gets partMetadata from pool or creates new. @@ -100,6 +112,24 @@ func releaseBlockMetadata(bm *blockMetadata) { blockMetadataPool.Put(bm) } +// generateBlockMetadataArray gets blockMetadataArray from pool or creates new. +func generateBlockMetadataArray() *blockMetadataArray { + v := blockMetadataArrayPool.Get() + if v == nil { + return &blockMetadataArray{} + } + return v +} + +// releaseBlockMetadataArray returns blockMetadataArray to pool after reset. +func releaseBlockMetadataArray(bma *blockMetadataArray) { + if bma == nil { + return + } + bma.reset() + blockMetadataArrayPool.Put(bma) +} + // reset clears partMetadata for reuse in object pool. 
func (pm *partMetadata) reset() { pm.CompressedSizeBytes = 0 diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go index f4246e23..fb492bfe 100644 --- a/banyand/internal/sidx/part.go +++ b/banyand/internal/sidx/part.go @@ -55,16 +55,16 @@ const ( // - <name>.tm: Tag metadata files (one per tag) // - <name>.tf: Tag filter files (bloom filters, one per tag). type part struct { - primary fs.Reader - data fs.Reader - keys fs.Reader - fileSystem fs.FileSystem - tagData map[string]fs.Reader - tagMetadata map[string]fs.Reader - tagFilters map[string]fs.Reader - partMetadata *partMetadata - path string - blockMetadata []blockMetadata + primary fs.Reader + data fs.Reader + keys fs.Reader + fileSystem fs.FileSystem + tagData map[string]fs.Reader + tagMetadata map[string]fs.Reader + tagFilters map[string]fs.Reader + partMetadata *partMetadata + path string + primaryBlockMetadata []primaryBlockMetadata } // mustOpenPart opens a part from the specified path using the given file system. @@ -87,10 +87,10 @@ func mustOpenPart(path string, fileSystem fs.FileSystem) *part { logger.GetLogger().Panic().Err(err).Str("path", path).Msg("failed to load part metadata") } - // Load block metadata from primary.bin. - if err := p.loadBlockMetadata(); err != nil { + // Load primary block metadata from primary.bin. + if err := p.loadPrimaryBlockMetadata(); err != nil { p.close() - logger.GetLogger().Panic().Err(err).Str("path", path).Msg("failed to load block metadata") + logger.GetLogger().Panic().Err(err).Str("path", path).Msg("failed to load primary block metadata") } // Discover and open tag files. @@ -131,21 +131,10 @@ func (p *part) loadPartMetadata() error { return nil } -// loadBlockMetadata reads and parses block metadata from primary.bin. -func (p *part) loadBlockMetadata() error { - // Read the entire primary.bin file. 
- _, err := p.fileSystem.Read(filepath.Join(p.path, primaryFilename)) - if err != nil { - return fmt.Errorf("failed to read primary.bin: %w", err) - } - - // Parse block metadata (implementation would depend on the exact format). - // For now, we'll allocate space based on the part metadata. - p.blockMetadata = make([]blockMetadata, 0, p.partMetadata.BlocksCount) - - // TODO: Implement actual primary.bin parsing when block format is defined. - // This is a placeholder for the structure. - +// loadPrimaryBlockMetadata reads and parses primary block metadata from primary.bin. +func (p *part) loadPrimaryBlockMetadata() error { + // Load primary block metadata from primary.bin file + p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], p.primary) return nil } @@ -254,11 +243,7 @@ func (p *part) close() { p.partMetadata = nil } - // Release block metadata. - for i := range p.blockMetadata { - releaseBlockMetadata(&p.blockMetadata[i]) - } - p.blockMetadata = nil + // No block metadata to release since it's now passed as parameter } // mustOpenReader opens a file reader and panics if it fails. @@ -283,10 +268,6 @@ func (p *part) getPartMetadata() *partMetadata { return p.partMetadata } -// getBlockMetadata returns the block metadata slice. -func (p *part) getBlockMetadata() []blockMetadata { - return p.blockMetadata -} // getTagDataReader returns the tag data reader for the specified tag name. 
func (p *part) getTagDataReader(tagName string) (fs.Reader, bool) { @@ -527,8 +508,7 @@ func openMemPart(mp *memPart) *part { *p.partMetadata = *mp.partMetadata } - // TODO: Read block metadata when blockWriter is implemented - // p.blockMetadata = mustReadBlockMetadata(p.blockMetadata[:0], &mp.primary) + // Block metadata is now handled via blockMetadataArray parameter // Open data files as readers from memory buffers p.primary = &mp.primary @@ -554,29 +534,6 @@ func openMemPart(mp *memPart) *part { return p } -// uncompressedElementSizeBytes calculates the uncompressed size of an element. -// This is a utility function similar to the stream module. -func uncompressedElementSizeBytes(index int, es *elements) uint64 { - // 8 bytes for user key - n := uint64(8) - - // Add data payload size - if index < len(es.data) && es.data[index] != nil { - n += uint64(len(es.data[index])) - } - - // Add tag sizes - if index < len(es.tags) { - for _, tag := range es.tags[index] { - n += uint64(len(tag.name)) - if tag.value != nil { - n += uint64(len(tag.value)) - } - } - } - - return n -} // partPath returns the path for a part with the given epoch. func partPath(root string, epoch uint64) string { diff --git a/banyand/internal/sidx/part_iter.go b/banyand/internal/sidx/part_iter.go new file mode 100644 index 00000000..abe6f09b --- /dev/null +++ b/banyand/internal/sidx/part_iter.go @@ -0,0 +1,263 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "errors" + "fmt" + "io" + "maps" + "sort" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +type partIter struct { + err error + p *part + curBlock *blockMetadata + sids []common.SeriesID + primaryBlockMetadata []primaryBlockMetadata + bms []blockMetadata + compressedPrimaryBuf []byte + primaryBuf []byte + sidIdx int + minKey int64 + maxKey int64 +} + +func (pi *partIter) reset() { + pi.curBlock = nil + pi.p = nil + pi.sids = nil + pi.sidIdx = 0 + pi.primaryBlockMetadata = nil + pi.bms = nil + pi.compressedPrimaryBuf = pi.compressedPrimaryBuf[:0] + pi.primaryBuf = pi.primaryBuf[:0] + pi.err = nil +} + +func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minKey, maxKey int64) { + pi.reset() + pi.curBlock = &blockMetadata{} + pi.p = p + + pi.bms = bma.arr + pi.sids = sids + pi.minKey = minKey + pi.maxKey = maxKey + + pi.primaryBlockMetadata = p.primaryBlockMetadata + + pi.nextSeriesID() +} + +func (pi *partIter) nextBlock() bool { + for { + if pi.err != nil { + return false + } + if len(pi.bms) == 0 { + if !pi.loadNextBlockMetadata() { + return false + } + } + if pi.findBlock() { + return true + } + } +} + +func (pi *partIter) error() error { + if errors.Is(pi.err, io.EOF) { + return nil + } + return pi.err +} + +func (pi *partIter) nextSeriesID() bool { + if 
pi.sidIdx >= len(pi.sids) { + pi.err = io.EOF + return false + } + pi.curBlock.seriesID = pi.sids[pi.sidIdx] + pi.sidIdx++ + return true +} + +func (pi *partIter) searchTargetSeriesID(sid common.SeriesID) bool { + if pi.curBlock.seriesID >= sid { + return true + } + if !pi.nextSeriesID() { + return false + } + if pi.curBlock.seriesID >= sid { + return true + } + sids := pi.sids[pi.sidIdx:] + pi.sidIdx += sort.Search(len(sids), func(i int) bool { + return sid <= sids[i] + }) + if pi.sidIdx >= len(pi.sids) { + pi.sidIdx = len(pi.sids) + pi.err = io.EOF + return false + } + pi.curBlock.seriesID = pi.sids[pi.sidIdx] + pi.sidIdx++ + return true +} + +func (pi *partIter) loadNextBlockMetadata() bool { + for len(pi.primaryBlockMetadata) > 0 { + if !pi.searchTargetSeriesID(pi.primaryBlockMetadata[0].seriesID) { + return false + } + pi.primaryBlockMetadata = searchPBM(pi.primaryBlockMetadata, pi.curBlock.seriesID) + + pbm := &pi.primaryBlockMetadata[0] + pi.primaryBlockMetadata = pi.primaryBlockMetadata[1:] + if pi.curBlock.seriesID < pbm.seriesID { + logger.Panicf("invariant violation: pi.curBlock.seriesID cannot be smaller than pbm.seriesID; got %+v vs %+v", pi.curBlock.seriesID, pbm.seriesID) + } + + if pbm.maxKey < pi.minKey || pbm.minKey > pi.maxKey { + continue + } + + var err error + pi.bms, err = pi.readPrimaryBlock(pi.bms[:0], pbm) + if err != nil { + pi.err = fmt.Errorf("cannot read primary block for part %q at key range [%d, %d]: %w", + pi.p.String(), pbm.minKey, pbm.maxKey, err) + return false + } + return true + } + pi.err = io.EOF + return false +} + +func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBlockMetadata { + if sid < pbmIndex[0].seriesID { + logger.Panicf("invariant violation: sid cannot be smaller than pbmIndex[0]; got %d vs %d", sid, pbmIndex[0].seriesID) + } + + if sid == pbmIndex[0].seriesID { + return pbmIndex + } + + n := sort.Search(len(pbmIndex), func(i int) bool { + return sid <= pbmIndex[i].seriesID + }) + if 
n == 0 { + logger.Panicf("invariant violation: sort.Search returned 0 for sid > pbmIndex[0].seriesID; sid=%+v; pbmIndex[0].seriesID=%+v", + sid, pbmIndex[0].seriesID) + } + return pbmIndex[n-1:] +} + +func (pi *partIter) readPrimaryBlock(bms []blockMetadata, pbm *primaryBlockMetadata) ([]blockMetadata, error) { + pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, int(pbm.size)) + fs.MustReadData(pi.p.primary, int64(pbm.offset), pi.compressedPrimaryBuf) + + var err error + pi.primaryBuf, err = zstd.Decompress(pi.primaryBuf[:0], pi.compressedPrimaryBuf) + if err != nil { + return nil, fmt.Errorf("cannot decompress index block: %w", err) + } + bm, err := unmarshalBlockMetadata(pi.primaryBuf) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal index block: %w", err) + } + bms = append(bms, *bm) + return bms, nil +} + +func (pi *partIter) findBlock() bool { + bhs := pi.bms + for len(bhs) > 0 { + sid := pi.curBlock.seriesID + if bhs[0].seriesID < sid { + n := sort.Search(len(bhs), func(i int) bool { + return sid <= bhs[i].seriesID + }) + if n == len(bhs) { + break + } + bhs = bhs[n:] + } + bm := &bhs[0] + + if bm.seriesID != sid { + if !pi.searchTargetSeriesID(bm.seriesID) { + return false + } + continue + } + + if bm.maxKey < pi.minKey { + bhs = bhs[1:] + continue + } + + if bm.minKey > pi.maxKey { + if !pi.nextSeriesID() { + return false + } + continue + } + + pi.curBlock.copyFrom(bm) + + pi.bms = bhs[1:] + return true + } + pi.bms = nil + return false +} + + + +func (bm *blockMetadata) copyFrom(other *blockMetadata) { + bm.seriesID = other.seriesID + bm.minKey = other.minKey + bm.maxKey = other.maxKey + bm.count = other.count + bm.uncompressedSize = other.uncompressedSize + bm.dataBlock = other.dataBlock + bm.keysBlock = other.keysBlock + + // Copy tag blocks + if bm.tagsBlocks == nil { + bm.tagsBlocks = make(map[string]dataBlock) + } + clear(bm.tagsBlocks) + maps.Copy(bm.tagsBlocks, other.tagsBlocks) + + // Copy tag projection + 
bm.tagProjection = bm.tagProjection[:0] + bm.tagProjection = append(bm.tagProjection, other.tagProjection...) +} + diff --git a/banyand/internal/sidx/part_test.go b/banyand/internal/sidx/part_test.go index f938f4d0..5b955b57 100644 --- a/banyand/internal/sidx/part_test.go +++ b/banyand/internal/sidx/part_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/fs" ) @@ -114,9 +115,24 @@ func TestPartLifecycleManagement(t *testing.T) { metaData, err := pm.marshal() require.NoError(t, err) + // Create valid primary block metadata + pbm := primaryBlockMetadata{ + seriesID: 1, + minKey: 100, + maxKey: 999, + dataBlock: dataBlock{ + offset: 0, + size: 1024, + }, + } + + // Marshal and compress primary block metadata + primaryData := pbm.marshal(nil) + compressedPrimaryData := zstd.Compress(nil, primaryData, 1) + // Create required files testFiles := map[string][]byte{ - primaryFilename: []byte("primary data"), + primaryFilename: compressedPrimaryData, dataFilename: []byte("data content"), keysFilename: []byte("keys content"), metaFilename: metaData, @@ -147,18 +163,12 @@ func TestPartLifecycleManagement(t *testing.T) { assert.Equal(t, int64(999), part.partMetadata.MaxKey) assert.Equal(t, uint64(2), part.partMetadata.BlocksCount) - // Verify block metadata was initialized - assert.NotNil(t, part.blockMetadata) - assert.Equal(t, 0, len(part.blockMetadata)) // Empty since we don't parse primary.bin yet - assert.Equal(t, 2, cap(part.blockMetadata)) // Capacity based on BlocksCount - // Verify String method expectedString := fmt.Sprintf("sidx part %d at %s", pm.ID, tempDir) assert.Equal(t, expectedString, part.String()) // Test accessors assert.Equal(t, part.partMetadata, part.getPartMetadata()) - assert.Equal(t, part.blockMetadata, part.getBlockMetadata()) assert.Equal(t, tempDir, part.Path()) // Cleanup @@ -177,8 
+187,23 @@ func TestPartWithTagFiles(t *testing.T) { metaData, err := pm.marshal() require.NoError(t, err) + // Create valid primary block metadata + pbm := primaryBlockMetadata{ + seriesID: 1, + minKey: 0, + maxKey: 100, + dataBlock: dataBlock{ + offset: 0, + size: 512, + }, + } + + // Marshal and compress primary block metadata + primaryData := pbm.marshal(nil) + compressedPrimaryData := zstd.Compress(nil, primaryData, 1) + requiredFiles := map[string][]byte{ - primaryFilename: []byte("primary"), + primaryFilename: compressedPrimaryData, dataFilename: []byte("data"), keysFilename: []byte("keys"), metaFilename: metaData, @@ -334,8 +359,23 @@ func TestPartClosingBehavior(t *testing.T) { metaData, err := pm.marshal() require.NoError(t, err) + // Create valid primary block metadata + pbm := primaryBlockMetadata{ + seriesID: 1, + minKey: 0, + maxKey: 100, + dataBlock: dataBlock{ + offset: 0, + size: 256, + }, + } + + // Marshal and compress primary block metadata + primaryData := pbm.marshal(nil) + compressedPrimaryData := zstd.Compress(nil, primaryData, 1) + testFiles := map[string][]byte{ - primaryFilename: []byte("primary"), + primaryFilename: compressedPrimaryData, dataFilename: []byte("data"), keysFilename: []byte("keys"), metaFilename: metaData, @@ -370,7 +410,6 @@ func TestPartClosingBehavior(t *testing.T) { // Note: We can't directly test that files are closed since fs.Reader // doesn't expose that state, but we can verify that metadata is released assert.Nil(t, part.partMetadata) - assert.Nil(t, part.blockMetadata) // Test closing with defensive programming (nil check in close method) // The close method should handle nil pointers gracefully @@ -410,15 +449,12 @@ func TestPartMemoryManagement(t *testing.T) { // Verify part was created correctly assert.NotNil(t, part.partMetadata) - assert.Equal(t, 0, len(part.blockMetadata)) - assert.Equal(t, 3, cap(part.blockMetadata)) // Close immediately part.close() // Verify cleanup assert.Nil(t, part.partMetadata) - 
assert.Nil(t, part.blockMetadata) } // Cleanup