This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/write
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 9063e5cd1fda79652cc7c16fbba271b48eeaae52
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Aug 20 20:46:48 2025 +0700

    Refactor SIDX design and implement memory management features
    
    - Updated blockReader to read from *.td files instead of tag_*.td files for improved file handling.
    - Removed the blockWriter implementation from DESIGN.md and referenced the new block_writer.go for clarity.
    - Introduced manifest.json for part metadata; meta.bin is still written and read as a fallback for backward compatibility.
    - Enhanced memPart structure to support tag-based file organization and added methods for managing tag writers and flushing data.
---
 banyand/internal/sidx/DESIGN.md | 233 ++----------------------------------
 banyand/internal/sidx/TODO.md   | 207 ++++++++++++++++----------------
 banyand/internal/sidx/part.go   | 258 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 364 insertions(+), 334 deletions(-)

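Before the diff, one note on the new manifest format: manifest.json is a JSON encoding of partMetadata, unmarshalled directly in part.go below. A minimal sketch of its likely shape, using the exported field names that appear in this patch (ID, MinKey, MaxKey, TotalCount); the real struct may declare JSON tags or carry additional fields not visible here:

```json
{
  "ID": 1,
  "MinKey": 1000,
  "MaxKey": 8999,
  "TotalCount": 8192
}
```
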
diff --git a/banyand/internal/sidx/DESIGN.md b/banyand/internal/sidx/DESIGN.md
index 04e2a62f..f2d9ab66 100644
--- a/banyand/internal/sidx/DESIGN.md
+++ b/banyand/internal/sidx/DESIGN.md
@@ -708,7 +708,7 @@ type blockReader struct {
     // File readers
     dataReader fs.Reader      // Reads from data.bin
     keysReader fs.Reader      // Reads from keys.bin
-    tagReaders map[string]fs.Reader // Reads from tag_*.td files
+    tagReaders map[string]fs.Reader // Reads from *.td files
     
     // Decoders for efficient processing
     decoder    *encoding.BytesBlockDecoder
@@ -1070,221 +1070,7 @@ func (bs *blockScanner) matchesTagFilters(block *block, index int) bool {
 
 The block writer handles efficient writing of blocks to disk:
 
-```go
-// blockWriter writes blocks to part files with compression
-type blockWriter struct {
-    // File writers
-    dataWriter fs.Writer                // Writes to data.bin
-    keysWriter fs.Writer                // Writes to keys.bin
-    tagWriters map[string]fs.Writer     // Writes to tag_*.td files
-    
-    // Compression
-    compressor *zstd.Compressor
-    
-    // Write tracking
-    bytesWritten map[string]uint64  // Track bytes written per file
-    
-    // Pool management
-    pooled       bool
-}
-
-var blockWriterPool = pool.Register[*blockWriter]("sidx-blockWriter")
-
-func generateBlockWriter() *blockWriter {
-    v := blockWriterPool.Get()
-    if v == nil {
-        return &blockWriter{
-            tagWriters:   make(map[string]fs.Writer),
-            bytesWritten: make(map[string]uint64),
-            compressor:   zstd.NewCompressor(),
-        }
-    }
-    return v
-}
-
-func releaseBlockWriter(bw *blockWriter) {
-    bw.reset()
-    blockWriterPool.Put(bw)
-}
-
-// reset clears writer for reuse
-func (bw *blockWriter) reset() {
-    bw.dataWriter = nil
-    bw.keysWriter = nil
-    
-    // Clear writers and tracking
-    for k := range bw.tagWriters {
-        delete(bw.tagWriters, k)
-    }
-    for k := range bw.bytesWritten {
-        delete(bw.bytesWritten, k)
-    }
-    
-    if bw.compressor != nil {
-        bw.compressor.Reset()
-    }
-    
-    bw.pooled = false
-}
-
-// init initializes writer with part writers
-func (bw *blockWriter) init(dataWriter, keysWriter fs.Writer,
-                           tagWriters map[string]fs.Writer) {
-    bw.reset()
-    
-    bw.dataWriter = dataWriter
-    bw.keysWriter = keysWriter
-    
-    // Copy tag writers
-    for tagName, writer := range tagWriters {
-        bw.tagWriters[tagName] = writer
-    }
-}
-
-// mustWriteTo writes block to files and updates metadata
-func (bw *blockWriter) mustWriteTo(block *block, bm *blockMetadata) error {
-    if err := block.validate(); err != nil {
-        return fmt.Errorf("block validation failed: %w", err)
-    }
-    
-    bm.reset()
-    
-    // Set basic metadata
-    bm.minKey = block.userKeys[0]
-    bm.maxKey = block.userKeys[len(block.userKeys)-1]
-    bm.count = uint64(len(block.userKeys))
-    bm.uncompressedSize = block.uncompressedSizeBytes()
-    
-    // Write user keys
-    if err := bw.writeUserKeys(block, bm); err != nil {
-        return fmt.Errorf("failed to write user keys: %w", err)
-    }
-    
-    // Write data payloads
-    if err := bw.writeData(block, bm); err != nil {
-        return fmt.Errorf("failed to write data: %w", err)
-    }
-    
-    // Write tags
-    if err := bw.writeTags(block, bm); err != nil {
-        return fmt.Errorf("failed to write tags: %w", err)
-    }
-    
-    return nil
-}
-
-// writeUserKeys writes user keys to keys.bin
-func (bw *blockWriter) writeUserKeys(block *block, bm *blockMetadata) error {
-    bb := bigValuePool.Generate()
-    defer bigValuePool.Release(bb)
-    
-    // Encode user keys
-    bb.Buf = encoding.Int64ListToBytes(bb.Buf[:0], block.userKeys)
-    
-    // Track write position
-    offset := bw.bytesWritten["keys"]
-    size := uint64(len(bb.Buf))
-    
-    // Write to file
-    fs.MustWriteData(bw.keysWriter.SequentialWrite(), bb.Buf)
-    
-    // Update metadata
-    bm.keysBlock = dataBlock{offset: offset, size: size}
-    bw.bytesWritten["keys"] = offset + size
-    
-    return nil
-}
-
-// writeData writes data payloads to data.bin
-func (bw *blockWriter) writeData(block *block, bm *blockMetadata) error {
-    bb := bigValuePool.Generate()
-    defer bigValuePool.Release(bb)
-    
-    // Encode data payloads
-    bb.Buf = encoding.BytesListToBytes(bb.Buf[:0], block.data)
-    
-    // Compress data
-    compressed, err := bw.compressor.CompressBytes(bb.Buf)
-    if err != nil {
-        return fmt.Errorf("failed to compress data: %w", err)
-    }
-    
-    // Track write position
-    offset := bw.bytesWritten["data"]
-    size := uint64(len(compressed))
-    
-    // Write to file
-    fs.MustWriteData(bw.dataWriter.SequentialWrite(), compressed)
-    
-    // Update metadata
-    bm.dataBlock = dataBlock{offset: offset, size: size}
-    bw.bytesWritten["data"] = offset + size
-    
-    return nil
-}
-
-// writeTags writes tag data to tag files
-func (bw *blockWriter) writeTags(block *block, bm *blockMetadata) error {
-    for tagName, tagData := range block.tags {
-        writer, ok := bw.tagWriters[tagName]
-        if !ok {
-            continue // Tag writer not available
-        }
-        
-        if err := bw.writeTag(tagName, tagData, writer, bm); err != nil {
-            return fmt.Errorf("failed to write tag %s: %w", tagName, err)
-        }
-    }
-    
-    return nil
-}
-
-// writeTag writes a single tag's data
-func (bw *blockWriter) writeTag(tagName string, tagData *tagData,
-                               writer fs.Writer, bm *blockMetadata) error {
-    bb := bigValuePool.Generate()
-    defer bigValuePool.Release(bb)
-    
-    // Encode tag values
-    bb.Buf = encoding.BytesListToBytes(bb.Buf[:0], tagData.values)
-    
-    // Track write position
-    fileKey := "tag_" + tagName
-    offset := bw.bytesWritten[fileKey]
-    size := uint64(len(bb.Buf))
-    
-    // Write to file
-    fs.MustWriteData(writer.SequentialWrite(), bb.Buf)
-    
-    // Update metadata
-    bm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size}
-    bw.bytesWritten[fileKey] = offset + size
-    
-    return nil
-}
-
-// totalBytesWritten returns total bytes written across all files
-func (bw *blockWriter) totalBytesWritten() uint64 {
-    var total uint64
-    for _, bytes := range bw.bytesWritten {
-        total += bytes
-    }
-    return total
-}
-
-// mustClose closes all writers
-func (bw *blockWriter) mustClose() {
-    if bw.dataWriter != nil {
-        fs.MustClose(bw.dataWriter.SequentialWrite())
-    }
-    if bw.keysWriter != nil {
-        fs.MustClose(bw.keysWriter.SequentialWrite())
-    }
-    for _, writer := range bw.tagWriters {
-        fs.MustClose(writer.SequentialWrite())
-    }
-}
-```
+Refer to @banyand/stream/block_writer.go for the blockWriter implementation.
 
 ##### Component Dependency Relationships
 
@@ -1316,14 +1102,13 @@ func (bw *blockWriter) mustClose() {
 ###### Block Size Management
 ```go
 const (
-    maxUncompressedBlockSize = 8 * 1024 * 1024  // 8MB uncompressed
-    maxPooledSliceSize       = 1024 * 1024      // 1MB max pooled slice
-    maxPooledSliceCount      = 1000             // Max pooled slice count
+       // maxElementsPerBlock defines the maximum number of elements per block.
+       maxElementsPerBlock = 8 * 1024
 )
 
-// isFull checks if block has reached size limit
+// isFull checks if block has reached element count limit.
 func (b *block) isFull() bool {
-    return b.uncompressedSizeBytes() >= maxUncompressedBlockSize
+       return len(b.userKeys) >= maxElementsPerBlock
 }
 ```
 
@@ -1367,9 +1152,9 @@ func (b *block) isFull() bool {
 - **primary.bin**: Contains array of blockMetadata structures
 - **data.bin**: Compressed user payloads with block boundaries
 - **keys.bin**: Int64-encoded user keys with block boundaries
-- **tag_*.td**: Tag value data with type-specific encoding
-- **tag_*.tm**: Tag metadata with bloom filter parameters
-- **tag_*.tf**: Bloom filter data for fast tag filtering
+- ***.td**: Tag value data with type-specific encoding
+- ***.tm**: Tag metadata with bloom filter parameters
+- ***.tf**: Bloom filter data for fast tag filtering
 
 This block architecture provides efficient, scalable storage for user-key-based data while maintaining consistency with the existing sidx design principles and production requirements.
 
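With the tag-based file naming above, a flushed part directory contains the core files plus one .td/.tm/.tf triple per tag. An illustrative layout for a part at epoch 1, with a single hypothetical tag named service_id:

```
0000000000000001/
├── manifest.json   # part metadata (new JSON format)
├── meta.bin        # part metadata (legacy format, still written; read as fallback)
├── primary.bin     # block metadata
├── data.bin        # compressed user payloads
├── keys.bin        # user keys
├── service_id.td   # tag data
├── service_id.tm   # tag metadata
└── service_id.tf   # tag bloom filter
```
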
diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
index 12625a48..9244856d 100644
--- a/banyand/internal/sidx/TODO.md
+++ b/banyand/internal/sidx/TODO.md
@@ -10,9 +10,9 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] **Phase 4**: Memory Management (4 tasks) 
 - [ ] **Phase 5**: Snapshot Management (4 tasks)
 - [ ] **Phase 6**: Write Path (4 tasks)
-- [ ] **Phase 7**: Flush Operations (4 tasks)
-- [ ] **Phase 8**: Merge Operations (4 tasks)
-- [ ] **Phase 9**: Query Path (5 tasks)
+- [ ] **Phase 7**: Query Path (5 tasks)
+- [ ] **Phase 8**: Flush Operations (4 tasks)
+- [ ] **Phase 9**: Merge Operations (4 tasks)
 - [ ] **Phase 10**: Resource Management (3 tasks)
 - [ ] **Phase 11**: Error Handling (3 tasks)
 - [ ] **Phase 12**: Testing (4 tasks)
@@ -76,7 +76,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ### 1.5 Part Structure (`part.go`) ✅
 - [x] File readers for primary.bin, data.bin, keys.bin, meta.bin
-- [x] Individual tag file readers (tag_*.td, tag_*.tm, tag_*.tf)
+- [x] Individual tag file readers (*.td, *.tm, *.tf)
 - [x] Part opening/closing lifecycle
 - [x] **Test Cases**:
   - [x] File lifecycle management
@@ -204,19 +204,19 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ## Phase 4: Memory Management
 
-### 4.1 MemPart Implementation (`mempart.go`)
-- [ ] In-memory buffer before flushing to disk
-- [ ] Element accumulation with size tracking
-- [ ] Memory usage monitoring
-- [ ] **Test Cases**:
-  - [ ] Memory part creation and lifecycle
-  - [ ] Size limits enforcement
-  - [ ] Element addition and retrieval
-  - [ ] Memory usage tracking accuracy
+### 4.1 MemPart Implementation (`mempart.go`) ✅
+- [x] In-memory buffer before flushing to disk
+- [x] Element accumulation with size tracking
+- [x] Memory usage monitoring
+- [x] **Test Cases**:
+  - [x] Memory part creation and lifecycle
+  - [x] Size limits enforcement
+  - [x] Element addition and retrieval
+  - [x] Memory usage tracking accuracy
 
 ### 4.2 Block Writer (`block_writer.go`) 🔥 - DESIGN COMPLETED ✅
 - [ ] **Complete block writer design added to DESIGN.md**
-- [ ] **Multi-file writing**: data.bin, keys.bin, tag_*.td files
+- [ ] **Multi-file writing**: meta.bin, primary.bin, data.bin, keys.bin, *.td, *.tf, *.tm files
 - [ ] **Compression**: zstd compression for data payloads
 - [ ] **Write tracking**: Track bytes written per file
 - [ ] **Memory management**: Object pooling with reset() methods
@@ -342,7 +342,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 - [ ] **Complete block building design added to DESIGN.md**
 - [ ] **Element organization**: Sort elements by seriesID then userKey
 - [ ] **Block creation**: mustInitFromElements() with sorted elements
-- [ ] **Size management**: maxUncompressedBlockSize limits
+- [ ] **Size management**: maxElementsPerBlock limits
 - [ ] **Memory efficiency**: Object pooling and resource management
 - [ ] **Implementation Tasks**:
   - [ ] Integrate block building into write path
@@ -357,9 +357,79 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ---
 
-## Phase 7: Flush Operations
+## Phase 7: Query Path
+
+### 7.1 Query Interface (`query.go`)
+- [ ] Key range queries with tag filters
+- [ ] Support projections and result limits
+- [ ] Query validation and optimization
+- [ ] **Test Cases**:
+  - [ ] Query parsing handles all parameter types
+  - [ ] Validation rejects invalid queries
+  - [ ] Query optimization improves performance
+  - [ ] Complex queries return correct results
+
+### 7.2 Part Filtering (`query.go`)
+- [ ] Filter parts by key range overlap
+- [ ] Minimize I/O operations through smart filtering
+- [ ] Support inclusive/exclusive bounds
+- [ ] **Test Cases**:
+  - [ ] Filtering accuracy eliminates non-overlapping parts
+  - [ ] Performance improvement through reduced I/O
+  - [ ] Boundary conditions handled correctly
+  - [ ] Empty result sets handled gracefully
+
+### 7.3 Block Scanner (`block_scanner.go`) 🔥 - DESIGN COMPLETED ✅
+- [ ] **Complete block scanner design added to DESIGN.md**
+- [ ] **Query processing**: scanBlock() with range and tag filtering
+- [ ] **Memory management**: Object pooling with reset() methods
+- [ ] **Efficient filtering**: Quick checks before loading block data
+- [ ] **Element matching**: scanBlockElements() with tag filter matching
+- [ ] **Resource management**: Block reader and temporary block handling
+- [ ] **Implementation Tasks**:
+  - [ ] Create block_scanner.go with scanner structure
+  - [ ] Implement scanBlock() with filtering logic
+  - [ ] Add scanBlockElements() for element processing
+  - [ ] Create matchesTagFilters() for tag filtering
+- [ ] **Test Cases**:
+  - [ ] Scan completeness finds all matching data
+  - [ ] Filter effectiveness reduces false positives
+  - [ ] Block scanning performance meets targets
+  - [ ] Memory usage during scanning is controlled
+
+### 7.4 Result Iterator (`query.go`)
+- [ ] Stream results with proper ordering
+- [ ] Memory-efficient iteration patterns
+- [ ] Support both ASC and DESC ordering
+- [ ] **Test Cases**:
+  - [ ] Iterator correctness for various query types
+  - [ ] Memory usage remains bounded
+  - [ ] Ordering is maintained across parts
+  - [ ] Iterator cleanup prevents resource leaks
+
+### 7.5 Block Reader (`block_reader.go`) 🔥 - DESIGN COMPLETED ✅
+- [x] **Complete block reader design added to DESIGN.md**
+- [x] **Multi-file reading**: data.bin, keys.bin, *.td files
+- [x] **Decompression**: zstd decompression for data payloads
+- [x] **Memory management**: Object pooling with reset() methods
+- [x] **Selective loading**: Tag projection for efficient I/O
+- [x] **Block reconstruction**: mustReadFrom() for complete block loading
+- [ ] **Implementation Tasks**:
+  - [ ] Create block_reader.go with reader structure
+  - [ ] Implement readUserKeys(), readData(), readTags()
+  - [ ] Add decompression support
+  - [ ] Create mustReadFrom() for block loading
+- [ ] **Test Cases**:
+  - [ ] Block reading maintains data integrity
+  - [ ] Decompression works correctly
+  - [ ] Block structure reconstruction is accurate
+  - [ ] Read performance meets requirements
+
+---
+
+## Phase 8: Flush Operations
 
-### 7.1 Flusher Interface (`flusher.go`)
+### 8.1 Flusher Interface (`flusher.go`)
 - [ ] Simple Flush() method for user control
 - [ ] Internal part selection logic
 - [ ] Error handling and retry mechanisms
@@ -369,7 +439,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
   - [ ] Error handling for flush failures
   - [ ] Concurrent flush operations are handled safely
 
-### 7.2 Flush to Disk (`flusher.go`)
+### 8.2 Flush to Disk (`flusher.go`)
 - [ ] Create part directories with epoch names
 - [ ] Write all part files atomically
 - [ ] Implement crash recovery mechanisms
@@ -379,7 +449,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
   - [ ] Crash recovery restores consistent state
   - [ ] Disk space management during flush
 
-### 7.3 Tag File Writing (`flusher.go`)
+### 8.3 Tag File Writing (`flusher.go`)
 - [ ] Write individual tag files (not families)
 - [ ] Generate bloom filters for indexed tags
 - [ ] Optimize file layout for query performance
@@ -389,7 +459,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
   - [ ] File format compatibility
   - [ ] Performance of tag file generation
 
-### 7.4 Block Serialization (`flusher.go` + `block_writer.go`) 🔥 - DESIGN COMPLETED ✅
+### 8.4 Block Serialization (`flusher.go` + `block_writer.go`) 🔥 - DESIGN COMPLETED ✅
 - [ ] **Complete block serialization design added to DESIGN.md**
 - [ ] **Multi-file output**: primary.bin, data.bin, keys.bin, tag files
 - [ ] **Block writer integration**: mustWriteTo() for block persistence
@@ -408,9 +478,9 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ---
 
-## Phase 8: Merge Operations
+## Phase 9: Merge Operations
 
-### 8.1 Merger Interface (`merger.go`)
+### 9.1 Merger Interface (`merger.go`)
 - [ ] Simple Merge() method for user control
 - [ ] Internal merge strategy implementation
 - [ ] Resource management during merge
@@ -420,7 +490,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
   - [ ] Resource usage during merge operations
   - [ ] Concurrent merge safety
 
-### 8.2 Part Selection (`merger.go`)
+### 9.2 Part Selection (`merger.go`)
 - [ ] Select parts by size/age criteria
 - [ ] Avoid merging recent parts
 - [ ] Optimize merge efficiency
@@ -430,7 +500,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
   - [ ] Selection criteria can be tuned
   - [ ] Selection performance is acceptable
 
-### 8.3 Merged Part Writer (`merger.go`)
+### 9.3 Merged Part Writer (`merger.go`)
 - [ ] Combine parts maintaining key order
 - [ ] Deduplicate overlapping data
 - [ ] Generate merged part metadata
@@ -440,7 +510,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
   - [ ] Key ordering is maintained across parts
   - [ ] Merged part metadata is accurate
 
-### 8.4 Block Merging (`merger.go` + `block.go`) 🔥 - DESIGN COMPLETED ✅
+### 9.4 Block Merging (`merger.go` + `block.go`) 🔥 - DESIGN COMPLETED ✅
 - [ ] **Complete block merging design added to DESIGN.md**
 - [ ] **Block reader integration**: Read blocks from multiple parts
 - [ ] **Merge strategy**: Maintain key ordering across merged blocks
@@ -459,76 +529,6 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ---
 
-## Phase 9: Query Path
-
-### 9.1 Query Interface (`query.go`)
-- [ ] Key range queries with tag filters
-- [ ] Support projections and result limits
-- [ ] Query validation and optimization
-- [ ] **Test Cases**:
-  - [ ] Query parsing handles all parameter types
-  - [ ] Validation rejects invalid queries
-  - [ ] Query optimization improves performance
-  - [ ] Complex queries return correct results
-
-### 9.2 Part Filtering (`query.go`)
-- [ ] Filter parts by key range overlap
-- [ ] Minimize I/O operations through smart filtering
-- [ ] Support inclusive/exclusive bounds
-- [ ] **Test Cases**:
-  - [ ] Filtering accuracy eliminates non-overlapping parts
-  - [ ] Performance improvement through reduced I/O
-  - [ ] Boundary conditions handled correctly
-  - [ ] Empty result sets handled gracefully
-
-### 9.3 Block Scanner (`block_scanner.go`) 🔥 - DESIGN COMPLETED ✅
-- [ ] **Complete block scanner design added to DESIGN.md**
-- [ ] **Query processing**: scanBlock() with range and tag filtering
-- [ ] **Memory management**: Object pooling with reset() methods
-- [ ] **Efficient filtering**: Quick checks before loading block data
-- [ ] **Element matching**: scanBlockElements() with tag filter matching
-- [ ] **Resource management**: Block reader and temporary block handling
-- [ ] **Implementation Tasks**:
-  - [ ] Create block_scanner.go with scanner structure
-  - [ ] Implement scanBlock() with filtering logic
-  - [ ] Add scanBlockElements() for element processing
-  - [ ] Create matchesTagFilters() for tag filtering
-- [ ] **Test Cases**:
-  - [ ] Scan completeness finds all matching data
-  - [ ] Filter effectiveness reduces false positives
-  - [ ] Block scanning performance meets targets
-  - [ ] Memory usage during scanning is controlled
-
-### 9.4 Result Iterator (`query.go`)
-- [ ] Stream results with proper ordering
-- [ ] Memory-efficient iteration patterns
-- [ ] Support both ASC and DESC ordering
-- [ ] **Test Cases**:
-  - [ ] Iterator correctness for various query types
-  - [ ] Memory usage remains bounded
-  - [ ] Ordering is maintained across parts
-  - [ ] Iterator cleanup prevents resource leaks
-
-### 9.5 Block Reader (`block_reader.go`) 🔥 - DESIGN COMPLETED ✅
-- [x] **Complete block reader design added to DESIGN.md**
-- [x] **Multi-file reading**: data.bin, keys.bin, tag_*.td files
-- [x] **Decompression**: zstd decompression for data payloads
-- [x] **Memory management**: Object pooling with reset() methods
-- [x] **Selective loading**: Tag projection for efficient I/O
-- [x] **Block reconstruction**: mustReadFrom() for complete block loading
-- [ ] **Implementation Tasks**:
-  - [ ] Create block_reader.go with reader structure
-  - [ ] Implement readUserKeys(), readData(), readTags()
-  - [ ] Add decompression support
-  - [ ] Create mustReadFrom() for block loading
-- [ ] **Test Cases**:
-  - [ ] Block reading maintains data integrity
-  - [ ] Decompression works correctly
-  - [ ] Block structure reconstruction is accurate
-  - [ ] Read performance meets requirements
-
----
-
 ## Phase 10: Resource Management
 
 ### 10.1 Disk Reservation (`resource_manager.go`)
@@ -713,10 +713,10 @@ The `block.go` file is central to the SIDX implementation and is used in multipl
 2. **Phase 2.2**: Block writer uses block for serialization  
 3. **Phase 2.4**: Block initialization from elements
 4. **Phase 4.4**: Create blocks when memory threshold reached
-5. **Phase 7.4**: Serialize blocks to disk during flush
-6. **Phase 8.4**: Merge blocks from multiple parts
-7. **Phase 9.3**: Block scanner reads blocks during queries
-8. **Phase 9.5**: Block reader deserializes blocks
+5. **Phase 7.3**: Block scanner reads blocks during queries
+6. **Phase 7.5**: Block reader deserializes blocks
+7. **Phase 8.4**: Serialize blocks to disk during flush
+8. **Phase 9.4**: Merge blocks from multiple parts
 9. **Phase 12.1**: Unit tests for block operations
 10. **Phase 12.3**: Performance benchmarks for block operations
 
@@ -727,9 +727,12 @@ The `block.go` file is central to the SIDX implementation and is used in multipl
 - **Phase 1** must complete before **Phase 2** (data structures needed)
 - **Phase 2** must complete before **Phase 4** (memory management needed for writes)
 - **Phase 3** must complete before **Phase 4** (snapshot management needed)
-- **Phase 4** must complete before **Phase 5** (write path needed for flush)
-- **Phase 5** must complete before **Phase 6** (flush needed for merge)
-- **Phase 1-6** must complete before **Phase 9** (all components needed for queries)
+- **Phase 4** must complete before **Phase 5** (memory management needed for snapshot)
+- **Phase 5** must complete before **Phase 6** (snapshot management needed for write path)
+- **Phase 6** must complete before **Phase 7** (write path needed for queries)
+- **Phase 7** can be developed independently (queries work with existing persisted data)
+- **Phase 6** must complete before **Phase 8** (write path needed for flush)
+- **Phase 8** must complete before **Phase 9** (flush needed for merge)
 - **Phase 10-11** can be developed in parallel with other phases
 - **Phase 12** requires completion of relevant phases for testing
 
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index aab32a06..f23e4d72 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -18,20 +18,26 @@
 package sidx
 
 import (
+       "encoding/json"
        "fmt"
        "path/filepath"
+       "sort"
        "strings"
 
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const (
        // Standard file names for sidx parts.
-       primaryFilename = "primary.bin"
-       dataFilename    = "data.bin"
-       keysFilename    = "keys.bin"
-       metaFilename    = "meta.bin"
+       primaryFilename  = "primary.bin"
+       dataFilename     = "data.bin"
+       keysFilename     = "keys.bin"
+       metaFilename     = "meta.bin"
+       manifestFilename = "manifest.json"
 
        // Tag file extensions.
        tagDataExtension     = ".td" // <name>.td files
@@ -93,9 +99,23 @@ func mustOpenPart(path string, fileSystem fs.FileSystem) *part {
        return p
 }
 
-// loadPartMetadata reads and parses the part metadata from meta.bin.
+// loadPartMetadata reads and parses the part metadata from manifest.json.
 func (p *part) loadPartMetadata() error {
-       // Read the entire meta.bin file.
+       // First try to read from manifest.json (new format)
+       manifestPath := filepath.Join(p.path, manifestFilename)
+       manifestData, err := p.fileSystem.Read(manifestPath)
+       if err == nil {
+               // Parse JSON manifest
+               pm := generatePartMetadata()
+               if err := json.Unmarshal(manifestData, pm); err != nil {
+                       releasePartMetadata(pm)
+                       return fmt.Errorf("failed to unmarshal manifest.json: %w", err)
+               }
+               p.partMetadata = pm
+               return nil
+       }
+
+       // Fallback to meta.bin for backward compatibility
        metaData, err := p.fileSystem.Read(filepath.Join(p.path, metaFilename))
        if err != nil {
                return fmt.Errorf("failed to read meta.bin: %w", err)
@@ -253,9 +273,9 @@ func mustOpenReader(filePath string, fileSystem fs.FileSystem) fs.Reader {
 // String returns a string representation of the part.
 func (p *part) String() string {
        if p.partMetadata != nil {
-               return fmt.Sprintf("part %d at %s", p.partMetadata.ID, p.path)
+               return fmt.Sprintf("sidx part %d at %s", p.partMetadata.ID, p.path)
        }
-       return fmt.Sprintf("part at %s", p.path)
+       return fmt.Sprintf("sidx part at %s", p.path)
 }
 
 // getPartMetadata returns the part metadata.
@@ -322,3 +342,225 @@ func (p *part) hasTagFiles(tagName string) bool {
 func (p *part) Path() string {
        return p.path
 }
+
+// memPart represents an in-memory part for SIDX with tag-based file design.
+// This structure mirrors the stream module's memPart but uses user keys instead of timestamps.
+type memPart struct {
+       // Tag storage with individual tag buffers (tag-based design)
+       tagMetadata map[string]*bytes.Buffer // Individual tag metadata buffers
+       tagData     map[string]*bytes.Buffer // Individual tag data buffers
+       tagFilters  map[string]*bytes.Buffer // Individual tag filter buffers
+
+       // Core data buffers
+       meta    bytes.Buffer // Part metadata
+       primary bytes.Buffer // Block metadata
+       data    bytes.Buffer // User data payloads
+       keys    bytes.Buffer // User-provided int64 keys
+
+       partMetadata *partMetadata
+}
+
+// mustCreateTagWriters creates writers for individual tag files.
+// Returns metadata writer, data writer, and filter writer for the specified tag.
+func (mp *memPart) mustCreateTagWriters(tagName string) (fs.Writer, fs.Writer, fs.Writer) {
+       if mp.tagData == nil {
+               mp.tagData = make(map[string]*bytes.Buffer)
+               mp.tagMetadata = make(map[string]*bytes.Buffer)
+               mp.tagFilters = make(map[string]*bytes.Buffer)
+       }
+
+       // Get or create buffers for this tag
+       td, tdExists := mp.tagData[tagName]
+       tm := mp.tagMetadata[tagName]
+       tf := mp.tagFilters[tagName]
+
+       if tdExists {
+               td.Reset()
+               tm.Reset()
+               tf.Reset()
+               return tm, td, tf
+       }
+
+       // Create new buffers for this tag
+       mp.tagData[tagName] = &bytes.Buffer{}
+       mp.tagMetadata[tagName] = &bytes.Buffer{}
+       mp.tagFilters[tagName] = &bytes.Buffer{}
+
+       return mp.tagMetadata[tagName], mp.tagData[tagName], mp.tagFilters[tagName]
+}
+
+// reset clears the memory part for reuse.
+func (mp *memPart) reset() {
+       if mp.partMetadata != nil {
+               mp.partMetadata.reset()
+       }
+       mp.meta.Reset()
+       mp.primary.Reset()
+       mp.data.Reset()
+       mp.keys.Reset()
+
+       if mp.tagData != nil {
+               for k, td := range mp.tagData {
+                       td.Reset()
+                       delete(mp.tagData, k)
+               }
+       }
+       if mp.tagMetadata != nil {
+               for k, tm := range mp.tagMetadata {
+                       tm.Reset()
+                       delete(mp.tagMetadata, k)
+               }
+       }
+       if mp.tagFilters != nil {
+               for k, tf := range mp.tagFilters {
+                       tf.Reset()
+                       delete(mp.tagFilters, k)
+               }
+       }
+}
+
+// mustInitFromElements initializes the memory part from sorted elements.
+// This method will be completed when blockWriter is implemented.
+func (mp *memPart) mustInitFromElements(es *elements) {
+       mp.reset()
+
+       if len(es.userKeys) == 0 {
+               return
+       }
+
+       // Sort elements by seriesID first, then by user key
+       sort.Sort(es)
+
+       // TODO: Initialize using blockWriter when implemented in 4.2
+       // For now, we prepare the structure
+
+       // Set basic metadata
+       if mp.partMetadata == nil {
+               mp.partMetadata = generatePartMetadata()
+       }
+       mp.partMetadata.TotalCount = uint64(len(es.userKeys))
+       mp.partMetadata.MinKey = es.userKeys[0]
+       mp.partMetadata.MaxKey = es.userKeys[len(es.userKeys)-1]
+}
+
+// mustFlush flushes the memory part to disk with tag-based file organization.
+func (mp *memPart) mustFlush(fileSystem fs.FileSystem, partPath string) {
+       fileSystem.MkdirPanicIfExist(partPath, storage.DirPerm)
+
+       // Write core files
+       fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(partPath, metaFilename), storage.FilePerm)
+       fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(partPath, primaryFilename), storage.FilePerm)
+       fs.MustFlush(fileSystem, mp.data.Buf, filepath.Join(partPath, dataFilename), storage.FilePerm)
+       fs.MustFlush(fileSystem, mp.keys.Buf, filepath.Join(partPath, keysFilename), storage.FilePerm)
+
+       // Write individual tag files
+       for tagName, td := range mp.tagData {
+               fs.MustFlush(fileSystem, td.Buf, filepath.Join(partPath, tagName+tagDataExtension), storage.FilePerm)
+       }
+       for tagName, tm := range mp.tagMetadata {
+               fs.MustFlush(fileSystem, tm.Buf, filepath.Join(partPath, tagName+tagMetadataExtension), storage.FilePerm)
+       }
+       for tagName, tf := range mp.tagFilters {
+               fs.MustFlush(fileSystem, tf.Buf, filepath.Join(partPath, tagName+tagFilterExtension), storage.FilePerm)
+       }
+
+       // Write part metadata manifest
+       if mp.partMetadata != nil {
+               manifestData, err := mp.partMetadata.marshal()
+               if err != nil {
+                       logger.GetLogger().Panic().Err(err).Str("path", partPath).Msg("failed to marshal part metadata")
+               }
+               fs.MustFlush(fileSystem, manifestData, filepath.Join(partPath, manifestFilename), storage.FilePerm)
+       }
+
+       fileSystem.SyncPath(partPath)
+}
+
+// generateMemPart gets memPart from pool or creates new.
+func generateMemPart() *memPart {
+       v := memPartPool.Get()
+       if v == nil {
+               return &memPart{}
+       }
+       return v
+}
+
+// releaseMemPart returns memPart to pool after reset.
+func releaseMemPart(mp *memPart) {
+       mp.reset()
+       memPartPool.Put(mp)
+}
+
+var memPartPool = pool.Register[*memPart]("sidx-memPart")
+
+// openMemPart creates a part from a memory part.
+func openMemPart(mp *memPart) *part {
+       p := &part{}
+       if mp.partMetadata != nil {
+               // Copy part metadata
+               p.partMetadata = generatePartMetadata()
+               *p.partMetadata = *mp.partMetadata
+       }
+
+       // TODO: Read block metadata when blockWriter is implemented
+       // p.blockMetadata = mustReadBlockMetadata(p.blockMetadata[:0], &mp.primary)
+
+       // Open data files as readers from memory buffers
+       p.primary = &mp.primary
+       p.data = &mp.data
+       p.keys = &mp.keys
+
+       // Open individual tag files if they exist
+       if mp.tagData != nil {
+               p.tagData = make(map[string]fs.Reader)
+               p.tagMetadata = make(map[string]fs.Reader)
+               p.tagFilters = make(map[string]fs.Reader)
+
+               for tagName, td := range mp.tagData {
+                       p.tagData[tagName] = td
+                       if tm, exists := mp.tagMetadata[tagName]; exists {
+                               p.tagMetadata[tagName] = tm
+                       }
+                       if tf, exists := mp.tagFilters[tagName]; exists {
+                               p.tagFilters[tagName] = tf
+                       }
+               }
+       }
+       return p
+}
+
+// uncompressedElementSizeBytes calculates the uncompressed size of an element.
+// This is a utility function similar to the stream module.
+func uncompressedElementSizeBytes(index int, es *elements) uint64 {
+       // 8 bytes for user key
+       // 8 bytes for elementID
+       n := uint64(8 + 8)
+
+       // Add data payload size
+       if index < len(es.data) && es.data[index] != nil {
+               n += uint64(len(es.data[index]))
+       }
+
+       // Add tag sizes
+       if index < len(es.tags) {
+               for _, tag := range es.tags[index] {
+                       n += uint64(len(tag.name))
+                       if tag.value != nil {
+                               n += uint64(len(tag.value))
+                       }
+               }
+       }
+
+       return n
+}
+
+// partPath returns the path for a part with the given epoch.
+func partPath(root string, epoch uint64) string {
+       return filepath.Join(root, partName(epoch))
+}
+
+// partName returns the directory name for a part with the given epoch.
+// Uses 16-character hex format consistent with stream module.
+func partName(epoch uint64) string {
+       return fmt.Sprintf("%016x", epoch)
+}

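Taken together, the memPart additions above compose into a simple flush-and-reopen flow. The following is a minimal sketch (not part of this change) using only functions introduced or referenced in this patch; it assumes it lives in package sidx and that the caller supplies an fs.FileSystem and a populated *elements value:

```go
// flushAndReopen illustrates the intended memPart lifecycle: build an
// in-memory part from sorted elements, flush it under an epoch-named
// directory, then reopen it from disk as a regular part.
func flushAndReopen(fileSystem fs.FileSystem, root string, epoch uint64, es *elements) *part {
	mp := generateMemPart()
	defer releaseMemPart(mp)

	mp.mustInitFromElements(es)   // sorts elements and fills partMetadata
	dst := partPath(root, epoch)  // e.g. <root>/0000000000000001
	mp.mustFlush(fileSystem, dst) // writes core files, per-tag files, manifest.json

	return mustOpenPart(dst, fileSystem) // reads manifest.json, falls back to meta.bin
}
```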