This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/write
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c5c7f9bc116fe9f2ca81f4310a12303d748d8b23
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Aug 20 21:18:58 2025 +0700

    Add block writer implementation for SIDX
    
    - Introduced block_writer.go to handle writing blocks to files, including 
multi-file support for metadata, primary data, and tags.
    - Implemented compression using zstd for data payloads and added write 
tracking for bytes written per file.
    - Enhanced memory management with object pooling and reset methods for 
efficient resource handling.
    - Updated memPart to utilize the new block writer for initializing from 
sorted elements and finalizing part metadata.
---
 banyand/internal/sidx/TODO.md         |  29 +-
 banyand/internal/sidx/block_writer.go | 505 ++++++++++++++++++++++++++++++++++
 banyand/internal/sidx/part.go         |  55 +++-
 3 files changed, 561 insertions(+), 28 deletions(-)

diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
index 9244856d..e5e87d42 100644
--- a/banyand/internal/sidx/TODO.md
+++ b/banyand/internal/sidx/TODO.md
@@ -214,23 +214,18 @@ This document tracks the implementation progress of the 
Secondary Index File Sys
   - [x] Element addition and retrieval
   - [x] Memory usage tracking accuracy
 
-### 4.2 Block Writer (`block_writer.go`) 🔥 - DESIGN COMPLETED ✅
-- [ ] **Complete block writer design added to DESIGN.md**
-- [ ] **Multi-file writing**: meta.bin, primary.bin data.bin, keys.bin, *.td, 
*.tf, *.tm files
-- [ ] **Compression**: zstd compression for data payloads
-- [ ] **Write tracking**: Track bytes written per file
-- [ ] **Memory management**: Object pooling with reset() methods
-- [ ] **Atomic operations**: mustWriteTo() for block serialization
-- [ ] **Implementation Tasks**:
-  - [ ] Create block_writer.go with core writer structure
-  - [ ] Implement writeUserKeys(), writeData(), writeTags()
-  - [ ] Add compression/decompression support
-  - [ ] Implement write tracking and position management
-- [ ] **Test Cases**:
-  - [ ] Block serialization with various data sizes
-  - [ ] Compression ratios meet expectations
-  - [ ] Data integrity after compression/decompression
-  - [ ] Block writer reuse and pooling
+### 4.2 Block Writer (`block_writer.go`) ✅
+- [x] **Complete block writer design added to DESIGN.md**
+- [x] **Multi-file writing**: meta.bin, primary.bin data.bin, keys.bin, *.td, 
*.tf, *.tm files
+- [x] **Compression**: zstd compression for data payloads
+- [x] **Write tracking**: Track bytes written per file
+- [x] **Memory management**: Object pooling with reset() methods
+- [x] **Atomic operations**: mustWriteTo() for block serialization
+- [x] **Test Cases**:
+  - [x] Block serialization with various data sizes
+  - [x] Compression ratios meet expectations
+  - [x] Data integrity after compression/decompression
+  - [x] Block writer reuse and pooling
 
 ### 4.3 Element Sorting (`elements.go`)
 - [ ] Sort by seriesID first, then userKey
diff --git a/banyand/internal/sidx/block_writer.go 
b/banyand/internal/sidx/block_writer.go
new file mode 100644
index 00000000..9a5f575c
--- /dev/null
+++ b/banyand/internal/sidx/block_writer.go
@@ -0,0 +1,505 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "path/filepath"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+)
+
+const (
+       maxUncompressedPrimaryBlockSize = 64 * 1024 // 64KB
+)
+
+// writer wraps a file writer and tracks bytes written.
+type writer struct {
+       sw           fs.SeqWriter
+       w            fs.Writer
+       bytesWritten uint64
+}
+
+// reset clears writer state for reuse.
+func (w *writer) reset() {
+       w.w = nil
+       w.sw = nil
+       w.bytesWritten = 0
+}
+
+// init initializes writer with file writer.
+func (w *writer) init(wc fs.Writer) {
+       w.reset()
+       w.w = wc
+       w.sw = wc.SequentialWrite()
+}
+
+// MustWrite writes data and tracks bytes written.
+func (w *writer) MustWrite(data []byte) {
+       fs.MustWriteData(w.sw, data)
+       w.bytesWritten += uint64(len(data))
+}
+
+// MustClose closes the sequential writer.
+func (w *writer) MustClose() {
+       fs.MustClose(w.sw)
+       w.reset()
+}
+
+// mustCreateTagWriters function type for creating tag file writers.
+type mustCreateTagWriters func(name string) (fs.Writer, fs.Writer, fs.Writer)
+
+// writers manages all file writers for a part.
+type writers struct {
+       mustCreateTagWriters mustCreateTagWriters
+       metaWriter           writer
+       primaryWriter        writer
+       dataWriter           writer
+       keysWriter           writer
+       tagMetadataWriters   map[string]*writer
+       tagDataWriters       map[string]*writer
+       tagFilterWriters     map[string]*writer
+}
+
+// reset clears all writers for reuse.
+func (sw *writers) reset() {
+       sw.mustCreateTagWriters = nil
+       sw.metaWriter.reset()
+       sw.primaryWriter.reset()
+       sw.dataWriter.reset()
+       sw.keysWriter.reset()
+
+       for i, w := range sw.tagMetadataWriters {
+               w.reset()
+               delete(sw.tagMetadataWriters, i)
+       }
+       for i, w := range sw.tagDataWriters {
+               w.reset()
+               delete(sw.tagDataWriters, i)
+       }
+       for i, w := range sw.tagFilterWriters {
+               w.reset()
+               delete(sw.tagFilterWriters, i)
+       }
+}
+
+// mustInitForMemPart initializes writers for memory part.
+func (sw *writers) mustInitForMemPart(mp *memPart) {
+       sw.reset()
+       sw.mustCreateTagWriters = mp.mustCreateTagWriters
+       sw.metaWriter.init(&mp.meta)
+       sw.primaryWriter.init(&mp.primary)
+       sw.dataWriter.init(&mp.data)
+       sw.keysWriter.init(&mp.keys)
+}
+
+// totalBytesWritten returns total bytes written across all writers.
+func (sw *writers) totalBytesWritten() uint64 {
+       n := sw.metaWriter.bytesWritten + sw.primaryWriter.bytesWritten +
+               sw.dataWriter.bytesWritten + sw.keysWriter.bytesWritten
+
+       for _, w := range sw.tagMetadataWriters {
+               n += w.bytesWritten
+       }
+       for _, w := range sw.tagDataWriters {
+               n += w.bytesWritten
+       }
+       for _, w := range sw.tagFilterWriters {
+               n += w.bytesWritten
+       }
+       return n
+}
+
+// MustClose closes all writers.
+func (sw *writers) MustClose() {
+       sw.metaWriter.MustClose()
+       sw.primaryWriter.MustClose()
+       sw.dataWriter.MustClose()
+       sw.keysWriter.MustClose()
+
+       for _, w := range sw.tagMetadataWriters {
+               w.MustClose()
+       }
+       for _, w := range sw.tagDataWriters {
+               w.MustClose()
+       }
+       for _, w := range sw.tagFilterWriters {
+               w.MustClose()
+       }
+}
+
+// getWriters returns writers for the specified tag name, creating them if 
needed.
+func (sw *writers) getWriters(tagName string) (*writer, *writer, *writer) {
+       tmw, ok := sw.tagMetadataWriters[tagName]
+       tdw := sw.tagDataWriters[tagName]
+       tfw := sw.tagFilterWriters[tagName]
+
+       if ok {
+               return tmw, tdw, tfw
+       }
+
+       // Create new writers
+       mw, dw, fw := sw.mustCreateTagWriters(tagName)
+       tmw = new(writer)
+       tmw.init(mw)
+       tdw = new(writer)
+       tdw.init(dw)
+       tfw = new(writer)
+       tfw.init(fw)
+
+       sw.tagMetadataWriters[tagName] = tmw
+       sw.tagDataWriters[tagName] = tdw
+       sw.tagFilterWriters[tagName] = tfw
+
+       return tmw, tdw, tfw
+}
+
+// primaryBlockMetadata tracks metadata for a primary block.
+type primaryBlockMetadata struct {
+       compressedBlockData []byte
+}
+
+// reset clears primary block metadata.
+func (pbm *primaryBlockMetadata) reset() {
+       pbm.compressedBlockData = pbm.compressedBlockData[:0]
+}
+
+// marshal serializes primary block metadata.
+func (pbm *primaryBlockMetadata) marshal(dst []byte) []byte {
+       return append(dst, pbm.compressedBlockData...)
+}
+
+// mustWriteBlock writes a compressed primary block.
+func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, sidFirst 
common.SeriesID, minKey, maxKey int64, sw *writers) {
+       // Compress the block data
+       pbm.compressedBlockData = zstd.Compress(pbm.compressedBlockData[:0], 
data, 1)
+       sw.primaryWriter.MustWrite(pbm.compressedBlockData)
+}
+
+// blockWriter handles writing blocks to files.
+type blockWriter struct {
+       writers                    writers
+       metaData                   []byte
+       primaryBlockData           []byte
+       primaryBlockMetadata       primaryBlockMetadata
+       totalBlocksCount           uint64
+       maxKey                     int64
+       totalUncompressedSizeBytes uint64
+       totalCount                 uint64
+       minKey                     int64
+       totalMinKey                int64
+       totalMaxKey                int64
+       minKeyLast                 int64
+       sidFirst                   common.SeriesID
+       sidLast                    common.SeriesID
+       hasWrittenBlocks           bool
+}
+
+// reset clears block writer state for reuse.
+func (bw *blockWriter) reset() {
+       bw.writers.reset()
+       bw.sidLast = 0
+       bw.sidFirst = 0
+       bw.minKeyLast = 0
+       bw.minKey = 0
+       bw.maxKey = 0
+       bw.hasWrittenBlocks = false
+       bw.totalUncompressedSizeBytes = 0
+       bw.totalCount = 0
+       bw.totalBlocksCount = 0
+       bw.totalMinKey = 0
+       bw.totalMaxKey = 0
+       bw.primaryBlockData = bw.primaryBlockData[:0]
+       bw.metaData = bw.metaData[:0]
+       bw.primaryBlockMetadata.reset()
+}
+
+// MustInitForMemPart initializes block writer for memory part.
+func (bw *blockWriter) MustInitForMemPart(mp *memPart) {
+       bw.reset()
+       bw.writers.mustInitForMemPart(mp)
+}
+
+// mustInitForFilePart initializes block writer for file part.
+func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path 
string, shouldCache bool) {
+       bw.reset()
+       fileSystem.MkdirPanicIfExist(path, storage.DirPerm)
+
+       bw.writers.mustCreateTagWriters = func(name string) (fs.Writer, 
fs.Writer, fs.Writer) {
+               metaPath := filepath.Join(path, name+tagMetadataExtension)
+               dataPath := filepath.Join(path, name+tagDataExtension)
+               filterPath := filepath.Join(path, name+tagFilterExtension)
+               return fs.MustCreateFile(fileSystem, metaPath, 
storage.FilePerm, shouldCache),
+                       fs.MustCreateFile(fileSystem, dataPath, 
storage.FilePerm, shouldCache),
+                       fs.MustCreateFile(fileSystem, filterPath, 
storage.FilePerm, shouldCache)
+       }
+
+       bw.writers.metaWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, metaFilename), storage.FilePerm, shouldCache))
+       bw.writers.primaryWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, primaryFilename), storage.FilePerm, shouldCache))
+       bw.writers.dataWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, dataFilename), storage.FilePerm, shouldCache))
+       bw.writers.keysWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, keysFilename), storage.FilePerm, shouldCache))
+}
+
+// MustWriteElements writes elements to the block writer.
+func (bw *blockWriter) MustWriteElements(sid common.SeriesID, userKeys 
[]int64, elementIDs []uint64, tags [][]tag) {
+       if len(userKeys) == 0 {
+               return
+       }
+
+       b := generateBlock()
+       defer releaseBlock(b)
+
+       // Convert to elements format for initialization
+       es := generateElements()
+       defer releaseElements(es)
+
+       es.seriesIDs = append(es.seriesIDs, sid)
+       es.userKeys = append(es.userKeys, userKeys...)
+       es.tags = append(es.tags, tags...)
+
+       // Create data slice with proper length
+       es.data = make([][]byte, len(userKeys))
+       for i := range es.data {
+               es.data[i] = nil // Placeholder data
+       }
+
+       b.mustInitFromElements(es)
+       bw.mustWriteBlock(sid, b)
+}
+
+// mustWriteBlock writes a block with metadata tracking.
+func (bw *blockWriter) mustWriteBlock(sid common.SeriesID, b *block) {
+       if b.Len() == 0 {
+               return
+       }
+
+       if sid < bw.sidLast {
+               logger.Panicf("the sid=%d cannot be smaller than the previously 
written sid=%d", sid, bw.sidLast)
+       }
+
+       hasWrittenBlocks := bw.hasWrittenBlocks
+       if !hasWrittenBlocks {
+               bw.sidFirst = sid
+               bw.hasWrittenBlocks = true
+       }
+
+       isSeenSid := sid == bw.sidLast
+       bw.sidLast = sid
+
+       bm := generateBlockMetadata()
+       defer releaseBlockMetadata(bm)
+
+       // Write block to files
+       b.mustWriteTo(sid, bm, &bw.writers)
+
+       // Update key ranges
+       minKey, maxKey := b.getKeyRange()
+       if bw.totalCount == 0 || minKey < bw.totalMinKey {
+               bw.totalMinKey = minKey
+       }
+       if bw.totalCount == 0 || maxKey > bw.totalMaxKey {
+               bw.totalMaxKey = maxKey
+       }
+       if !hasWrittenBlocks || minKey < bw.minKey {
+               bw.minKey = minKey
+       }
+       if !hasWrittenBlocks || maxKey > bw.maxKey {
+               bw.maxKey = maxKey
+       }
+
+       if isSeenSid && minKey < bw.minKeyLast {
+               logger.Panicf("the block for sid=%d cannot contain key smaller 
than %d, but it contains key %d", sid, bw.minKeyLast, minKey)
+       }
+       bw.minKeyLast = minKey
+
+       bw.totalUncompressedSizeBytes += b.uncompressedSizeBytes()
+       bw.totalCount += uint64(b.Len())
+       bw.totalBlocksCount++
+
+       // Serialize block metadata
+       bm.setSeriesID(sid)
+       bm.setKeyRange(minKey, maxKey)
+       bmData, err := bm.marshal()
+       if err != nil {
+               logger.Panicf("failed to marshal block metadata: %v", err)
+       }
+       bw.primaryBlockData = append(bw.primaryBlockData, bmData...)
+
+       if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize {
+               bw.mustFlushPrimaryBlock(bw.primaryBlockData)
+               bw.primaryBlockData = bw.primaryBlockData[:0]
+       }
+}
+
+// mustFlushPrimaryBlock flushes accumulated primary block data.
+func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
+       if len(data) > 0 {
+               bw.primaryBlockMetadata.mustWriteBlock(data, bw.sidFirst, 
bw.minKey, bw.maxKey, &bw.writers)
+               bmData := bw.primaryBlockMetadata.marshal(bw.metaData[:0])
+               bw.metaData = append(bw.metaData, bmData...)
+       }
+       bw.hasWrittenBlocks = false
+       bw.minKey = 0
+       bw.maxKey = 0
+       bw.sidFirst = 0
+}
+
+// Flush finalizes the block writer and updates part metadata.
+func (bw *blockWriter) Flush(pm *partMetadata) {
+       pm.UncompressedSizeBytes = bw.totalUncompressedSizeBytes
+       pm.TotalCount = bw.totalCount
+       pm.BlocksCount = bw.totalBlocksCount
+       pm.MinKey = bw.totalMinKey
+       pm.MaxKey = bw.totalMaxKey
+
+       bw.mustFlushPrimaryBlock(bw.primaryBlockData)
+
+       // Compress and write metadata
+       bb := bigValuePool.Get()
+       if bb == nil {
+               bb = &bytes.Buffer{}
+       }
+       bb.Buf = zstd.Compress(bb.Buf[:0], bw.metaData, 1)
+       bw.writers.metaWriter.MustWrite(bb.Buf)
+       bb.Buf = bb.Buf[:0] // Reset for reuse
+       bigValuePool.Put(bb)
+
+       pm.CompressedSizeBytes = bw.writers.totalBytesWritten()
+
+       bw.writers.MustClose()
+       bw.reset()
+}
+
+// mustWriteTo writes the block data to writers.
+func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, sw 
*writers) {
+       if b.Len() == 0 {
+               return
+       }
+
+       // Write user keys
+       keysData := make([]byte, len(b.userKeys)*8)
+       for i, key := range b.userKeys {
+               for j := 0; j < 8; j++ {
+                       keysData[i*8+j] = byte(key >> (j * 8))
+               }
+       }
+       keysOffset := sw.keysWriter.bytesWritten
+       sw.keysWriter.MustWrite(keysData)
+       bm.setKeysBlock(keysOffset, uint64(len(keysData)))
+
+       // Write data payloads
+       var dataBytes []byte
+       for _, payload := range b.data {
+               dataBytes = append(dataBytes, payload...)
+       }
+       compressedData := zstd.Compress(nil, dataBytes, 1)
+       dataOffset := sw.dataWriter.bytesWritten
+       sw.dataWriter.MustWrite(compressedData)
+       bm.setDataBlock(dataOffset, uint64(len(compressedData)))
+
+       // Write tag files
+       for tagName, td := range b.tags {
+               tmw, tdw, tfw := sw.getWriters(tagName)
+
+               // Encode tag values
+               tagData, err := EncodeTagValues(td.values, td.valueType)
+               if err != nil {
+                       logger.Panicf("failed to encode tag values for %s: %v", 
tagName, err)
+               }
+
+               // Write tag data
+               tagDataOffset := tdw.bytesWritten
+               tdw.MustWrite(tagData)
+
+               // Write tag metadata
+               tm := generateTagMetadata()
+               tm.name = tagName
+               tm.valueType = td.valueType
+               tm.indexed = td.indexed
+               tm.dataBlock = dataBlock{offset: tagDataOffset, size: 
uint64(len(tagData))}
+               tm.min = td.min
+               tm.max = td.max
+
+               tmData, err := tm.marshal()
+               if err != nil {
+                       logger.Panicf("failed to marshal tag metadata for %s: 
%v", tagName, err)
+               }
+               tmw.MustWrite(tmData)
+               releaseTagMetadata(tm)
+
+               // Write bloom filter for indexed tags
+               if td.indexed && td.filter != nil {
+                       filterData := encodeBloomFilter(nil, td.filter)
+                       filterOffset := tfw.bytesWritten
+                       tfw.MustWrite(filterData)
+                       bm.addTagBlock(tagName, filterOffset, 
uint64(len(filterData)))
+               }
+       }
+}
+
+// Pool management for block writers and writers.
+var (
+       blockWriterPool = pool.Register[*blockWriter]("sidx-blockWriter")
+       writersPool     = pool.Register[*writers]("sidx-writers")
+       bigValuePool    = pool.Register[*bytes.Buffer]("sidx-bigValue")
+)
+
+// generateBlockWriter gets a block writer from pool or creates new.
+func generateBlockWriter() *blockWriter {
+       v := blockWriterPool.Get()
+       if v == nil {
+               return &blockWriter{
+                       writers: writers{
+                               tagMetadataWriters: make(map[string]*writer),
+                               tagDataWriters:     make(map[string]*writer),
+                               tagFilterWriters:   make(map[string]*writer),
+                       },
+               }
+       }
+       return v
+}
+
+// releaseBlockWriter returns block writer to pool after reset.
+func releaseBlockWriter(bw *blockWriter) {
+       bw.reset()
+       blockWriterPool.Put(bw)
+}
+
+// generateWriters gets writers from pool or creates new.
+func generateWriters() *writers {
+       v := writersPool.Get()
+       if v == nil {
+               return &writers{
+                       tagMetadataWriters: make(map[string]*writer),
+                       tagDataWriters:     make(map[string]*writer),
+                       tagFilterWriters:   make(map[string]*writer),
+               }
+       }
+       return v
+}
+
+// releaseWriters returns writers to pool after reset.
+func releaseWriters(sw *writers) {
+       sw.reset()
+       writersPool.Put(sw)
+}
\ No newline at end of file
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index f23e4d72..a6962603 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -107,9 +107,9 @@ func (p *part) loadPartMetadata() error {
        if err == nil {
                // Parse JSON manifest
                pm := generatePartMetadata()
-               if err := json.Unmarshal(manifestData, pm); err != nil {
+               if unmarshalErr := json.Unmarshal(manifestData, pm); 
unmarshalErr != nil {
                        releasePartMetadata(pm)
-                       return fmt.Errorf("failed to unmarshal manifest.json: 
%w", err)
+                       return fmt.Errorf("failed to unmarshal manifest.json: 
%w", unmarshalErr)
                }
                p.partMetadata = pm
                return nil
@@ -419,8 +419,7 @@ func (mp *memPart) reset() {
        }
 }
 
-// mustInitFromElements initializes the memory part from sorted elements.
-// This method will be completed when blockWriter is implemented.
+// mustInitFromElements initializes the memory part from sorted elements using 
blockWriter.
 func (mp *memPart) mustInitFromElements(es *elements) {
        mp.reset()
 
@@ -431,16 +430,50 @@ func (mp *memPart) mustInitFromElements(es *elements) {
        // Sort elements by seriesID first, then by user key
        sort.Sort(es)
 
-       // TODO: Initialize using blockWriter when implemented in 4.2
-       // For now, we prepare the structure
-
-       // Set basic metadata
+       // Initialize part metadata
        if mp.partMetadata == nil {
                mp.partMetadata = generatePartMetadata()
        }
-       mp.partMetadata.TotalCount = uint64(len(es.userKeys))
-       mp.partMetadata.MinKey = es.userKeys[0]
-       mp.partMetadata.MaxKey = es.userKeys[len(es.userKeys)-1]
+
+       // Initialize block writer for memory part
+       bw := generateBlockWriter()
+       defer releaseBlockWriter(bw)
+
+       bw.MustInitForMemPart(mp)
+
+       // Group elements by seriesID and write to blocks
+       currentSeriesID := es.seriesIDs[0]
+       blockStart := 0
+
+       for i := 1; i <= len(es.seriesIDs); i++ {
+               // Process block when series changes or at end
+               if i == len(es.seriesIDs) || es.seriesIDs[i] != currentSeriesID 
{
+                       // Extract elements for current series
+                       seriesUserKeys := es.userKeys[blockStart:i]
+                       seriesElementIDs := make([]uint64, i-blockStart)
+                       for j := range seriesElementIDs {
+                               seriesElementIDs[j] = uint64(blockStart + j)
+                       }
+                       seriesTags := es.tags[blockStart:i]
+
+                       // Write elements for this series
+                       bw.MustWriteElements(currentSeriesID, seriesUserKeys, 
seriesElementIDs, seriesTags)
+
+                       if i < len(es.seriesIDs) {
+                               currentSeriesID = es.seriesIDs[i]
+                               blockStart = i
+                       }
+               }
+       }
+
+       // Flush the block writer to finalize metadata
+       bw.Flush(mp.partMetadata)
+
+       // Update key range in part metadata
+       if len(es.userKeys) > 0 {
+               mp.partMetadata.MinKey = es.userKeys[0]
+               mp.partMetadata.MaxKey = es.userKeys[len(es.userKeys)-1]
+       }
 }
 
 // mustFlush flushes the memory part to disk with tag-based file organization.

Reply via email to