This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/element
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 6ce707be3d85689bbaebdab5d0e9286ff560ea3a Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Aug 17 00:48:09 2025 +0800 Refactor block metadata structures and update validation tests - Replaced `primaryBlockMetadata` with `blockMetadata` to streamline metadata handling. - Enhanced `blockMetadata` to include additional fields for uncompressed size and count. - Revised TODO.md to reflect the completion of block metadata design and testing. --- banyand/internal/sidx/DESIGN.md | 1038 +++++++++++++++++++++++++++++++- banyand/internal/sidx/TODO.md | 138 +++-- banyand/internal/sidx/metadata.go | 210 +++---- banyand/internal/sidx/metadata_test.go | 130 ++-- 4 files changed, 1287 insertions(+), 229 deletions(-) diff --git a/banyand/internal/sidx/DESIGN.md b/banyand/internal/sidx/DESIGN.md index cc4f28cc..22816e7a 100644 --- a/banyand/internal/sidx/DESIGN.md +++ b/banyand/internal/sidx/DESIGN.md @@ -125,12 +125,12 @@ type part struct { tagFilters map[string]fs.Reader // tag_<name>.tf files // Cached metadata for performance - primaryBlockMetadata []primaryBlockMetadata - partMetadata partMetadata + blockMetadata []blockMetadata + partMetadata partMetadata } -// Enhanced metadata with integrity and versioning +// Enhanced metadata with integrity type partMetadata struct { // Size information CompressedSizeBytes uint64 @@ -312,7 +312,7 @@ type tagMetadata struct { ##### Standardized File Format **Core Files (per part):** -- `primary.bin` - Block metadata with version headers +- `primary.bin` - Block metadata - `data.bin` - User data payloads with compression - `keys.bin` - User-provided int64 ordering keys - `meta.bin` - Part metadata @@ -324,8 +324,7 @@ type tagMetadata struct { - `tag_<name>.tf` - Bloom filters for fast lookups **File Format Features:** -- **Version Control**: Format versioning for backward compatibility -- **Version Control**: Format versioning for backward compatibility +- **No Version Control**: Do not support format versioning - **Format Validation**: File format and structure verification - **Atomic Updates**: Transactional multi-file operations - **Compression Support**: Configurable compression algorithms @@ -351,20 +350,19 @@ sidx separates user data payloads from metadata to enable efficient querying and ##### Data Block Structure ```go -// Metadata for a block of user data in data.bin -type dataBlockMetadata struct { - offset uint64 // Offset in data.bin file - size uint64 // Compressed size in bytes -} - -// Enhanced primary block metadata with data references -type primaryBlockMetadata struct { +// Enhanced block metadata with data references +type blockMetadata struct { seriesID common.SeriesID minKey int64 // Minimum user key in block maxKey int64 // Maximum user key in block - dataBlock dataBlockMetadata // Reference to data in data.bin - keysBlock dataBlockMetadata // Reference to keys in keys.bin - tagsBlocks map[string]dataBlockMetadata // References to tag files + dataBlock dataBlock // Reference to data in data.bin + keysBlock dataBlock // Reference to keys in keys.bin + tagsBlocks map[string]dataBlock // References to tag files + + // Additional metadata for query processing + tagProjection []string // Tags to load for queries + uncompressedSize uint64 // Uncompressed size of block + count uint64 // Number of elements in block } ``` @@ -375,7 +373,1007 @@ type primaryBlockMetadata struct { - **Clean Separation**: Metadata operations don't require data I/O - **Better Cache Utilization**: Metadata fits better in memory caches -#### 5. 
Snapshot Management +#### 5. Block Architecture + +The block design in sidx provides a comprehensive system for organizing, reading, writing, and scanning data blocks within parts. This architecture is inspired by the stream module but adapted for user-provided int64 keys instead of timestamps. + +##### Block Architecture Overview + +Blocks are the fundamental units of data organization within parts, providing: +- **Efficient Storage**: Elements are organized into blocks for optimal compression and access +- **Key-based Organization**: Sorted by seriesID first, then by user-provided int64 keys +- **Memory Management**: Object pooling and reference counting for production use +- **I/O Optimization**: Separate files for different data types enable selective loading + +##### Core Components + +###### A. Block Structure (`block`) + +The core block structure organizes elements for efficient storage and retrieval: + +```go +// block represents a collection of elements organized for storage +type block struct { + // Core data arrays (all same length) + userKeys []int64 // User-provided ordering keys + elementIDs []uint64 // Unique element identifiers + data [][]byte // User payload data + + // Tag data organized by tag name + tags map[string]*tagData // Runtime tag data with filtering + + // Internal state + pooled bool // Whether this block came from pool +} + +// Pool management for memory efficiency +var blockPool = pool.Register[*block]("sidx-block") + +func generateBlock() *block { + v := blockPool.Get() + if v == nil { + return &block{ + tags: make(map[string]*tagData), + } + } + return v +} + +func releaseBlock(b *block) { + // Release tag filters back to pool + for _, tag := range b.tags { + if tag.filter != nil { + releaseBloomFilter(tag.filter) + } + releaseTagData(tag) + } + b.reset() + blockPool.Put(b) +} + +// reset clears block for reuse in object pool +func (b *block) reset() { + b.userKeys = b.userKeys[:0] + b.elementIDs = b.elementIDs[:0] + + // Reset data slices if not too large + if cap(b.data) <= maxPooledSliceCount { + for i := range b.data { + if cap(b.data[i]) <= maxPooledSliceSize { + b.data[i] = b.data[i][:0] // Reuse slice + } + } + b.data = b.data[:0] + } else { + b.data = nil // Release oversized slice + } + + // Clear tag map but keep the map itself + for k := range b.tags { + delete(b.tags, k) + } + + b.pooled = false +} + +// mustInitFromElements initializes block from sorted elements +func (b *block) mustInitFromElements(elements *elements) { + b.reset() + if elements.len() == 0 { + return + } + + // Verify elements are sorted + elements.assertSorted() + + // Copy core data + b.userKeys = append(b.userKeys, elements.userKeys...) + b.elementIDs = append(b.elementIDs, elements.elementIDs...) + b.data = append(b.data, elements.data...) 
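+	// The appends above copy key/ID values, but only slice headers for data;
+	// payload bytes remain shared with the source elements rather than deep-copied.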
+ + // Process tags + b.mustInitFromTags(elements.tags) +} + +// mustInitFromTags processes tag data for the block +func (b *block) mustInitFromTags(elementTags [][]tag) { + if len(elementTags) == 0 { + return + } + + // Collect all unique tag names + tagNames := make(map[string]struct{}) + for _, tags := range elementTags { + for _, tag := range tags { + tagNames[tag.name] = struct{}{} + } + } + + // Process each tag + for tagName := range tagNames { + b.processTag(tagName, elementTags) + } +} + +// processTag creates tag data structure for a specific tag +func (b *block) processTag(tagName string, elementTags [][]tag) { + td := generateTagData() + td.name = tagName + td.values = make([][]byte, len(b.userKeys)) + + var valueType pbv1.ValueType + var indexed bool + + // Collect values for this tag across all elements + for i, tags := range elementTags { + found := false + for _, tag := range tags { + if tag.name == tagName { + td.values[i] = tag.value + valueType = tag.valueType + indexed = tag.indexed + found = true + break + } + } + if !found { + td.values[i] = nil // Missing tag value + } + } + + td.valueType = valueType + td.indexed = indexed + + // Create bloom filter for indexed tags + if indexed { + td.filter = generateBloomFilter() + td.filter.SetN(len(b.userKeys)) + td.filter.ResizeBits((len(b.userKeys)*filter.B + 63) / 64) + + for _, value := range td.values { + if value != nil { + td.filter.Add(value) + } + } + } + + // Track min/max for int64 tags + if valueType == pbv1.ValueTypeInt64 { + for _, value := range td.values { + if value == nil { + continue + } + if len(td.min) == 0 || bytes.Compare(value, td.min) < 0 { + td.min = value + } + if len(td.max) == 0 || bytes.Compare(value, td.max) > 0 { + td.max = value + } + } + } + + b.tags[tagName] = td +} + +// validate ensures block data consistency +func (b *block) validate() error { + count := len(b.userKeys) + if count != len(b.elementIDs) || count != len(b.data) { + return fmt.Errorf("inconsistent block arrays: keys=%d, ids=%d, data=%d", + len(b.userKeys), len(b.elementIDs), len(b.data)) + } + + // Verify sorting + for i := 1; i < count; i++ { + if b.userKeys[i] < b.userKeys[i-1] { + return fmt.Errorf("block not sorted by userKey at index %d", i) + } + } + + // Verify tag consistency + for tagName, tagData := range b.tags { + if len(tagData.values) != count { + return fmt.Errorf("tag %s has %d values but block has %d elements", + tagName, len(tagData.values), count) + } + } + + return nil +} + +// uncompressedSizeBytes calculates the uncompressed size of the block +func (b *block) uncompressedSizeBytes() uint64 { + count := uint64(len(b.userKeys)) + size := count * (8 + 8) // userKey + elementID + + // Add data payload sizes + for _, payload := range b.data { + size += uint64(len(payload)) + } + + // Add tag data sizes + for tagName, tagData := range b.tags { + nameSize := uint64(len(tagName)) + for _, value := range tagData.values { + if value != nil { + size += nameSize + uint64(len(value)) + } + } + } + + return size +} +``` + +###### B. 
Block Metadata (`blockMetadata`) + +Block metadata provides references to data stored in files: + +```go +// blockMetadata contains metadata for a block within a part +type blockMetadata struct { + // Block references to files + tagsBlocks map[string]dataBlock // References to tag files + dataBlock dataBlock // Reference to data in data.bin + keysBlock dataBlock // Reference to keys in keys.bin + + // Block identification + seriesID common.SeriesID + + // Key range within block + minKey int64 // Minimum user key in block + maxKey int64 // Maximum user key in block + + // Additional metadata for query processing + tagProjection []string // Tags to load for queries + uncompressedSize uint64 // Uncompressed size of block + count uint64 // Number of elements in block +} + +// copyFrom creates a deep copy of block metadata +func (bm *blockMetadata) copyFrom(src *blockMetadata) { + bm.seriesID = src.seriesID + bm.minKey = src.minKey + bm.maxKey = src.maxKey + bm.dataBlock = src.dataBlock + bm.keysBlock = src.keysBlock + bm.uncompressedSize = src.uncompressedSize + bm.count = src.count + + // Deep copy tag blocks + if bm.tagsBlocks == nil { + bm.tagsBlocks = make(map[string]dataBlock) + } + for k, v := range src.tagsBlocks { + bm.tagsBlocks[k] = v + } + + // Deep copy tag projection + bm.tagProjection = make([]string, len(src.tagProjection)) + copy(bm.tagProjection, src.tagProjection) +} + +// reset clears blockMetadata for reuse in object pool +func (bm *blockMetadata) reset() { + bm.seriesID = 0 + bm.minKey = 0 + bm.maxKey = 0 + bm.dataBlock = dataBlock{} + bm.keysBlock = dataBlock{} + bm.uncompressedSize = 0 + bm.count = 0 + + // Clear maps but keep them allocated + for k := range bm.tagsBlocks { + delete(bm.tagsBlocks, k) + } + bm.tagProjection = bm.tagProjection[:0] +} + +// overlapsKeyRange checks if block overlaps with query key range +func (bm *blockMetadata) overlapsKeyRange(minKey, maxKey int64) bool { + return !(maxKey < bm.minKey || minKey > bm.maxKey) +} + +// overlapsSeriesID checks if block contains the specified series +func (bm *blockMetadata) overlapsSeriesID(seriesID common.SeriesID) bool { + return bm.seriesID == seriesID +} + +var blockMetadataPool = pool.Register[*blockMetadata]("sidx-blockMetadata") + +func generateBlockMetadata() *blockMetadata { + v := blockMetadataPool.Get() + if v == nil { + return &blockMetadata{ + tagsBlocks: make(map[string]dataBlock), + } + } + return v +} + +func releaseBlockMetadata(bm *blockMetadata) { + bm.reset() + blockMetadataPool.Put(bm) +} +``` + +###### C. 
Block Reader (`block_reader`) + +The block reader provides efficient reading of blocks from disk: + +```go +// blockReader reads blocks from parts with memory optimization +type blockReader struct { + // File readers + dataReader fs.Reader // Reads from data.bin + keysReader fs.Reader // Reads from keys.bin + tagReaders map[string]fs.Reader // Reads from tag_*.td files + + // Decoders for efficient processing + decoder *encoding.BytesBlockDecoder + + // Pool management + pooled bool +} + +var blockReaderPool = pool.Register[*blockReader]("sidx-blockReader") + +func generateBlockReader() *blockReader { + v := blockReaderPool.Get() + if v == nil { + return &blockReader{ + tagReaders: make(map[string]fs.Reader), + decoder: &encoding.BytesBlockDecoder{}, + } + } + return v +} + +func releaseBlockReader(br *blockReader) { + br.reset() + blockReaderPool.Put(br) +} + +// reset clears reader for reuse +func (br *blockReader) reset() { + br.dataReader = nil + br.keysReader = nil + + // Clear tag readers map + for k := range br.tagReaders { + delete(br.tagReaders, k) + } + + br.decoder.Reset() + br.pooled = false +} + +// init initializes reader with part files +func (br *blockReader) init(p *part, tagProjection []string) error { + br.reset() + + br.dataReader = p.data + br.keysReader = p.userKeys + + // Initialize tag readers for projected tags only + for _, tagName := range tagProjection { + if tagReader, ok := p.tags[tagName]; ok { + br.tagReaders[tagName] = tagReader + } + } + + return nil +} + +// mustReadFrom loads block data from files +func (br *blockReader) mustReadFrom(bm *blockMetadata, dst *block) error { + dst.reset() + + // Read user keys + if err := br.readUserKeys(bm, dst); err != nil { + return fmt.Errorf("failed to read user keys: %w", err) + } + + // Read data payloads + if err := br.readData(bm, dst); err != nil { + return fmt.Errorf("failed to read data: %w", err) + } + + // Read tag data + if err := br.readTags(bm, dst); err != nil { + return fmt.Errorf("failed to read tags: %w", err) + } + + // Validate loaded block + if err := dst.validate(); err != nil { + return fmt.Errorf("loaded block validation failed: %w", err) + } + + return nil +} + +// readUserKeys reads user keys from keys.bin +func (br *blockReader) readUserKeys(bm *blockMetadata, dst *block) error { + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + + // Read keys block + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(bm.keysBlock.size)) + fs.MustReadData(br.keysReader, int64(bm.keysBlock.offset), bb.Buf) + + // Decode keys + dst.userKeys = encoding.ExtendListCapacity(dst.userKeys, int(bm.count)) + dst.userKeys = dst.userKeys[:bm.count] + + _, err := encoding.BytesToInt64List(dst.userKeys, bb.Buf) + if err != nil { + return fmt.Errorf("failed to decode user keys: %w", err) + } + + return nil +} + +// readData reads data payloads from data.bin +func (br *blockReader) readData(bm *blockMetadata, dst *block) error { + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + + // Read data block + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(bm.dataBlock.size)) + fs.MustReadData(br.dataReader, int64(bm.dataBlock.offset), bb.Buf) + + // Decompress if necessary + decompressed, err := zstd.DecompressBytes(bb.Buf) + if err != nil { + return fmt.Errorf("failed to decompress data block: %w", err) + } + + // Decode data payloads + dst.data = encoding.ExtendSliceCapacity(dst.data, int(bm.count)) + dst.data = dst.data[:bm.count] + + if err := br.decoder.DecodeBytesList(dst.data, decompressed); err != nil { + 
return fmt.Errorf("failed to decode data payloads: %w", err) + } + + return nil +} + +// readTags reads tag data from tag files +func (br *blockReader) readTags(bm *blockMetadata, dst *block) error { + for tagName, tagBlock := range bm.tagsBlocks { + tagReader, ok := br.tagReaders[tagName] + if !ok { + continue // Tag not in projection + } + + if err := br.readTag(tagName, tagBlock, tagReader, dst, int(bm.count)); err != nil { + return fmt.Errorf("failed to read tag %s: %w", tagName, err) + } + } + + return nil +} + +// readTag reads a single tag's data +func (br *blockReader) readTag(tagName string, tagBlock dataBlock, + tagReader fs.Reader, dst *block, count int) error { + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + + // Read tag data block + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tagBlock.size)) + fs.MustReadData(tagReader, int64(tagBlock.offset), bb.Buf) + + // Create tag data + td := generateTagData() + td.name = tagName + td.values = make([][]byte, count) + + // Decode tag values + if err := br.decoder.DecodeBytesList(td.values, bb.Buf); err != nil { + releaseTagData(td) + return fmt.Errorf("failed to decode tag values: %w", err) + } + + dst.tags[tagName] = td + return nil +} +``` + +###### D. Block Scanner (`block_scanner`) + +The block scanner provides efficient scanning for queries: + +```go +// blockScanner scans blocks for query processing +type blockScanner struct { + // Query parameters + minKey int64 + maxKey int64 + seriesFilter map[common.SeriesID]struct{} + tagFilters map[string][]byte // Tag filters for optimization + + // Current scan state + currentBlock *block + currentIndex int + + // Resources + reader *blockReader + tmpBlock *block + + // Pool management + pooled bool +} + +var blockScannerPool = pool.Register[*blockScanner]("sidx-blockScanner") + +func generateBlockScanner() *blockScanner { + v := blockScannerPool.Get() + if v == nil { + return &blockScanner{ + seriesFilter: make(map[common.SeriesID]struct{}), + tagFilters: make(map[string][]byte), + reader: generateBlockReader(), + tmpBlock: generateBlock(), + } + } + return v +} + +func releaseBlockScanner(bs *blockScanner) { + bs.reset() + blockScannerPool.Put(bs) +} + +// reset clears scanner for reuse +func (bs *blockScanner) reset() { + bs.minKey = 0 + bs.maxKey = 0 + bs.currentIndex = 0 + + // Clear filters + for k := range bs.seriesFilter { + delete(bs.seriesFilter, k) + } + for k := range bs.tagFilters { + delete(bs.tagFilters, k) + } + + // Reset resources + if bs.reader != nil { + releaseBlockReader(bs.reader) + bs.reader = generateBlockReader() + } + + if bs.tmpBlock != nil { + releaseBlock(bs.tmpBlock) + bs.tmpBlock = generateBlock() + } + + bs.currentBlock = nil + bs.pooled = false +} + +// init initializes scanner with query parameters +func (bs *blockScanner) init(minKey, maxKey int64, + seriesIDs []common.SeriesID, + tagFilters map[string][]byte) { + bs.reset() + + bs.minKey = minKey + bs.maxKey = maxKey + + // Convert series slice to map for fast lookup + for _, id := range seriesIDs { + bs.seriesFilter[id] = struct{}{} + } + + // Copy tag filters + for k, v := range tagFilters { + bs.tagFilters[k] = v + } +} + +// scanBlock scans a block and returns matching elements +func (bs *blockScanner) scanBlock(bm *blockMetadata, p *part) ([]*element, error) { + // Quick check: does block overlap with query range? + if !bm.overlapsKeyRange(bs.minKey, bs.maxKey) { + return nil, nil + } + + // Quick check: does block contain relevant series? 
+ if len(bs.seriesFilter) > 0 { + if _, ok := bs.seriesFilter[bm.seriesID]; !ok { + return nil, nil + } + } + + // Initialize reader and load block + if err := bs.reader.init(p, bm.tagProjection); err != nil { + return nil, fmt.Errorf("failed to init block reader: %w", err) + } + + if err := bs.reader.mustReadFrom(bm, bs.tmpBlock); err != nil { + return nil, fmt.Errorf("failed to read block: %w", err) + } + + // Scan block for matching elements + return bs.scanBlockElements(bs.tmpBlock) +} + +// scanBlockElements scans elements within a loaded block +func (bs *blockScanner) scanBlockElements(block *block) ([]*element, error) { + var results []*element + + for i := 0; i < len(block.userKeys); i++ { + // Check key range + if block.userKeys[i] < bs.minKey || block.userKeys[i] > bs.maxKey { + continue + } + + // Check tag filters + if !bs.matchesTagFilters(block, i) { + continue + } + + // Create matching element + elem := generateElement() + elem.userKey = block.userKeys[i] + elem.data = block.data[i] + + // Copy tag values + for tagName, tagData := range block.tags { + if i < len(tagData.values) && tagData.values[i] != nil { + tag := generateTag() + tag.name = tagName + tag.value = tagData.values[i] + tag.valueType = tagData.valueType + elem.tags = append(elem.tags, tag) + } + } + + results = append(results, elem) + } + + return results, nil +} + +// matchesTagFilters checks if element matches tag filters +func (bs *blockScanner) matchesTagFilters(block *block, index int) bool { + for filterTagName, filterValue := range bs.tagFilters { + tagData, ok := block.tags[filterTagName] + if !ok { + return false // Tag not present + } + + if index >= len(tagData.values) { + return false // No value for this element + } + + elementValue := tagData.values[index] + if elementValue == nil { + return false // Null value + } + + if !bytes.Equal(elementValue, filterValue) { + return false // Value doesn't match + } + } + + return true +} +``` + +###### E. 
Block Writer (`block_writer`) + +The block writer handles efficient writing of blocks to disk: + +```go +// blockWriter writes blocks to part files with compression +type blockWriter struct { + // File writers + dataWriter fs.Writer // Writes to data.bin + keysWriter fs.Writer // Writes to keys.bin + tagWriters map[string]fs.Writer // Writes to tag_*.td files + + // Compression + compressor *zstd.Compressor + + // Write tracking + bytesWritten map[string]uint64 // Track bytes written per file + + // Pool management + pooled bool +} + +var blockWriterPool = pool.Register[*blockWriter]("sidx-blockWriter") + +func generateBlockWriter() *blockWriter { + v := blockWriterPool.Get() + if v == nil { + return &blockWriter{ + tagWriters: make(map[string]fs.Writer), + bytesWritten: make(map[string]uint64), + compressor: zstd.NewCompressor(), + } + } + return v +} + +func releaseBlockWriter(bw *blockWriter) { + bw.reset() + blockWriterPool.Put(bw) +} + +// reset clears writer for reuse +func (bw *blockWriter) reset() { + bw.dataWriter = nil + bw.keysWriter = nil + + // Clear writers and tracking + for k := range bw.tagWriters { + delete(bw.tagWriters, k) + } + for k := range bw.bytesWritten { + delete(bw.bytesWritten, k) + } + + if bw.compressor != nil { + bw.compressor.Reset() + } + + bw.pooled = false +} + +// init initializes writer with part writers +func (bw *blockWriter) init(dataWriter, keysWriter fs.Writer, + tagWriters map[string]fs.Writer) { + bw.reset() + + bw.dataWriter = dataWriter + bw.keysWriter = keysWriter + + // Copy tag writers + for tagName, writer := range tagWriters { + bw.tagWriters[tagName] = writer + } +} + +// mustWriteTo writes block to files and updates metadata +func (bw *blockWriter) mustWriteTo(block *block, bm *blockMetadata) error { + if err := block.validate(); err != nil { + return fmt.Errorf("block validation failed: %w", err) + } + + bm.reset() + + // Set basic metadata + bm.minKey = block.userKeys[0] + bm.maxKey = block.userKeys[len(block.userKeys)-1] + bm.count = uint64(len(block.userKeys)) + bm.uncompressedSize = block.uncompressedSizeBytes() + + // Write user keys + if err := bw.writeUserKeys(block, bm); err != nil { + return fmt.Errorf("failed to write user keys: %w", err) + } + + // Write data payloads + if err := bw.writeData(block, bm); err != nil { + return fmt.Errorf("failed to write data: %w", err) + } + + // Write tags + if err := bw.writeTags(block, bm); err != nil { + return fmt.Errorf("failed to write tags: %w", err) + } + + return nil +} + +// writeUserKeys writes user keys to keys.bin +func (bw *blockWriter) writeUserKeys(block *block, bm *blockMetadata) error { + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + + // Encode user keys + bb.Buf = encoding.Int64ListToBytes(bb.Buf[:0], block.userKeys) + + // Track write position + offset := bw.bytesWritten["keys"] + size := uint64(len(bb.Buf)) + + // Write to file + fs.MustWriteData(bw.keysWriter.SequentialWrite(), bb.Buf) + + // Update metadata + bm.keysBlock = dataBlock{offset: offset, size: size} + bw.bytesWritten["keys"] = offset + size + + return nil +} + +// writeData writes data payloads to data.bin +func (bw *blockWriter) writeData(block *block, bm *blockMetadata) error { + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + + // Encode data payloads + bb.Buf = encoding.BytesListToBytes(bb.Buf[:0], block.data) + + // Compress data + compressed, err := bw.compressor.CompressBytes(bb.Buf) + if err != nil { + return fmt.Errorf("failed to compress data: %w", err) + } + 
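+	// Only data.bin payloads are zstd-compressed; user keys and tag values rely on
+	// their own encodings (see writeUserKeys and writeTag below).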
+ // Track write position + offset := bw.bytesWritten["data"] + size := uint64(len(compressed)) + + // Write to file + fs.MustWriteData(bw.dataWriter.SequentialWrite(), compressed) + + // Update metadata + bm.dataBlock = dataBlock{offset: offset, size: size} + bw.bytesWritten["data"] = offset + size + + return nil +} + +// writeTags writes tag data to tag files +func (bw *blockWriter) writeTags(block *block, bm *blockMetadata) error { + for tagName, tagData := range block.tags { + writer, ok := bw.tagWriters[tagName] + if !ok { + continue // Tag writer not available + } + + if err := bw.writeTag(tagName, tagData, writer, bm); err != nil { + return fmt.Errorf("failed to write tag %s: %w", tagName, err) + } + } + + return nil +} + +// writeTag writes a single tag's data +func (bw *blockWriter) writeTag(tagName string, tagData *tagData, + writer fs.Writer, bm *blockMetadata) error { + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + + // Encode tag values + bb.Buf = encoding.BytesListToBytes(bb.Buf[:0], tagData.values) + + // Track write position + fileKey := "tag_" + tagName + offset := bw.bytesWritten[fileKey] + size := uint64(len(bb.Buf)) + + // Write to file + fs.MustWriteData(writer.SequentialWrite(), bb.Buf) + + // Update metadata + bm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size} + bw.bytesWritten[fileKey] = offset + size + + return nil +} + +// totalBytesWritten returns total bytes written across all files +func (bw *blockWriter) totalBytesWritten() uint64 { + var total uint64 + for _, bytes := range bw.bytesWritten { + total += bytes + } + return total +} + +// mustClose closes all writers +func (bw *blockWriter) mustClose() { + if bw.dataWriter != nil { + fs.MustClose(bw.dataWriter.SequentialWrite()) + } + if bw.keysWriter != nil { + fs.MustClose(bw.keysWriter.SequentialWrite()) + } + for _, writer := range bw.tagWriters { + fs.MustClose(writer.SequentialWrite()) + } +} +``` + +##### Component Dependency Relationships + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Block │ │ Block Metadata │ │ Block Writer │ +│ (Runtime) │◄──►│ (Storage) │◄───│ (Persist) │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ + ▲ ▲ ▲ + │ │ │ + │ │ │ +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Block Reader │ │ Block Scanner │ │ Parts │ +│ (Load) │────│ (Query) │────│ (Storage) │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ +``` + +**Key Dependencies:** + +1. **Block → Block Metadata**: Blocks generate metadata during write operations +2. **Block Metadata → Block Reader**: Metadata guides what and how to read +3. **Block Reader → Block**: Reader loads data into block structures +4. **Block Scanner → Block Reader**: Scanner uses reader to load blocks for queries +5. **Block Writer → Parts**: Writer creates files within part directories +6. 
**Parts → Block Scanner**: Scanner operates on blocks within parts + +##### Key Design Features + +###### Block Size Management +```go +const ( + maxUncompressedBlockSize = 8 * 1024 * 1024 // 8MB uncompressed + maxPooledSliceSize = 1024 * 1024 // 1MB max pooled slice + maxPooledSliceCount = 1000 // Max pooled slice count +) + +// isFull checks if block has reached size limit +func (b *block) isFull() bool { + return b.uncompressedSizeBytes() >= maxUncompressedBlockSize +} +``` + +###### Compression Strategy +- **Data Payloads**: zstd compression for user data +- **User Keys**: Specialized int64 encoding for optimal space usage +- **Tag Values**: Type-specific encoding with bloom filters for indexed tags +- **Metadata**: JSON for readability, binary for performance-critical paths + +###### Memory Management +- **Object Pooling**: All major structures use sync.Pool for allocation efficiency +- **Reference Counting**: Safe concurrent access with atomic operations +- **Resource Limits**: Configurable limits prevent memory exhaustion +- **Reset Methods**: Proper cleanup enables safe object reuse + +###### Error Handling +- **Validation**: Comprehensive validation at all levels +- **Recovery**: Graceful handling of corruption and I/O errors +- **Logging**: Detailed error context for debugging +- **Consistency**: Atomic operations maintain data integrity + +##### File Organization in Parts + +``` +000000001234abcd/ # Part directory (epoch-based name) +├── manifest.json # Part metadata and statistics +├── primary.bin # Block metadata (references to all files) +├── data.bin # User data payloads (compressed) +├── keys.bin # User-provided int64 ordering keys +├── meta.bin # Part-level metadata +├── tag_service_id.td # Service ID tag data +├── tag_service_id.tm # Service ID tag metadata +├── tag_service_id.tf # Service ID tag filter (bloom) +├── tag_endpoint.td # Endpoint tag data +├── tag_endpoint.tm # Endpoint tag metadata +├── tag_endpoint.tf # Endpoint tag filter +└── ... # Additional tag files +``` + +**File Format Details:** +- **primary.bin**: Contains array of blockMetadata structures +- **data.bin**: Compressed user payloads with block boundaries +- **keys.bin**: Int64-encoded user keys with block boundaries +- **tag_*.td**: Tag value data with type-specific encoding +- **tag_*.tm**: Tag metadata with bloom filter parameters +- **tag_*.tf**: Bloom filter data for fast tag filtering + +This block architecture provides efficient, scalable storage for user-key-based data while maintaining consistency with the existing sidx design principles and production requirements. + +#### 6. Snapshot Management ```go type snapshot struct { parts []*partWrapper @@ -400,7 +1398,7 @@ func (s *snapshot) getParts(dst []*part, minKey, maxKey int64) ([]*part, int) { } ``` -#### 6. Enhanced API Design +#### 7. 
Enhanced API Design **Write Interface:** - Context support for cancellation and timeouts @@ -490,7 +1488,7 @@ func (s *snapshot) getParts(dst []*part, minKey, maxKey int64) ([]*part, int) { #### Block Metadata Validation ```go // Validation ensures monotonic ordering of blocks -func validatePrimaryBlockMetadata(blocks []primaryBlockMetadata) error { +func validateBlockMetadata(blocks []blockMetadata) error { for i := 1; i < len(blocks); i++ { if blocks[i].seriesID < blocks[i-1].seriesID { return fmt.Errorf("unexpected block with smaller seriesID") diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md index 0273586d..36463c83 100644 --- a/banyand/internal/sidx/TODO.md +++ b/banyand/internal/sidx/TODO.md @@ -43,7 +43,7 @@ This document tracks the implementation progress of the Secondary Index File Sys ### 1.3 Metadata Structures (`metadata.go`) ✅ - [x] partMetadata with MinKey/MaxKey (replacing timestamps) -- [x] primaryBlockMetadata with block offsets and key ranges +- [x] blockMetadata with block offsets and key ranges - [x] Validation methods for metadata integrity - [x] **Test Cases**: - [x] Metadata serialization/deserialization @@ -51,21 +51,26 @@ This document tracks the implementation progress of the Secondary Index File Sys - [x] Version compatibility checks - [x] Corruption detection in metadata -### 1.4 Block Structure (`block.go`) 🔥 -- [ ] **Core block organization for elements within parts** -- [ ] Contains userKeys[], seriesIDs[], data[], tags[] -- [ ] Methods: reset(), mustInitFromElements(), validation -- [ ] Sorting validation for elements within blocks -- [ ] **Tag encoding/decoding**: Uses shared encoding module from `banyand/internal/encoding/tag_encoder.go` - - [ ] Implement `block.marshal()` and `block.unmarshal()` methods - - [ ] Use `EncodeTagValues()` and `DecodeTagValues()` for tag serialization - - [ ] Apply sophisticated encoding: delta for int64, dictionary for strings, zstd for plain +### 1.4 Block Structure (`block.go`) 🔥 - DESIGN COMPLETED ✅ +- [ ] **Core block structure**: userKeys[], elementIDs[], data[], tags map +- [ ] **Block components design**: Block, Block Metadata, Block Reader, Block Scanner, Block Writer +- [ ] **Memory management**: Object pooling with reset() methods +- [ ] **Block operations**: mustInitFromElements(), validate(), uncompressedSizeBytes() +- [ ] **Tag processing**: processTag() for individual tag handling within blocks +- [ ] **Component relationships**: Dependency diagram and interaction patterns +- [ ] **File organization**: Block storage within part directories +- [ ] **Implementation Tasks**: + - [ ] Create block.go with core block structure + - [ ] Implement reset() and validation methods + - [ ] Add mustInitFromElements() for block initialization + - [ ] Implement processTag() for tag data organization + - [ ] Add size calculation methods - [ ] **Test Cases**: - [ ] Block initialization from sorted elements - - [ ] Element organization by seriesID and userKey - [ ] Key ordering validation within blocks - [ ] Block reset and reuse functionality - - [ ] Tag encoding/decoding with various value types + - [ ] Tag processing and bloom filter generation + - [ ] Memory pooling effectiveness ### 1.5 Part Structure (`part.go`) - [ ] File readers for primary.bin, data.bin, keys.bin, meta.bin @@ -101,10 +106,18 @@ This document tracks the implementation progress of the Secondary Index File Sys - [ ] Element addition and retrieval - [ ] Memory usage tracking accuracy -### 2.2 Block Writer (`block_writer.go`) 🔥 -- [ ] 
**Uses block.go to serialize block data** -- [ ] Compression support (zstd) -- [ ] Write blocks to memory/disk buffers +### 2.2 Block Writer (`block_writer.go`) 🔥 - DESIGN COMPLETED ✅ +- [ ] **Complete block writer design added to DESIGN.md** +- [ ] **Multi-file writing**: data.bin, keys.bin, tag_*.td files +- [ ] **Compression**: zstd compression for data payloads +- [ ] **Write tracking**: Track bytes written per file +- [ ] **Memory management**: Object pooling with reset() methods +- [ ] **Atomic operations**: mustWriteTo() for block serialization +- [ ] **Implementation Tasks**: + - [ ] Create block_writer.go with core writer structure + - [ ] Implement writeUserKeys(), writeData(), writeTags() + - [ ] Add compression/decompression support + - [ ] Implement write tracking and position management - [ ] **Test Cases**: - [ ] Block serialization with various data sizes - [ ] Compression ratios meet expectations @@ -121,10 +134,18 @@ This document tracks the implementation progress of the Secondary Index File Sys - [ ] Performance benchmarks for large datasets - [ ] Edge cases (empty, single element, duplicate keys) -### 2.4 Block Initialization (`block.go` methods) 🔥 -- [ ] **mustInitFromElements() processes sorted elements into blocks** -- [ ] Validate key ordering within blocks -- [ ] Group elements by seriesID efficiently +### 2.4 Block Initialization (`block.go` methods) 🔥 - DESIGN COMPLETED ✅ +- [ ] **Complete block initialization design added to DESIGN.md** +- [ ] **mustInitFromElements()**: Process sorted elements into blocks +- [ ] **mustInitFromTags()**: Process tag data for blocks +- [ ] **processTag()**: Create tag data structures with bloom filters +- [ ] **Key validation**: Verify sorting and consistency +- [ ] **Tag optimization**: Bloom filters for indexed tags, min/max for int64 tags +- [ ] **Implementation Tasks**: + - [ ] Implement mustInitFromElements() with element processing + - [ ] Add mustInitFromTags() for tag data organization + - [ ] Create processTag() with bloom filter generation + - [ ] Add validation for element ordering - [ ] **Test Cases**: - [ ] Block building from various element configurations - [ ] Validation errors for unsorted elements @@ -209,10 +230,17 @@ This document tracks the implementation progress of the Secondary Index File Sys - [ ] Error messages are helpful for debugging - [ ] Edge cases (duplicate keys, negative keys) -### 4.4 Block Building (`writer.go` + `block.go`) 🔥 -- [ ] **When memory part reaches threshold, organize elements into blocks** -- [ ] **Call block.mustInitFromElements() with sorted elements** -- [ ] Create blocks with configured size limits +### 4.4 Block Building (`writer.go` + `block.go`) 🔥 - DESIGN COMPLETED ✅ +- [ ] **Complete block building design added to DESIGN.md** +- [ ] **Element organization**: Sort elements by seriesID then userKey +- [ ] **Block creation**: mustInitFromElements() with sorted elements +- [ ] **Size management**: maxUncompressedBlockSize limits +- [ ] **Memory efficiency**: Object pooling and resource management +- [ ] **Implementation Tasks**: + - [ ] Integrate block building into write path + - [ ] Add threshold detection for block creation + - [ ] Implement size limit enforcement + - [ ] Add element distribution logic - [ ] **Test Cases**: - [ ] Block creation triggers at correct thresholds - [ ] Size limits are enforced properly @@ -253,10 +281,17 @@ This document tracks the implementation progress of the Secondary Index File Sys - [ ] File format compatibility - [ ] Performance of tag 
file generation -### 5.4 Block Serialization (`flusher.go` + `block_writer.go`) 🔥 -- [ ] **Serialize blocks from memory parts to disk** -- [ ] **Use block structure to write primary.bin** -- [ ] Compress block data before writing +### 5.4 Block Serialization (`flusher.go` + `block_writer.go`) 🔥 - DESIGN COMPLETED ✅ +- [ ] **Complete block serialization design added to DESIGN.md** +- [ ] **Multi-file output**: primary.bin, data.bin, keys.bin, tag files +- [ ] **Block writer integration**: mustWriteTo() for block persistence +- [ ] **Compression strategy**: zstd for data, specialized encoding for keys +- [ ] **Metadata generation**: Block metadata with file references +- [ ] **Implementation Tasks**: + - [ ] Integrate block writer into flush operations + - [ ] Add primary.bin writing with block metadata + - [ ] Implement compression for block data + - [ ] Add file reference management - [ ] **Test Cases**: - [ ] Block persistence maintains data integrity - [ ] Compression reduces storage footprint @@ -297,10 +332,17 @@ This document tracks the implementation progress of the Secondary Index File Sys - [ ] Key ordering is maintained across parts - [ ] Merged part metadata is accurate -### 6.4 Block Merging (`merger.go` + `block.go`) 🔥 -- [ ] **Read blocks from multiple parts** -- [ ] **Merge blocks maintaining seriesID/key ordering** -- [ ] **Create new blocks for merged part** +### 6.4 Block Merging (`merger.go` + `block.go`) 🔥 - DESIGN COMPLETED ✅ +- [ ] **Complete block merging design added to DESIGN.md** +- [ ] **Block reader integration**: Read blocks from multiple parts +- [ ] **Merge strategy**: Maintain key ordering across merged blocks +- [ ] **Block writer output**: Create new blocks for merged parts +- [ ] **Memory management**: Efficient block processing with pooling +- [ ] **Implementation Tasks**: + - [ ] Integrate block reader for loading blocks from parts + - [ ] Add block merging logic with ordering preservation + - [ ] Implement merged block creation + - [ ] Add memory-efficient merge processing - [ ] **Test Cases**: - [ ] Block merge correctness across parts - [ ] Ordering preservation during merge @@ -331,10 +373,18 @@ This document tracks the implementation progress of the Secondary Index File Sys - [ ] Boundary conditions handled correctly - [ ] Empty result sets handled gracefully -### 7.3 Block Scanner (`block_scanner.go`) 🔥 -- [ ] **Read and scan blocks from parts** -- [ ] **Deserialize block data using block structure** -- [ ] Apply tag filters using bloom filters +### 7.3 Block Scanner (`block_scanner.go`) 🔥 - DESIGN COMPLETED ✅ +- [ ] **Complete block scanner design added to DESIGN.md** +- [ ] **Query processing**: scanBlock() with range and tag filtering +- [ ] **Memory management**: Object pooling with reset() methods +- [ ] **Efficient filtering**: Quick checks before loading block data +- [ ] **Element matching**: scanBlockElements() with tag filter matching +- [ ] **Resource management**: Block reader and temporary block handling +- [ ] **Implementation Tasks**: + - [ ] Create block_scanner.go with scanner structure + - [ ] Implement scanBlock() with filtering logic + - [ ] Add scanBlockElements() for element processing + - [ ] Create matchesTagFilters() for tag filtering - [ ] **Test Cases**: - [ ] Scan completeness finds all matching data - [ ] Filter effectiveness reduces false positives @@ -351,10 +401,18 @@ This document tracks the implementation progress of the Secondary Index File Sys - [ ] Ordering is maintained across parts - [ ] Iterator cleanup 
prevents resource leaks -### 7.5 Block Reader (`block_reader.go`) 🔥 -- [ ] **Deserialize blocks from disk** -- [ ] **Reconstruct block structure with elements** -- [ ] Decompress block data efficiently +### 7.5 Block Reader (`block_reader.go`) 🔥 - DESIGN COMPLETED ✅ +- [x] **Complete block reader design added to DESIGN.md** +- [x] **Multi-file reading**: data.bin, keys.bin, tag_*.td files +- [x] **Decompression**: zstd decompression for data payloads +- [x] **Memory management**: Object pooling with reset() methods +- [x] **Selective loading**: Tag projection for efficient I/O +- [x] **Block reconstruction**: mustReadFrom() for complete block loading +- [ ] **Implementation Tasks**: + - [ ] Create block_reader.go with reader structure + - [ ] Implement readUserKeys(), readData(), readTags() + - [ ] Add decompression support + - [ ] Create mustReadFrom() for block loading - [ ] **Test Cases**: - [ ] Block reading maintains data integrity - [ ] Decompression works correctly diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go index 5f78f028..ba3af05f 100644 --- a/banyand/internal/sidx/metadata.go +++ b/banyand/internal/sidx/metadata.go @@ -44,24 +44,22 @@ type partMetadata struct { ID uint64 `json:"id"` // Unique part identifier } -// primaryBlockMetadata contains metadata for a block within a part. -type primaryBlockMetadata struct { - // Block references to files - tagsBlocks map[string]dataBlock // References to tag files - dataBlock dataBlock // Reference to data in data.bin - keysBlock dataBlock // Reference to keys in keys.bin - - // Block identification - seriesID common.SeriesID - - // Key range within block - minKey int64 // Minimum user key in block - maxKey int64 // Maximum user key in block +// blockMetadata contains metadata for a block within a part. +type blockMetadata struct { + tagsBlocks map[string]dataBlock + tagProjection []string + dataBlock dataBlock + keysBlock dataBlock + seriesID common.SeriesID + minKey int64 + maxKey int64 + uncompressedSize uint64 + count uint64 } var ( - partMetadataPool = pool.Register[*partMetadata]("sidx-partMetadata") - primaryBlockMetadataPool = pool.Register[*primaryBlockMetadata]("sidx-primaryBlockMetadata") + partMetadataPool = pool.Register[*partMetadata]("sidx-partMetadata") + blockMetadataPool = pool.Register[*blockMetadata]("sidx-blockMetadata") ) // generatePartMetadata gets partMetadata from pool or creates new. @@ -82,24 +80,24 @@ func releasePartMetadata(pm *partMetadata) { partMetadataPool.Put(pm) } -// generatePrimaryBlockMetadata gets primaryBlockMetadata from pool or creates new. -func generatePrimaryBlockMetadata() *primaryBlockMetadata { - v := primaryBlockMetadataPool.Get() +// generateBlockMetadata gets blockMetadata from pool or creates new. +func generateBlockMetadata() *blockMetadata { + v := blockMetadataPool.Get() if v == nil { - return &primaryBlockMetadata{ + return &blockMetadata{ tagsBlocks: make(map[string]dataBlock), } } return v } -// releasePrimaryBlockMetadata returns primaryBlockMetadata to pool after reset. -func releasePrimaryBlockMetadata(pbm *primaryBlockMetadata) { - if pbm == nil { +// releaseBlockMetadata returns blockMetadata to pool after reset. +func releaseBlockMetadata(bm *blockMetadata) { + if bm == nil { return } - pbm.reset() - primaryBlockMetadataPool.Put(pbm) + bm.reset() + blockMetadataPool.Put(bm) } // reset clears partMetadata for reuse in object pool. 
@@ -113,17 +111,21 @@ func (pm *partMetadata) reset() { pm.ID = 0 } -// reset clears primaryBlockMetadata for reuse in object pool. -func (pbm *primaryBlockMetadata) reset() { - pbm.seriesID = 0 - pbm.minKey = 0 - pbm.maxKey = 0 - pbm.dataBlock = dataBlock{} - pbm.keysBlock = dataBlock{} - // Clear the map instead of creating a new one - for k := range pbm.tagsBlocks { - delete(pbm.tagsBlocks, k) - } +// reset clears blockMetadata for reuse in object pool. +func (bm *blockMetadata) reset() { + bm.seriesID = 0 + bm.minKey = 0 + bm.maxKey = 0 + bm.dataBlock = dataBlock{} + bm.keysBlock = dataBlock{} + bm.uncompressedSize = 0 + bm.count = 0 + + // Clear maps but keep them allocated + for k := range bm.tagsBlocks { + delete(bm.tagsBlocks, k) + } + bm.tagProjection = bm.tagProjection[:0] } // validate validates the partMetadata for consistency. @@ -141,25 +143,25 @@ func (pm *partMetadata) validate() error { return nil } -// validate validates the primaryBlockMetadata for consistency. -func (pbm *primaryBlockMetadata) validate() error { - if pbm.minKey > pbm.maxKey { - return fmt.Errorf("invalid block key range: minKey (%d) > maxKey (%d)", pbm.minKey, pbm.maxKey) +// validate validates the blockMetadata for consistency. +func (bm *blockMetadata) validate() error { + if bm.minKey > bm.maxKey { + return fmt.Errorf("invalid block key range: minKey (%d) > maxKey (%d)", bm.minKey, bm.maxKey) } - if pbm.seriesID == 0 { + if bm.seriesID == 0 { return fmt.Errorf("invalid seriesID: cannot be zero") } - if pbm.dataBlock.size == 0 { + if bm.dataBlock.size == 0 { return fmt.Errorf("invalid data block: size cannot be zero") } - if pbm.keysBlock.size == 0 { + if bm.keysBlock.size == 0 { return fmt.Errorf("invalid keys block: size cannot be zero") } return nil } -// validatePrimaryBlockMetadata validates ordering of blocks within a part. -func validatePrimaryBlockMetadata(blocks []primaryBlockMetadata) error { +// validateBlockMetadata validates ordering of blocks within a part. +func validateBlockMetadata(blocks []blockMetadata) error { if len(blocks) == 0 { return nil } @@ -232,46 +234,46 @@ func unmarshalPartMetadata(data []byte) (*partMetadata, error) { return pm, nil } -// marshal serializes primaryBlockMetadata to bytes. -func (pbm *primaryBlockMetadata) marshal() ([]byte, error) { +// marshal serializes blockMetadata to bytes. 
+func (bm *blockMetadata) marshal() ([]byte, error) { buf := &bytes.Buffer{} // Write seriesID - if err := binary.Write(buf, binary.LittleEndian, pbm.seriesID); err != nil { + if err := binary.Write(buf, binary.LittleEndian, bm.seriesID); err != nil { return nil, fmt.Errorf("failed to write seriesID: %w", err) } // Write key range - if err := binary.Write(buf, binary.LittleEndian, pbm.minKey); err != nil { + if err := binary.Write(buf, binary.LittleEndian, bm.minKey); err != nil { return nil, fmt.Errorf("failed to write minKey: %w", err) } - if err := binary.Write(buf, binary.LittleEndian, pbm.maxKey); err != nil { + if err := binary.Write(buf, binary.LittleEndian, bm.maxKey); err != nil { return nil, fmt.Errorf("failed to write maxKey: %w", err) } // Write data block - if err := binary.Write(buf, binary.LittleEndian, pbm.dataBlock.offset); err != nil { + if err := binary.Write(buf, binary.LittleEndian, bm.dataBlock.offset); err != nil { return nil, fmt.Errorf("failed to write data block offset: %w", err) } - if err := binary.Write(buf, binary.LittleEndian, pbm.dataBlock.size); err != nil { + if err := binary.Write(buf, binary.LittleEndian, bm.dataBlock.size); err != nil { return nil, fmt.Errorf("failed to write data block size: %w", err) } // Write keys block - if err := binary.Write(buf, binary.LittleEndian, pbm.keysBlock.offset); err != nil { + if err := binary.Write(buf, binary.LittleEndian, bm.keysBlock.offset); err != nil { return nil, fmt.Errorf("failed to write keys block offset: %w", err) } - if err := binary.Write(buf, binary.LittleEndian, pbm.keysBlock.size); err != nil { + if err := binary.Write(buf, binary.LittleEndian, bm.keysBlock.size); err != nil { return nil, fmt.Errorf("failed to write keys block size: %w", err) } // Write tag blocks count - if err := binary.Write(buf, binary.LittleEndian, uint32(len(pbm.tagsBlocks))); err != nil { + if err := binary.Write(buf, binary.LittleEndian, uint32(len(bm.tagsBlocks))); err != nil { return nil, fmt.Errorf("failed to write tag blocks count: %w", err) } // Write tag blocks - for tagName, tagBlock := range pbm.tagsBlocks { + for tagName, tagBlock := range bm.tagsBlocks { // Write tag name length and name nameBytes := []byte(tagName) if err := binary.Write(buf, binary.LittleEndian, uint32(len(nameBytes))); err != nil { @@ -293,51 +295,51 @@ func (pbm *primaryBlockMetadata) marshal() ([]byte, error) { return buf.Bytes(), nil } -// unmarshalPrimaryBlockMetadata deserializes primaryBlockMetadata from bytes. -func unmarshalPrimaryBlockMetadata(data []byte) (*primaryBlockMetadata, error) { - pbm := generatePrimaryBlockMetadata() +// unmarshalBlockMetadata deserializes blockMetadata from bytes. 
+func unmarshalBlockMetadata(data []byte) (*blockMetadata, error) { + bm := generateBlockMetadata() buf := bytes.NewReader(data) // Read seriesID - if err := binary.Read(buf, binary.LittleEndian, &pbm.seriesID); err != nil { - releasePrimaryBlockMetadata(pbm) + if err := binary.Read(buf, binary.LittleEndian, &bm.seriesID); err != nil { + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read seriesID: %w", err) } // Read key range - if err := binary.Read(buf, binary.LittleEndian, &pbm.minKey); err != nil { - releasePrimaryBlockMetadata(pbm) + if err := binary.Read(buf, binary.LittleEndian, &bm.minKey); err != nil { + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read minKey: %w", err) } - if err := binary.Read(buf, binary.LittleEndian, &pbm.maxKey); err != nil { - releasePrimaryBlockMetadata(pbm) + if err := binary.Read(buf, binary.LittleEndian, &bm.maxKey); err != nil { + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read maxKey: %w", err) } // Read data block - if err := binary.Read(buf, binary.LittleEndian, &pbm.dataBlock.offset); err != nil { - releasePrimaryBlockMetadata(pbm) + if err := binary.Read(buf, binary.LittleEndian, &bm.dataBlock.offset); err != nil { + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read data block offset: %w", err) } - if err := binary.Read(buf, binary.LittleEndian, &pbm.dataBlock.size); err != nil { - releasePrimaryBlockMetadata(pbm) + if err := binary.Read(buf, binary.LittleEndian, &bm.dataBlock.size); err != nil { + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read data block size: %w", err) } // Read keys block - if err := binary.Read(buf, binary.LittleEndian, &pbm.keysBlock.offset); err != nil { - releasePrimaryBlockMetadata(pbm) + if err := binary.Read(buf, binary.LittleEndian, &bm.keysBlock.offset); err != nil { + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read keys block offset: %w", err) } - if err := binary.Read(buf, binary.LittleEndian, &pbm.keysBlock.size); err != nil { - releasePrimaryBlockMetadata(pbm) + if err := binary.Read(buf, binary.LittleEndian, &bm.keysBlock.size); err != nil { + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read keys block size: %w", err) } // Read tag blocks count var tagBlocksCount uint32 if err := binary.Read(buf, binary.LittleEndian, &tagBlocksCount); err != nil { - releasePrimaryBlockMetadata(pbm) + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read tag blocks count: %w", err) } @@ -346,12 +348,12 @@ func unmarshalPrimaryBlockMetadata(data []byte) (*primaryBlockMetadata, error) { // Read tag name var nameLen uint32 if err := binary.Read(buf, binary.LittleEndian, &nameLen); err != nil { - releasePrimaryBlockMetadata(pbm) + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read tag name length: %w", err) } nameBytes := make([]byte, nameLen) if _, err := io.ReadFull(buf, nameBytes); err != nil { - releasePrimaryBlockMetadata(pbm) + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read tag name: %w", err) } tagName := string(nameBytes) @@ -359,78 +361,78 @@ func unmarshalPrimaryBlockMetadata(data []byte) (*primaryBlockMetadata, error) { // Read tag block var tagBlock dataBlock if err := binary.Read(buf, binary.LittleEndian, &tagBlock.offset); err != nil { - releasePrimaryBlockMetadata(pbm) + releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read tag block offset: %w", err) } if err := binary.Read(buf, binary.LittleEndian, &tagBlock.size); err != nil { - releasePrimaryBlockMetadata(pbm) + 
releaseBlockMetadata(bm) return nil, fmt.Errorf("failed to read tag block size: %w", err) } - pbm.tagsBlocks[tagName] = tagBlock + bm.tagsBlocks[tagName] = tagBlock } // Validate the metadata - if err := pbm.validate(); err != nil { - releasePrimaryBlockMetadata(pbm) + if err := bm.validate(); err != nil { + releaseBlockMetadata(bm) return nil, fmt.Errorf("block metadata validation failed: %w", err) } - return pbm, nil + return bm, nil } // SeriesID returns the seriesID of the block. -func (pbm *primaryBlockMetadata) SeriesID() common.SeriesID { - return pbm.seriesID +func (bm *blockMetadata) SeriesID() common.SeriesID { + return bm.seriesID } // MinKey returns the minimum user key in the block. -func (pbm *primaryBlockMetadata) MinKey() int64 { - return pbm.minKey +func (bm *blockMetadata) MinKey() int64 { + return bm.minKey } // MaxKey returns the maximum user key in the block. -func (pbm *primaryBlockMetadata) MaxKey() int64 { - return pbm.maxKey +func (bm *blockMetadata) MaxKey() int64 { + return bm.maxKey } // DataBlock returns the data block reference. -func (pbm *primaryBlockMetadata) DataBlock() dataBlock { - return pbm.dataBlock +func (bm *blockMetadata) DataBlock() dataBlock { + return bm.dataBlock } // KeysBlock returns the keys block reference. -func (pbm *primaryBlockMetadata) KeysBlock() dataBlock { - return pbm.keysBlock +func (bm *blockMetadata) KeysBlock() dataBlock { + return bm.keysBlock } // TagsBlocks returns the tag blocks references. -func (pbm *primaryBlockMetadata) TagsBlocks() map[string]dataBlock { - return pbm.tagsBlocks +func (bm *blockMetadata) TagsBlocks() map[string]dataBlock { + return bm.tagsBlocks } // setSeriesID sets the seriesID of the block. -func (pbm *primaryBlockMetadata) setSeriesID(seriesID common.SeriesID) { - pbm.seriesID = seriesID +func (bm *blockMetadata) setSeriesID(seriesID common.SeriesID) { + bm.seriesID = seriesID } // setKeyRange sets the key range of the block. -func (pbm *primaryBlockMetadata) setKeyRange(minKey, maxKey int64) { - pbm.minKey = minKey - pbm.maxKey = maxKey +func (bm *blockMetadata) setKeyRange(minKey, maxKey int64) { + bm.minKey = minKey + bm.maxKey = maxKey } // setDataBlock sets the data block reference. -func (pbm *primaryBlockMetadata) setDataBlock(offset, size uint64) { - pbm.dataBlock = dataBlock{offset: offset, size: size} +func (bm *blockMetadata) setDataBlock(offset, size uint64) { + bm.dataBlock = dataBlock{offset: offset, size: size} } // setKeysBlock sets the keys block reference. -func (pbm *primaryBlockMetadata) setKeysBlock(offset, size uint64) { - pbm.keysBlock = dataBlock{offset: offset, size: size} +func (bm *blockMetadata) setKeysBlock(offset, size uint64) { + bm.keysBlock = dataBlock{offset: offset, size: size} } // addTagBlock adds a tag block reference. 
-func (pbm *primaryBlockMetadata) addTagBlock(tagName string, offset, size uint64) {
-	pbm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size}
+func (bm *blockMetadata) addTagBlock(tagName string, offset, size uint64) {
+	bm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size}
 }
diff --git a/banyand/internal/sidx/metadata_test.go b/banyand/internal/sidx/metadata_test.go
index 99f6c205..73346b96 100644
--- a/banyand/internal/sidx/metadata_test.go
+++ b/banyand/internal/sidx/metadata_test.go
@@ -117,16 +117,16 @@ func TestPartMetadata_Validation(t *testing.T) {
 	}
 }
 
-func TestPrimaryBlockMetadata_Validation(t *testing.T) {
+func TestBlockMetadata_Validation(t *testing.T) {
 	tests := []struct {
-		metadata  *primaryBlockMetadata
+		metadata  *blockMetadata
 		name      string
 		errMsg    string
 		expectErr bool
 	}{
 		{
 			name: "valid block metadata",
-			metadata: &primaryBlockMetadata{
+			metadata: &blockMetadata{
 				seriesID: 1,
 				minKey:   1,
 				maxKey:   100,
@@ -138,7 +138,7 @@ func TestPrimaryBlockMetadata_Validation(t *testing.T) {
 		},
 		{
 			name: "invalid key range - minKey > maxKey",
-			metadata: &primaryBlockMetadata{
+			metadata: &blockMetadata{
 				seriesID: 1,
 				minKey:   100,
 				maxKey:   1,
@@ -151,7 +151,7 @@ func TestPrimaryBlockMetadata_Validation(t *testing.T) {
 		},
 		{
 			name: "invalid seriesID - zero",
-			metadata: &primaryBlockMetadata{
+			metadata: &blockMetadata{
 				seriesID: 0,
 				minKey:   1,
 				maxKey:   100,
@@ -164,7 +164,7 @@ func TestPrimaryBlockMetadata_Validation(t *testing.T) {
 		},
 		{
 			name: "invalid data block - zero size",
-			metadata: &primaryBlockMetadata{
+			metadata: &blockMetadata{
 				seriesID: 1,
 				minKey:   1,
 				maxKey:   100,
@@ -177,7 +177,7 @@ func TestPrimaryBlockMetadata_Validation(t *testing.T) {
 		},
 		{
 			name: "invalid keys block - zero size",
-			metadata: &primaryBlockMetadata{
+			metadata: &blockMetadata{
 				seriesID: 1,
 				minKey:   1,
 				maxKey:   100,
@@ -203,21 +203,21 @@ func TestPrimaryBlockMetadata_Validation(t *testing.T) {
 	}
 }
 
-func TestValidatePrimaryBlockMetadata(t *testing.T) {
+func TestValidateBlockMetadata(t *testing.T) {
 	tests := []struct {
 		name      string
 		errMsg    string
-		blocks    []primaryBlockMetadata
+		blocks    []blockMetadata
 		expectErr bool
 	}{
 		{
 			name:      "empty blocks",
-			blocks:    []primaryBlockMetadata{},
+			blocks:    []blockMetadata{},
 			expectErr: false,
 		},
 		{
 			name: "single valid block",
-			blocks: []primaryBlockMetadata{
+			blocks: []blockMetadata{
 				{
 					seriesID: 1,
 					minKey:   1,
@@ -231,7 +231,7 @@ func TestValidatePrimaryBlockMetadata(t *testing.T) {
 		},
 		{
 			name: "properly ordered blocks by seriesID",
-			blocks: []primaryBlockMetadata{
+			blocks: []blockMetadata{
 				{
 					seriesID: 1,
 					minKey:   1,
@@ -253,7 +253,7 @@ func TestValidatePrimaryBlockMetadata(t *testing.T) {
 		},
 		{
 			name: "properly ordered blocks by key within same seriesID",
-			blocks: []primaryBlockMetadata{
+			blocks: []blockMetadata{
 				{
 					seriesID: 1,
 					minKey:   1,
@@ -275,7 +275,7 @@ func TestValidatePrimaryBlockMetadata(t *testing.T) {
 		},
 		{
 			name: "improperly ordered blocks by seriesID",
-			blocks: []primaryBlockMetadata{
+			blocks: []blockMetadata{
 				{
 					seriesID: 2,
 					minKey:   1,
@@ -298,7 +298,7 @@ func TestValidatePrimaryBlockMetadata(t *testing.T) {
 		},
 		{
 			name: "improperly ordered blocks by key within same seriesID",
-			blocks: []primaryBlockMetadata{
+			blocks: []blockMetadata{
 				{
 					seriesID: 1,
 					minKey:   51,
@@ -321,7 +321,7 @@ func TestValidatePrimaryBlockMetadata(t *testing.T) {
 		},
 		{
 			name: "overlapping key ranges within same seriesID",
-			blocks: []primaryBlockMetadata{
+			blocks: []blockMetadata{
 				{
 					seriesID: 1,
 					minKey:   1,
@@ -344,7 +344,7 @@ func TestValidatePrimaryBlockMetadata(t *testing.T) {
 		},
 		{
 			name: "adjacent key ranges within same seriesID (valid)",
-			blocks: []primaryBlockMetadata{
+			blocks: []blockMetadata{
 				{
 					seriesID: 1,
 					minKey:   1,
@@ -368,7 +368,7 @@ func TestValidatePrimaryBlockMetadata(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			err := validatePrimaryBlockMetadata(tt.blocks)
+			err := validateBlockMetadata(tt.blocks)
 			if tt.expectErr {
 				require.Error(t, err)
 				assert.Contains(t, err.Error(), tt.errMsg)
@@ -410,8 +410,8 @@ func TestPartMetadata_Serialization(t *testing.T) {
 	assert.Equal(t, original.ID, restored.ID)
 }
 
-func TestPrimaryBlockMetadata_Serialization(t *testing.T) {
-	original := &primaryBlockMetadata{
+func TestBlockMetadata_Serialization(t *testing.T) {
+	original := &blockMetadata{
 		seriesID: common.SeriesID(123),
 		minKey:   10,
 		maxKey:   100,
@@ -430,9 +430,9 @@ func TestPrimaryBlockMetadata_Serialization(t *testing.T) {
 	assert.NotEmpty(t, data)
 
 	// Test unmarshaling
-	restored, err := unmarshalPrimaryBlockMetadata(data)
+	restored, err := unmarshalBlockMetadata(data)
 	require.NoError(t, err)
-	defer releasePrimaryBlockMetadata(restored)
+	defer releaseBlockMetadata(restored)
 
 	// Verify all fields match
 	assert.Equal(t, original.seriesID, restored.seriesID)
@@ -518,8 +518,8 @@ func TestPartMetadata_JSONFormat(t *testing.T) {
 	assert.Contains(t, jsonStr, "maxKey")
 }
 
-func TestPrimaryBlockMetadata_AccessorMethods(t *testing.T) {
-	pbm := &primaryBlockMetadata{
+func TestBlockMetadata_AccessorMethods(t *testing.T) {
+	bm := &blockMetadata{
 		seriesID: common.SeriesID(123),
 		minKey:   10,
 		maxKey:   100,
@@ -531,35 +531,35 @@ func TestPrimaryBlockMetadata_AccessorMethods(t *testing.T) {
 	}
 
 	// Test accessor methods
-	assert.Equal(t, common.SeriesID(123), pbm.SeriesID())
-	assert.Equal(t, int64(10), pbm.MinKey())
-	assert.Equal(t, int64(100), pbm.MaxKey())
-	assert.Equal(t, dataBlock{offset: 1000, size: 2048}, pbm.DataBlock())
-	assert.Equal(t, dataBlock{offset: 3048, size: 512}, pbm.KeysBlock())
-	assert.Equal(t, map[string]dataBlock{"tag1": {offset: 3560, size: 256}}, pbm.TagsBlocks())
+	assert.Equal(t, common.SeriesID(123), bm.SeriesID())
+	assert.Equal(t, int64(10), bm.MinKey())
+	assert.Equal(t, int64(100), bm.MaxKey())
+	assert.Equal(t, dataBlock{offset: 1000, size: 2048}, bm.DataBlock())
+	assert.Equal(t, dataBlock{offset: 3048, size: 512}, bm.KeysBlock())
+	assert.Equal(t, map[string]dataBlock{"tag1": {offset: 3560, size: 256}}, bm.TagsBlocks())
 }
 
-func TestPrimaryBlockMetadata_SetterMethods(t *testing.T) {
-	pbm := generatePrimaryBlockMetadata()
-	defer releasePrimaryBlockMetadata(pbm)
+func TestBlockMetadata_SetterMethods(t *testing.T) {
+	bm := generateBlockMetadata()
+	defer releaseBlockMetadata(bm)
 
 	// Test setter methods
-	pbm.setSeriesID(common.SeriesID(456))
-	assert.Equal(t, common.SeriesID(456), pbm.seriesID)
+	bm.setSeriesID(common.SeriesID(456))
+	assert.Equal(t, common.SeriesID(456), bm.seriesID)
 
-	pbm.setKeyRange(20, 200)
-	assert.Equal(t, int64(20), pbm.minKey)
-	assert.Equal(t, int64(200), pbm.maxKey)
+	bm.setKeyRange(20, 200)
+	assert.Equal(t, int64(20), bm.minKey)
+	assert.Equal(t, int64(200), bm.maxKey)
 
-	pbm.setDataBlock(2000, 4096)
-	assert.Equal(t, dataBlock{offset: 2000, size: 4096}, pbm.dataBlock)
+	bm.setDataBlock(2000, 4096)
+	assert.Equal(t, dataBlock{offset: 2000, size: 4096}, bm.dataBlock)
 
-	pbm.setKeysBlock(6096, 1024)
-	assert.Equal(t, dataBlock{offset: 6096, size: 1024}, pbm.keysBlock)
+	bm.setKeysBlock(6096, 1024)
+	assert.Equal(t, dataBlock{offset: 6096, size: 1024}, bm.keysBlock)
 
-	pbm.addTagBlock("test_tag", 7120, 512)
+	bm.addTagBlock("test_tag", 7120, 512)
 	expected := dataBlock{offset: 7120, size: 512}
-	assert.Equal(t, expected, pbm.tagsBlocks["test_tag"])
+	assert.Equal(t, expected, bm.tagsBlocks["test_tag"])
 }
 
 func TestMetadata_Pooling(t *testing.T) {
@@ -579,21 +579,21 @@ func TestMetadata_Pooling(t *testing.T) {
 
 	releasePartMetadata(pm2)
 
-	// Test primaryBlockMetadata pooling
-	pbm1 := generatePrimaryBlockMetadata()
-	pbm1.seriesID = 456
-	pbm1.minKey = 20
-	pbm1.tagsBlocks["test"] = dataBlock{offset: 100, size: 200}
+	// Test blockMetadata pooling
+	bm1 := generateBlockMetadata()
+	bm1.seriesID = 456
+	bm1.minKey = 20
+	bm1.tagsBlocks["test"] = dataBlock{offset: 100, size: 200}
 
-	releasePrimaryBlockMetadata(pbm1)
+	releaseBlockMetadata(bm1)
 
-	pbm2 := generatePrimaryBlockMetadata()
-	// pbm2 should be the same instance as pbm1, but reset
-	assert.Equal(t, common.SeriesID(0), pbm2.seriesID)
-	assert.Equal(t, int64(0), pbm2.minKey)
-	assert.Equal(t, 0, len(pbm2.tagsBlocks))
+	bm2 := generateBlockMetadata()
+	// bm2 should be the same instance as bm1, but reset
+	assert.Equal(t, common.SeriesID(0), bm2.seriesID)
+	assert.Equal(t, int64(0), bm2.minKey)
+	assert.Equal(t, 0, len(bm2.tagsBlocks))
 
-	releasePrimaryBlockMetadata(pbm2)
+	releaseBlockMetadata(bm2)
 }
 
 func TestMetadata_Reset(t *testing.T) {
@@ -618,8 +618,8 @@ func TestMetadata_Reset(t *testing.T) {
 	assert.Equal(t, int64(0), pm.MaxKey)
 	assert.Equal(t, uint64(0), pm.ID)
 
-	// Test primaryBlockMetadata reset
-	pbm := &primaryBlockMetadata{
+	// Test blockMetadata reset
+	bm := &blockMetadata{
 		seriesID: 123,
 		minKey:   10,
 		maxKey:   100,
@@ -630,12 +630,12 @@ func TestMetadata_Reset(t *testing.T) {
 		},
 	}
 
-	pbm.reset()
+	bm.reset()
 
-	assert.Equal(t, common.SeriesID(0), pbm.seriesID)
-	assert.Equal(t, int64(0), pbm.minKey)
-	assert.Equal(t, int64(0), pbm.maxKey)
-	assert.Equal(t, dataBlock{}, pbm.dataBlock)
-	assert.Equal(t, dataBlock{}, pbm.keysBlock)
-	assert.Equal(t, 0, len(pbm.tagsBlocks))
+	assert.Equal(t, common.SeriesID(0), bm.seriesID)
+	assert.Equal(t, int64(0), bm.minKey)
+	assert.Equal(t, int64(0), bm.maxKey)
+	assert.Equal(t, dataBlock{}, bm.dataBlock)
+	assert.Equal(t, dataBlock{}, bm.keysBlock)
+	assert.Equal(t, 0, len(bm.tagsBlocks))
 }
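
Note: `TestMetadata_Pooling` acquires a `blockMetadata`, mutates it, releases it, then acquires again and expects every field to be zeroed, which implies a pool-backed lifecycle with a `reset()` step on release. The snippet below is a minimal, self-contained sketch of that pattern built on `sync.Pool`; the type layout is simplified (plain `uint64` instead of `common.SeriesID`) and it is illustrative only, not the committed `metadata.go` implementation.

```go
// Sketch of the pool-backed lifecycle the pooling test assumes.
package main

import (
	"fmt"
	"sync"
)

type dataBlock struct {
	offset uint64
	size   uint64
}

type blockMetadata struct {
	seriesID   uint64 // common.SeriesID in the real package
	minKey     int64
	maxKey     int64
	dataBlock  dataBlock
	keysBlock  dataBlock
	tagsBlocks map[string]dataBlock
}

// reset clears all fields so a pooled instance starts from a clean state,
// which is what the test checks after a release/acquire cycle.
func (bm *blockMetadata) reset() {
	bm.seriesID = 0
	bm.minKey = 0
	bm.maxKey = 0
	bm.dataBlock = dataBlock{}
	bm.keysBlock = dataBlock{}
	for k := range bm.tagsBlocks {
		delete(bm.tagsBlocks, k)
	}
}

var blockMetadataPool = sync.Pool{
	New: func() any {
		return &blockMetadata{tagsBlocks: make(map[string]dataBlock)}
	},
}

// generateBlockMetadata hands out a reusable instance from the pool.
func generateBlockMetadata() *blockMetadata {
	return blockMetadataPool.Get().(*blockMetadata)
}

// releaseBlockMetadata resets the instance before returning it to the pool.
func releaseBlockMetadata(bm *blockMetadata) {
	bm.reset()
	blockMetadataPool.Put(bm)
}

func main() {
	bm := generateBlockMetadata()
	bm.seriesID = 456
	bm.tagsBlocks["test"] = dataBlock{offset: 100, size: 200}
	releaseBlockMetadata(bm)

	reused := generateBlockMetadata()
	// After release, a reused instance must look freshly initialized.
	fmt.Println(reused.seriesID, len(reused.tagsBlocks)) // 0 0
}
```

Resetting in the release path rather than on acquisition keeps stale data (for example, leftover `tagsBlocks` entries) from being retained while the object sits in the pool, which matches the zeroed-field assertions in the test.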