This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/write
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 9063e5cd1fda79652cc7c16fbba271b48eeaae52
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Aug 20 20:46:48 2025 +0700

    Refactor SIDX design and implement memory management features
    
    - Updated blockReader to read from *.td files instead of tag_*.td files for improved file handling.
    - Removed the blockWriter implementation from DESIGN.md and referenced the new block_writer.go for clarity.
    - Introduced manifest.json for part metadata; meta.bin is still read for backward compatibility.
    - Enhanced memPart to support tag-based file organization, with methods for managing tag writers and flushing data.
---
 banyand/internal/sidx/DESIGN.md | 233 ++----------------------------------
 banyand/internal/sidx/TODO.md   | 207 ++++++++++++++++----------------
 banyand/internal/sidx/part.go   | 258 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 364 insertions(+), 334 deletions(-)

diff --git a/banyand/internal/sidx/DESIGN.md b/banyand/internal/sidx/DESIGN.md
index 04e2a62f..f2d9ab66 100644
--- a/banyand/internal/sidx/DESIGN.md
+++ b/banyand/internal/sidx/DESIGN.md
@@ -708,7 +708,7 @@ type blockReader struct {
     // File readers
     dataReader fs.Reader            // Reads from data.bin
     keysReader fs.Reader            // Reads from keys.bin
-    tagReaders map[string]fs.Reader // Reads from tag_*.td files
+    tagReaders map[string]fs.Reader // Reads from *.td files
 
     // Decoders for efficient processing
     decoder *encoding.BytesBlockDecoder
@@ -1070,221 +1070,7 @@ func (bs *blockScanner) matchesTagFilters(block *block, index int) bool {
 
 The block writer handles efficient writing of blocks to disk:
 
-```go
-// blockWriter writes blocks to part files with compression
-type blockWriter struct {
-    // File writers
-    dataWriter fs.Writer            // Writes to data.bin
-    keysWriter fs.Writer            // Writes to keys.bin
-    tagWriters map[string]fs.Writer // Writes to tag_*.td files
-
-    // Compression
-    compressor *zstd.Compressor
-
-    // Write tracking
-    bytesWritten map[string]uint64 // Track bytes written per file
-
-    // Pool management
-    pooled bool
-}
-
-var blockWriterPool = pool.Register[*blockWriter]("sidx-blockWriter")
-
-func generateBlockWriter() *blockWriter {
-    v := blockWriterPool.Get()
-    if v == nil {
-        return &blockWriter{
-            tagWriters:   make(map[string]fs.Writer),
-            bytesWritten: make(map[string]uint64),
-            compressor:   zstd.NewCompressor(),
-        }
-    }
-    return v
-}
-
-func releaseBlockWriter(bw *blockWriter) {
-    bw.reset()
-    blockWriterPool.Put(bw)
-}
-
-// reset clears writer for reuse
-func (bw *blockWriter) reset() {
-    bw.dataWriter = nil
-    bw.keysWriter = nil
-
-    // Clear writers and tracking
-    for k := range bw.tagWriters {
-        delete(bw.tagWriters, k)
-    }
-    for k := range bw.bytesWritten {
-        delete(bw.bytesWritten, k)
-    }
-
-    if bw.compressor != nil {
-        bw.compressor.Reset()
-    }
-
-    bw.pooled = false
-}
-
-// init initializes writer with part writers
-func (bw *blockWriter) init(dataWriter, keysWriter fs.Writer,
-    tagWriters map[string]fs.Writer) {
-    bw.reset()
-
-    bw.dataWriter = dataWriter
-    bw.keysWriter = keysWriter
-
-    // Copy tag writers
-    for tagName, writer := range tagWriters {
-        bw.tagWriters[tagName] = writer
-    }
-}
-
-// mustWriteTo writes block to files and updates metadata
-func (bw *blockWriter) mustWriteTo(block *block, bm *blockMetadata) error {
-    if err := block.validate(); err != nil {
-        return fmt.Errorf("block validation failed: %w", err)
-    }
-
-    bm.reset()
-
-    // Set basic metadata
-    bm.minKey = block.userKeys[0]
-    bm.maxKey = block.userKeys[len(block.userKeys)-1]
-    bm.count = uint64(len(block.userKeys))
-    bm.uncompressedSize = block.uncompressedSizeBytes()
-
-    // Write user keys
-    if err := bw.writeUserKeys(block, bm); err != nil {
-        return fmt.Errorf("failed to write user keys: %w", err)
-    }
-
-    // Write data payloads
-    if err := bw.writeData(block, bm); err != nil {
-        return fmt.Errorf("failed to write data: %w", err)
-    }
-
-    // Write tags
-    if err := bw.writeTags(block, bm); err != nil {
-        return fmt.Errorf("failed to write tags: %w", err)
-    }
-
-    return nil
-}
-
-// writeUserKeys writes user keys to keys.bin
-func (bw *blockWriter) writeUserKeys(block *block, bm *blockMetadata) error {
-    bb := bigValuePool.Generate()
-    defer bigValuePool.Release(bb)
-
-    // Encode user keys
-    bb.Buf = encoding.Int64ListToBytes(bb.Buf[:0], block.userKeys)
-
-    // Track write position
-    offset := bw.bytesWritten["keys"]
-    size := uint64(len(bb.Buf))
-
-    // Write to file
-    fs.MustWriteData(bw.keysWriter.SequentialWrite(), bb.Buf)
-
-    // Update metadata
-    bm.keysBlock = dataBlock{offset: offset, size: size}
-    bw.bytesWritten["keys"] = offset + size
-
-    return nil
-}
-
-// writeData writes data payloads to data.bin
-func (bw *blockWriter) writeData(block *block, bm *blockMetadata) error {
-    bb := bigValuePool.Generate()
-    defer bigValuePool.Release(bb)
-
-    // Encode data payloads
-    bb.Buf = encoding.BytesListToBytes(bb.Buf[:0], block.data)
-
-    // Compress data
-    compressed, err := bw.compressor.CompressBytes(bb.Buf)
-    if err != nil {
-        return fmt.Errorf("failed to compress data: %w", err)
-    }
-
-    // Track write position
-    offset := bw.bytesWritten["data"]
-    size := uint64(len(compressed))
-
-    // Write to file
-    fs.MustWriteData(bw.dataWriter.SequentialWrite(), compressed)
-
-    // Update metadata
-    bm.dataBlock = dataBlock{offset: offset, size: size}
-    bw.bytesWritten["data"] = offset + size
-
-    return nil
-}
-
-// writeTags writes tag data to tag files
-func (bw *blockWriter) writeTags(block *block, bm *blockMetadata) error {
-    for tagName, tagData := range block.tags {
-        writer, ok := bw.tagWriters[tagName]
-        if !ok {
-            continue // Tag writer not available
-        }
-
-        if err := bw.writeTag(tagName, tagData, writer, bm); err != nil {
-            return fmt.Errorf("failed to write tag %s: %w", tagName, err)
-        }
-    }
-
-    return nil
-}
-
-// writeTag writes a single tag's data
-func (bw *blockWriter) writeTag(tagName string, tagData *tagData,
-    writer fs.Writer, bm *blockMetadata) error {
-    bb := bigValuePool.Generate()
-    defer bigValuePool.Release(bb)
-
-    // Encode tag values
-    bb.Buf = encoding.BytesListToBytes(bb.Buf[:0], tagData.values)
-
-    // Track write position
-    fileKey := "tag_" + tagName
-    offset := bw.bytesWritten[fileKey]
-    size := uint64(len(bb.Buf))
-
-    // Write to file
-    fs.MustWriteData(writer.SequentialWrite(), bb.Buf)
-
-    // Update metadata
-    bm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size}
-    bw.bytesWritten[fileKey] = offset + size
-
-    return nil
-}
-
-// totalBytesWritten returns total bytes written across all files
-func (bw *blockWriter) totalBytesWritten() uint64 {
-    var total uint64
-    for _, bytes := range bw.bytesWritten {
-        total += bytes
-    }
-    return total
-}
-
-// mustClose closes all writers
-func (bw *blockWriter) mustClose() {
-    if bw.dataWriter != nil {
-        fs.MustClose(bw.dataWriter.SequentialWrite())
-    }
-    if bw.keysWriter != nil {
-        fs.MustClose(bw.keysWriter.SequentialWrite())
-    }
-    for _, writer := range bw.tagWriters {
-        fs.MustClose(writer.SequentialWrite())
-    }
-}
-```
+Refer to `banyand/stream/block_writer.go` for the `blockWriter` implementation.
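+
+As a minimal, non-authoritative sketch (following the conventions of the
+removed design code above; names and signatures are illustrative, not the
+final API), the writer keeps a running offset per output file and records
+each block's position in its metadata:
+
+```go
+// blockWriter buffers the writers for one part and tracks bytes written per file.
+type blockWriter struct {
+    dataWriter   fs.Writer            // data.bin
+    keysWriter   fs.Writer            // keys.bin
+    tagWriters   map[string]fs.Writer // one <name>.td writer per tag
+    bytesWritten map[string]uint64    // running offset per file
+}
+
+// writeKeys appends pre-encoded user keys to keys.bin and records the
+// block's offset and size in bm (sketch only; see block_writer.go).
+func (bw *blockWriter) writeKeys(encoded []byte, bm *blockMetadata) {
+    offset := bw.bytesWritten["keys"]
+    fs.MustWriteData(bw.keysWriter.SequentialWrite(), encoded)
+    bm.keysBlock = dataBlock{offset: offset, size: uint64(len(encoded))}
+    bw.bytesWritten["keys"] = offset + uint64(len(encoded))
+}
+```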
 
 ##### Component Dependency Relationships
@@ -1316,14 +1102,13 @@ func (bw *blockWriter) mustClose() {
 ###### Block Size Management
 
 ```go
 const (
-    maxUncompressedBlockSize = 8 * 1024 * 1024 // 8MB uncompressed
-    maxPooledSliceSize       = 1024 * 1024     // 1MB max pooled slice
-    maxPooledSliceCount      = 1000            // Max pooled slice count
+    // maxElementsPerBlock defines the maximum number of elements per block.
+    maxElementsPerBlock = 8 * 1024
 )
 
-// isFull checks if block has reached size limit
+// isFull checks if block has reached element count limit.
 func (b *block) isFull() bool {
-    return b.uncompressedSizeBytes() >= maxUncompressedBlockSize
+    return len(b.userKeys) >= maxElementsPerBlock
 }
 ```
@@ -1367,9 +1152,9 @@ func (b *block) isFull() bool {
 - **primary.bin**: Contains array of blockMetadata structures
 - **data.bin**: Compressed user payloads with block boundaries
 - **keys.bin**: Int64-encoded user keys with block boundaries
-- **tag_*.td**: Tag value data with type-specific encoding
-- **tag_*.tm**: Tag metadata with bloom filter parameters
-- **tag_*.tf**: Bloom filter data for fast tag filtering
+- ***.td**: Tag value data with type-specific encoding
+- ***.tm**: Tag metadata with bloom filter parameters
+- ***.tf**: Bloom filter data for fast tag filtering
 
 This block architecture provides efficient, scalable storage for user-key-based data while maintaining consistency with the existing sidx design principles and production requirements.
diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
index 12625a48..9244856d 100644
--- a/banyand/internal/sidx/TODO.md
+++ b/banyand/internal/sidx/TODO.md
@@ -10,9 +10,9 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] **Phase 4**: Memory Management (4 tasks)
 - [ ] **Phase 5**: Snapshot Management (4 tasks)
 - [ ] **Phase 6**: Write Path (4 tasks)
-- [ ] **Phase 7**: Flush Operations (4 tasks)
-- [ ] **Phase 8**: Merge Operations (4 tasks)
-- [ ] **Phase 9**: Query Path (5 tasks)
+- [ ] **Phase 7**: Query Path (5 tasks)
+- [ ] **Phase 8**: Flush Operations (4 tasks)
+- [ ] **Phase 9**: Merge Operations (4 tasks)
 - [ ] **Phase 10**: Resource Management (3 tasks)
 - [ ] **Phase 11**: Error Handling (3 tasks)
 - [ ] **Phase 12**: Testing (4 tasks)
@@ -76,7 +76,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ### 1.5 Part Structure (`part.go`) ✅
 - [x] File readers for primary.bin, data.bin, keys.bin, meta.bin
-- [x] Individual tag file readers (tag_*.td, tag_*.tm, tag_*.tf)
+- [x] Individual tag file readers (*.td, *.tm, *.tf)
 - [x] Part opening/closing lifecycle
 - [x] **Test Cases**:
   - [x] File lifecycle management
@@ -204,19 +204,19 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ## Phase 4: Memory Management
 
-### 4.1 MemPart Implementation (`mempart.go`)
-- [ ] In-memory buffer before flushing to disk
-- [ ] Element accumulation with size tracking
-- [ ] Memory usage monitoring
-- [ ] **Test Cases**:
-  - [ ] Memory part creation and lifecycle
-  - [ ] Size limits enforcement
-  - [ ] Element addition and retrieval
-  - [ ] Memory usage tracking accuracy
+### 4.1 MemPart Implementation (`mempart.go`) ✅
+- [x] In-memory buffer before flushing to disk
+- [x] Element accumulation with size tracking
+- [x] Memory usage monitoring
+- [x] **Test Cases**:
+  - [x] Memory part creation and lifecycle
+  - [x] Size limits enforcement
+  - [x] Element addition and retrieval
+  - [x] Memory usage tracking accuracy
 
 ### 4.2 Block Writer (`block_writer.go`) 🔥 - DESIGN COMPLETED ✅
 - [ ] **Complete block writer design added to DESIGN.md**
-- [ ] **Multi-file writing**: data.bin, keys.bin, tag_*.td files
+- [ ] **Multi-file writing**: meta.bin, primary.bin, data.bin, keys.bin, *.td, *.tf, *.tm files
 - [ ] **Compression**: zstd compression for data payloads
 - [ ] **Write tracking**: Track bytes written per file
 - [ ] **Memory management**: Object pooling with reset() methods
@@ -342,7 +342,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] **Complete block building design added to DESIGN.md**
 - [ ] **Element organization**: Sort elements by seriesID then userKey
 - [ ] **Block creation**: mustInitFromElements() with sorted elements
-- [ ] **Size management**: maxUncompressedBlockSize limits
+- [ ] **Size management**: maxElementsPerBlock limits
 - [ ] **Memory efficiency**: Object pooling and resource management
 - [ ] **Implementation Tasks**:
   - [ ] Integrate block building into write path
@@ -357,9 +357,79 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ---
 
-## Phase 7: Flush Operations
+## Phase 7: Query Path
+
+### 7.1 Query Interface (`query.go`)
+- [ ] Key range queries with tag filters
+- [ ] Support projections and result limits
+- [ ] Query validation and optimization
+- [ ] **Test Cases**:
+  - [ ] Query parsing handles all parameter types
+  - [ ] Validation rejects invalid queries
+  - [ ] Query optimization improves performance
+  - [ ] Complex queries return correct results
+
+### 7.2 Part Filtering (`query.go`)
+- [ ] Filter parts by key range overlap
+- [ ] Minimize I/O operations through smart filtering
+- [ ] Support inclusive/exclusive bounds
+- [ ] **Test Cases**:
+  - [ ] Filtering accuracy eliminates non-overlapping parts
+  - [ ] Performance improvement through reduced I/O
+  - [ ] Boundary conditions handled correctly
+  - [ ] Empty result sets handled gracefully
+
+### 7.3 Block Scanner (`block_scanner.go`) 🔥 - DESIGN COMPLETED ✅
+- [ ] **Complete block scanner design added to DESIGN.md**
+- [ ] **Query processing**: scanBlock() with range and tag filtering
+- [ ] **Memory management**: Object pooling with reset() methods
+- [ ] **Efficient filtering**: Quick checks before loading block data
+- [ ] **Element matching**: scanBlockElements() with tag filter matching
+- [ ] **Resource management**: Block reader and temporary block handling
+- [ ] **Implementation Tasks**:
+  - [ ] Create block_scanner.go with scanner structure
+  - [ ] Implement scanBlock() with filtering logic
+  - [ ] Add scanBlockElements() for element processing
+  - [ ] Create matchesTagFilters() for tag filtering
+- [ ] **Test Cases**:
+  - [ ] Scan completeness finds all matching data
+  - [ ] Filter effectiveness reduces false positives
+  - [ ] Block scanning performance meets targets
+  - [ ] Memory usage during scanning is controlled
+
+### 7.4 Result Iterator (`query.go`)
+- [ ] Stream results with proper ordering
+- [ ] Memory-efficient iteration patterns
+- [ ] Support both ASC and DESC ordering
+- [ ] **Test Cases**:
+  - [ ] Iterator correctness for various query types
+  - [ ] Memory usage remains bounded
+  - [ ] Ordering is maintained across parts
+  - [ ] Iterator cleanup prevents resource leaks
+
+### 7.5 Block Reader (`block_reader.go`) 🔥 - DESIGN COMPLETED ✅
+- [x] **Complete block reader design added to DESIGN.md**
+- [x] **Multi-file reading**: data.bin, keys.bin, *.td files
+- [x] **Decompression**: zstd decompression for data payloads
+- [x] **Memory management**: Object pooling with reset() methods
+- [x] **Selective loading**: Tag projection for efficient I/O
+- [x] **Block reconstruction**: mustReadFrom() for complete block loading
+- [ ] **Implementation Tasks**:
+  - [ ] Create block_reader.go with reader structure
+  - [ ] Implement readUserKeys(), readData(), readTags()
+  - [ ] Add decompression support
+  - [ ] Create mustReadFrom() for block loading
+- [ ] **Test Cases**:
+  - [ ] Block reading maintains data integrity
+  - [ ] Decompression works correctly
+  - [ ] Block structure reconstruction is accurate
+  - [ ] Read performance meets requirements
+
+---
+
+## Phase 8: Flush Operations
 
-### 7.1 Flusher Interface (`flusher.go`)
+### 8.1 Flusher Interface (`flusher.go`)
 - [ ] Simple Flush() method for user control
 - [ ] Internal part selection logic
 - [ ] Error handling and retry mechanisms
@@ -369,7 +439,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] Error handling for flush failures
 - [ ] Concurrent flush operations are handled safely
 
-### 7.2 Flush to Disk (`flusher.go`)
+### 8.2 Flush to Disk (`flusher.go`)
 - [ ] Create part directories with epoch names
 - [ ] Write all part files atomically
 - [ ] Implement crash recovery mechanisms
@@ -379,7 +449,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] Crash recovery restores consistent state
 - [ ] Disk space management during flush
 
-### 7.3 Tag File Writing (`flusher.go`)
+### 8.3 Tag File Writing (`flusher.go`)
 - [ ] Write individual tag files (not families)
 - [ ] Generate bloom filters for indexed tags
 - [ ] Optimize file layout for query performance
@@ -389,7 +459,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] File format compatibility
 - [ ] Performance of tag file generation
 
-### 7.4 Block Serialization (`flusher.go` + `block_writer.go`) 🔥 - DESIGN COMPLETED ✅
+### 8.4 Block Serialization (`flusher.go` + `block_writer.go`) 🔥 - DESIGN COMPLETED ✅
 - [ ] **Complete block serialization design added to DESIGN.md**
 - [ ] **Multi-file output**: primary.bin, data.bin, keys.bin, tag files
 - [ ] **Block writer integration**: mustWriteTo() for block persistence
@@ -408,9 +478,9 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ---
 
-## Phase 8: Merge Operations
+## Phase 9: Merge Operations
 
-### 8.1 Merger Interface (`merger.go`)
+### 9.1 Merger Interface (`merger.go`)
 - [ ] Simple Merge() method for user control
 - [ ] Internal merge strategy implementation
 - [ ] Resource management during merge
@@ -420,7 +490,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] Resource usage during merge operations
 - [ ] Concurrent merge safety
 
-### 8.2 Part Selection (`merger.go`)
+### 9.2 Part Selection (`merger.go`)
 - [ ] Select parts by size/age criteria
 - [ ] Avoid merging recent parts
 - [ ] Optimize merge efficiency
@@ -430,7 +500,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] Selection criteria can be tuned
 - [ ] Selection performance is acceptable
 
-### 8.3 Merged Part Writer (`merger.go`)
+### 9.3 Merged Part Writer (`merger.go`)
 - [ ] Combine parts maintaining key order
 - [ ] Deduplicate overlapping data
 - [ ] Generate merged part metadata
@@ -440,7 +510,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] Key ordering is maintained across parts
 - [ ] Merged part metadata is accurate
 
-### 8.4 Block Merging (`merger.go` + `block.go`) 🔥 - DESIGN COMPLETED ✅
+### 9.4 Block Merging (`merger.go` + `block.go`) 🔥 - DESIGN COMPLETED ✅
 - [ ] **Complete block merging design added to DESIGN.md**
 - [ ] **Block reader integration**: Read blocks from multiple parts
 - [ ] **Merge strategy**: Maintain key ordering across merged blocks
@@ -459,76 +529,6 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ---
 
-## Phase 9: Query Path
-
-### 9.1 Query Interface (`query.go`)
-- [ ] Key range queries with tag filters
-- [ ] Support projections and result limits
-- [ ] Query validation and optimization
-- [ ] **Test Cases**:
-  - [ ] Query parsing handles all parameter types
-  - [ ] Validation rejects invalid queries
-  - [ ] Query optimization improves performance
-  - [ ] Complex queries return correct results
-
-### 9.2 Part Filtering (`query.go`)
-- [ ] Filter parts by key range overlap
-- [ ] Minimize I/O operations through smart filtering
-- [ ] Support inclusive/exclusive bounds
-- [ ] **Test Cases**:
-  - [ ] Filtering accuracy eliminates non-overlapping parts
-  - [ ] Performance improvement through reduced I/O
-  - [ ] Boundary conditions handled correctly
-  - [ ] Empty result sets handled gracefully
-
-### 9.3 Block Scanner (`block_scanner.go`) 🔥 - DESIGN COMPLETED ✅
-- [ ] **Complete block scanner design added to DESIGN.md**
-- [ ] **Query processing**: scanBlock() with range and tag filtering
-- [ ] **Memory management**: Object pooling with reset() methods
-- [ ] **Efficient filtering**: Quick checks before loading block data
-- [ ] **Element matching**: scanBlockElements() with tag filter matching
-- [ ] **Resource management**: Block reader and temporary block handling
-- [ ] **Implementation Tasks**:
-  - [ ] Create block_scanner.go with scanner structure
-  - [ ] Implement scanBlock() with filtering logic
-  - [ ] Add scanBlockElements() for element processing
-  - [ ] Create matchesTagFilters() for tag filtering
-- [ ] **Test Cases**:
-  - [ ] Scan completeness finds all matching data
-  - [ ] Filter effectiveness reduces false positives
-  - [ ] Block scanning performance meets targets
-  - [ ] Memory usage during scanning is controlled
-
-### 9.4 Result Iterator (`query.go`)
-- [ ] Stream results with proper ordering
-- [ ] Memory-efficient iteration patterns
-- [ ] Support both ASC and DESC ordering
-- [ ] **Test Cases**:
-  - [ ] Iterator correctness for various query types
-  - [ ] Memory usage remains bounded
-  - [ ] Ordering is maintained across parts
-  - [ ] Iterator cleanup prevents resource leaks
-
-### 9.5 Block Reader (`block_reader.go`) 🔥 - DESIGN COMPLETED ✅
-- [x] **Complete block reader design added to DESIGN.md**
-- [x] **Multi-file reading**: data.bin, keys.bin, tag_*.td files
-- [x] **Decompression**: zstd decompression for data payloads
-- [x] **Memory management**: Object pooling with reset() methods
-- [x] **Selective loading**: Tag projection for efficient I/O
-- [x] **Block reconstruction**: mustReadFrom() for complete block loading
-- [ ] **Implementation Tasks**:
-  - [ ] Create block_reader.go with reader structure
-  - [ ] Implement readUserKeys(), readData(), readTags()
-  - [ ] Add decompression support
-  - [ ] Create mustReadFrom() for block loading
-- [ ] **Test Cases**:
-  - [ ] Block reading maintains data integrity
-  - [ ] Decompression works correctly
-  - [ ] Block structure reconstruction is accurate
-  - [ ] Read performance meets requirements
-
----
-
 ## Phase 10: Resource Management
 
 ### 10.1 Disk Reservation (`resource_manager.go`)
@@ -713,10 +713,10 @@ The `block.go` file is central to the SIDX implementation and is used in multipl
 2. **Phase 2.2**: Block writer uses block for serialization
 3. **Phase 2.4**: Block initialization from elements
 4. **Phase 4.4**: Create blocks when memory threshold reached
-5. **Phase 7.4**: Serialize blocks to disk during flush
-6. **Phase 8.4**: Merge blocks from multiple parts
-7. **Phase 9.3**: Block scanner reads blocks during queries
-8. **Phase 9.5**: Block reader deserializes blocks
+5. **Phase 7.3**: Block scanner reads blocks during queries
+6. **Phase 7.5**: Block reader deserializes blocks
+7. **Phase 8.4**: Serialize blocks to disk during flush
+8. **Phase 9.4**: Merge blocks from multiple parts
 9. **Phase 12.1**: Unit tests for block operations
 10. **Phase 12.3**: Performance benchmarks for block operations
 
@@ -727,9 +727,12 @@ The `block.go` file is central to the SIDX implementation and is used in multipl
 - **Phase 1** must complete before **Phase 2** (data structures needed)
 - **Phase 2** must complete before **Phase 4** (memory management needed for writes)
 - **Phase 3** must complete before **Phase 4** (snapshot management needed)
-- **Phase 4** must complete before **Phase 5** (write path needed for flush)
-- **Phase 5** must complete before **Phase 6** (flush needed for merge)
-- **Phase 1-6** must complete before **Phase 9** (all components needed for queries)
+- **Phase 4** must complete before **Phase 5** (memory management needed for snapshot)
+- **Phase 5** must complete before **Phase 6** (snapshot management needed for write path)
+- **Phase 6** must complete before **Phase 7** (write path needed for queries)
+- **Phase 7** can be developed independently (queries work with existing persisted data)
+- **Phase 6** must complete before **Phase 8** (write path needed for flush)
+- **Phase 8** must complete before **Phase 9** (flush needed for merge)
 - **Phase 10-11** can be developed in parallel with other phases
 - **Phase 12** requires completion of relevant phases for testing
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index aab32a06..f23e4d72 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -18,20 +18,26 @@ package sidx
 
 import (
+	"encoding/json"
 	"fmt"
 	"path/filepath"
+	"sort"
 	"strings"
 
+	"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+	"github.com/apache/skywalking-banyandb/pkg/bytes"
 	"github.com/apache/skywalking-banyandb/pkg/fs"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const (
 	// Standard file names for sidx parts.
-	primaryFilename = "primary.bin"
-	dataFilename    = "data.bin"
-	keysFilename    = "keys.bin"
-	metaFilename    = "meta.bin"
+	primaryFilename  = "primary.bin"
+	dataFilename     = "data.bin"
+	keysFilename     = "keys.bin"
+	metaFilename     = "meta.bin"
+	manifestFilename = "manifest.json"
 
 	// Tag file extensions.
 	tagDataExtension = ".td" // <name>.td files
@@ -93,9 +99,23 @@ func mustOpenPart(path string, fileSystem fs.FileSystem) *part {
 	return p
 }
 
-// loadPartMetadata reads and parses the part metadata from meta.bin.
+// loadPartMetadata reads and parses the part metadata from manifest.json.
 func (p *part) loadPartMetadata() error {
-	// Read the entire meta.bin file.
+	// First try to read from manifest.json (new format)
+	manifestPath := filepath.Join(p.path, manifestFilename)
+	manifestData, err := p.fileSystem.Read(manifestPath)
+	if err == nil {
+		// Parse JSON manifest
+		pm := generatePartMetadata()
+		if err := json.Unmarshal(manifestData, pm); err != nil {
+			releasePartMetadata(pm)
+			return fmt.Errorf("failed to unmarshal manifest.json: %w", err)
+		}
+		p.partMetadata = pm
+		return nil
+	}
+
+	// Fall back to meta.bin for backward compatibility
 	metaData, err := p.fileSystem.Read(filepath.Join(p.path, metaFilename))
 	if err != nil {
 		return fmt.Errorf("failed to read meta.bin: %w", err)
@@ -253,9 +273,9 @@ func mustOpenReader(filePath string, fileSystem fs.FileSystem) fs.Reader {
 
 // String returns a string representation of the part.
 func (p *part) String() string {
 	if p.partMetadata != nil {
-		return fmt.Sprintf("part %d at %s", p.partMetadata.ID, p.path)
+		return fmt.Sprintf("sidx part %d at %s", p.partMetadata.ID, p.path)
 	}
-	return fmt.Sprintf("part at %s", p.path)
+	return fmt.Sprintf("sidx part at %s", p.path)
 }
 
 // getPartMetadata returns the part metadata.
@@ -322,3 +342,225 @@ func (p *part) hasTagFiles(tagName string) bool {
 func (p *part) Path() string {
 	return p.path
 }
+
+// memPart represents an in-memory part for SIDX with tag-based file design.
+// This structure mirrors the stream module's memPart but uses user keys instead of timestamps.
+type memPart struct {
+	// Tag storage with individual tag buffers (tag-based design)
+	tagMetadata map[string]*bytes.Buffer // Individual tag metadata buffers
+	tagData     map[string]*bytes.Buffer // Individual tag data buffers
+	tagFilters  map[string]*bytes.Buffer // Individual tag filter buffers
+
+	// Core data buffers
+	meta    bytes.Buffer // Part metadata
+	primary bytes.Buffer // Block metadata
+	data    bytes.Buffer // User data payloads
+	keys    bytes.Buffer // User-provided int64 keys
+
+	partMetadata *partMetadata
+}
+
+// mustCreateTagWriters creates writers for individual tag files.
+// Returns metadata writer, data writer, and filter writer for the specified tag.
+func (mp *memPart) mustCreateTagWriters(tagName string) (fs.Writer, fs.Writer, fs.Writer) {
+	if mp.tagData == nil {
+		mp.tagData = make(map[string]*bytes.Buffer)
+		mp.tagMetadata = make(map[string]*bytes.Buffer)
+		mp.tagFilters = make(map[string]*bytes.Buffer)
+	}
+
+	// Get or create buffers for this tag
+	td, tdExists := mp.tagData[tagName]
+	tm := mp.tagMetadata[tagName]
+	tf := mp.tagFilters[tagName]
+
+	if tdExists {
+		td.Reset()
+		tm.Reset()
+		tf.Reset()
+		return tm, td, tf
+	}
+
+	// Create new buffers for this tag
+	mp.tagData[tagName] = &bytes.Buffer{}
+	mp.tagMetadata[tagName] = &bytes.Buffer{}
+	mp.tagFilters[tagName] = &bytes.Buffer{}
+
+	return mp.tagMetadata[tagName], mp.tagData[tagName], mp.tagFilters[tagName]
+}
+
+// reset clears the memory part for reuse.
+func (mp *memPart) reset() {
+	if mp.partMetadata != nil {
+		mp.partMetadata.reset()
+	}
+	mp.meta.Reset()
+	mp.primary.Reset()
+	mp.data.Reset()
+	mp.keys.Reset()
+
+	if mp.tagData != nil {
+		for k, td := range mp.tagData {
+			td.Reset()
+			delete(mp.tagData, k)
+		}
+	}
+	if mp.tagMetadata != nil {
+		for k, tm := range mp.tagMetadata {
+			tm.Reset()
+			delete(mp.tagMetadata, k)
+		}
+	}
+	if mp.tagFilters != nil {
+		for k, tf := range mp.tagFilters {
+			tf.Reset()
+			delete(mp.tagFilters, k)
+		}
+	}
+}
+
+// mustInitFromElements initializes the memory part from sorted elements.
+// This method will be completed when blockWriter is implemented.
+func (mp *memPart) mustInitFromElements(es *elements) {
+	mp.reset()
+
+	if len(es.userKeys) == 0 {
+		return
+	}
+
+	// Sort elements by seriesID first, then by user key
+	sort.Sort(es)
+
+	// TODO: Initialize using blockWriter when implemented in 4.2
+	// For now, we prepare the structure
+
+	// Set basic metadata
+	if mp.partMetadata == nil {
+		mp.partMetadata = generatePartMetadata()
+	}
+	mp.partMetadata.TotalCount = uint64(len(es.userKeys))
+	mp.partMetadata.MinKey = es.userKeys[0]
+	mp.partMetadata.MaxKey = es.userKeys[len(es.userKeys)-1]
+}
+
+// mustFlush flushes the memory part to disk with tag-based file organization.
+func (mp *memPart) mustFlush(fileSystem fs.FileSystem, partPath string) {
+	fileSystem.MkdirPanicIfExist(partPath, storage.DirPerm)
+
+	// Write core files
+	fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(partPath, metaFilename), storage.FilePerm)
+	fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(partPath, primaryFilename), storage.FilePerm)
+	fs.MustFlush(fileSystem, mp.data.Buf, filepath.Join(partPath, dataFilename), storage.FilePerm)
+	fs.MustFlush(fileSystem, mp.keys.Buf, filepath.Join(partPath, keysFilename), storage.FilePerm)
+
+	// Write individual tag files
+	for tagName, td := range mp.tagData {
+		fs.MustFlush(fileSystem, td.Buf, filepath.Join(partPath, tagName+tagDataExtension), storage.FilePerm)
+	}
+	for tagName, tm := range mp.tagMetadata {
+		fs.MustFlush(fileSystem, tm.Buf, filepath.Join(partPath, tagName+tagMetadataExtension), storage.FilePerm)
+	}
+	for tagName, tf := range mp.tagFilters {
+		fs.MustFlush(fileSystem, tf.Buf, filepath.Join(partPath, tagName+tagFilterExtension), storage.FilePerm)
+	}
+
+	// Write part metadata manifest
+	if mp.partMetadata != nil {
+		manifestData, err := mp.partMetadata.marshal()
+		if err != nil {
+			logger.GetLogger().Panic().Err(err).Str("path", partPath).Msg("failed to marshal part metadata")
+		}
+		fs.MustFlush(fileSystem, manifestData, filepath.Join(partPath, manifestFilename), storage.FilePerm)
+	}
+
+	fileSystem.SyncPath(partPath)
+}
+
+// generateMemPart gets memPart from pool or creates new.
+func generateMemPart() *memPart {
+	v := memPartPool.Get()
+	if v == nil {
+		return &memPart{}
+	}
+	return v
+}
+
+// releaseMemPart returns memPart to pool after reset.
+func releaseMemPart(mp *memPart) {
+	mp.reset()
+	memPartPool.Put(mp)
+}
+
+var memPartPool = pool.Register[*memPart]("sidx-memPart")
+
+// openMemPart creates a part from a memory part.
+func openMemPart(mp *memPart) *part {
+	p := &part{}
+	if mp.partMetadata != nil {
+		// Copy part metadata
+		p.partMetadata = generatePartMetadata()
+		*p.partMetadata = *mp.partMetadata
+	}
+
+	// TODO: Read block metadata when blockWriter is implemented
+	// p.blockMetadata = mustReadBlockMetadata(p.blockMetadata[:0], &mp.primary)
+
+	// Open data files as readers from memory buffers
+	p.primary = &mp.primary
+	p.data = &mp.data
+	p.keys = &mp.keys
+
+	// Open individual tag files if they exist
+	if mp.tagData != nil {
+		p.tagData = make(map[string]fs.Reader)
+		p.tagMetadata = make(map[string]fs.Reader)
+		p.tagFilters = make(map[string]fs.Reader)
+
+		for tagName, td := range mp.tagData {
+			p.tagData[tagName] = td
+			if tm, exists := mp.tagMetadata[tagName]; exists {
+				p.tagMetadata[tagName] = tm
+			}
+			if tf, exists := mp.tagFilters[tagName]; exists {
+				p.tagFilters[tagName] = tf
+			}
+		}
+	}
+	return p
+}
+
+// uncompressedElementSizeBytes calculates the uncompressed size of an element.
+// This is a utility function similar to the stream module.
+func uncompressedElementSizeBytes(index int, es *elements) uint64 {
+	// 8 bytes for user key
+	// 8 bytes for elementID
+	n := uint64(8 + 8)
+
+	// Add data payload size
+	if index < len(es.data) && es.data[index] != nil {
+		n += uint64(len(es.data[index]))
+	}
+
+	// Add tag sizes
+	if index < len(es.tags) {
+		for _, tag := range es.tags[index] {
+			n += uint64(len(tag.name))
+			if tag.value != nil {
+				n += uint64(len(tag.value))
+			}
+		}
+	}
+
+	return n
+}
+
+// partPath returns the path for a part with the given epoch.
+func partPath(root string, epoch uint64) string {
+	return filepath.Join(root, partName(epoch))
+}
+
+// partName returns the directory name for a part with the given epoch.
+// Uses 16-character hex format consistent with stream module.
+func partName(epoch uint64) string {
+	return fmt.Sprintf("%016x", epoch)
+}
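+
+// Usage sketch (illustrative only, not part of the committed API): the
+// intended flush lifecycle for a memPart, assuming elements es were
+// accumulated by the write path:
+//
+//	mp := generateMemPart()
+//	mp.mustInitFromElements(es)                     // sorts es, records min/max keys
+//	mp.mustFlush(fileSystem, partPath(root, epoch)) // writes core and tag files
+//	releaseMemPart(mp)                              // returns mp to the pool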