This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/element
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 36db9cc6276babba42ef031e4d5ab07f80bbdef8
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sat Aug 16 11:08:50 2025 +0800

    Add metadata structures and validation tests for part and primary block 
metadata
    
    - Introduced `metadata.go` and `metadata_test.go` files to define 
`partMetadata` and `primaryBlockMetadata` structures.
    - Implemented validation methods for metadata integrity, including key 
range checks and size validations.
    - Added comprehensive test cases for serialization, deserialization, and 
corruption detection of metadata.
    - Updated TODO.md to reflect completion of metadata structures and tests.
---
 banyand/internal/sidx/TODO.md          |  20 +-
 banyand/internal/sidx/metadata.go      | 436 ++++++++++++++++++++++
 banyand/internal/sidx/metadata_test.go | 641 +++++++++++++++++++++++++++++++++
 3 files changed, 1087 insertions(+), 10 deletions(-)

diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
index cafbb7ba..0273586d 100644
--- a/banyand/internal/sidx/TODO.md
+++ b/banyand/internal/sidx/TODO.md
@@ -4,7 +4,7 @@ This document tracks the implementation progress of the 
Secondary Index File Sys
 
 ## Implementation Progress Overview
 
-- [x] **Phase 1**: Core Data Structures (6 tasks) - 2/6 completed
+- [x] **Phase 1**: Core Data Structures (6 tasks) - 3/6 completed
 - [ ] **Phase 2**: Memory Management (4 tasks) 
 - [ ] **Phase 3**: Snapshot Management (4 tasks)
 - [ ] **Phase 4**: Write Path (4 tasks)
@@ -41,15 +41,15 @@ This document tracks the implementation progress of the 
Secondary Index File Sys
   - [x] Filter generation for indexed tags
   - [x] Tag serialization round-trip integrity
 
-### 1.3 Metadata Structures (`metadata.go`)
-- [ ] partMetadata with MinKey/MaxKey (replacing timestamps)
-- [ ] primaryBlockMetadata with block offsets and key ranges
-- [ ] Validation methods for metadata integrity
-- [ ] **Test Cases**:
-  - [ ] Metadata serialization/deserialization
-  - [ ] Key range validation (MinKey <= MaxKey)
-  - [ ] Version compatibility checks
-  - [ ] Corruption detection in metadata
+### 1.3 Metadata Structures (`metadata.go`) ✅
+- [x] partMetadata with MinKey/MaxKey (replacing timestamps)
+- [x] primaryBlockMetadata with block offsets and key ranges
+- [x] Validation methods for metadata integrity
+- [x] **Test Cases**:
+  - [x] Metadata serialization/deserialization
+  - [x] Key range validation (MinKey <= MaxKey)
+  - [x] Version compatibility checks
+  - [x] Corruption detection in metadata
 
 ### 1.4 Block Structure (`block.go`) 🔥
 - [ ] **Core block organization for elements within parts**
diff --git a/banyand/internal/sidx/metadata.go 
b/banyand/internal/sidx/metadata.go
new file mode 100644
index 00000000..5f78f028
--- /dev/null
+++ b/banyand/internal/sidx/metadata.go
@@ -0,0 +1,436 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"sort"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/pkg/pool"
)
+
// partMetadata contains metadata for an entire part (replaces
// timestamp-specific metadata from stream module).
//
// All fields are exported and JSON-tagged because parts persist this
// structure via marshal/unmarshalPartMetadata (JSON round-trip).
type partMetadata struct {
	// Size information.
	CompressedSizeBytes   uint64 `json:"compressedSizeBytes"`   // bytes on disk after compression
	UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"` // bytes before compression
	TotalCount            uint64 `json:"totalCount"`            // number of elements across all blocks
	BlocksCount           uint64 `json:"blocksCount"`           // number of blocks in the part

	// Key range (replaces timestamp range from stream module).
	MinKey int64 `json:"minKey"` // Minimum user key in part
	MaxKey int64 `json:"maxKey"` // Maximum user key in part

	// Identity.
	ID uint64 `json:"id"` // Unique part identifier
}
+
// primaryBlockMetadata contains metadata for a block within a part.
//
// Unlike partMetadata it is serialized with a hand-rolled binary layout
// (see marshal), so its fields are unexported.
type primaryBlockMetadata struct {
	// Block references to files.
	tagsBlocks map[string]dataBlock // References to tag files, keyed by tag name
	dataBlock  dataBlock            // Reference to data in data.bin
	keysBlock  dataBlock            // Reference to keys in keys.bin

	// Block identification.
	seriesID common.SeriesID // must be non-zero for a valid block (see validate)

	// Key range within block.
	minKey int64 // Minimum user key in block
	maxKey int64 // Maximum user key in block
}
+
// Object pools that recycle metadata values; use generate*/release* helpers
// rather than touching the pools directly.
var (
	partMetadataPool         = pool.Register[*partMetadata]("sidx-partMetadata")
	primaryBlockMetadataPool = pool.Register[*primaryBlockMetadata]("sidx-primaryBlockMetadata")
)
+
+// generatePartMetadata gets partMetadata from pool or creates new.
+func generatePartMetadata() *partMetadata {
+       v := partMetadataPool.Get()
+       if v == nil {
+               return &partMetadata{}
+       }
+       return v
+}
+
+// releasePartMetadata returns partMetadata to pool after reset.
+func releasePartMetadata(pm *partMetadata) {
+       if pm == nil {
+               return
+       }
+       pm.reset()
+       partMetadataPool.Put(pm)
+}
+
+// generatePrimaryBlockMetadata gets primaryBlockMetadata from pool or creates 
new.
+func generatePrimaryBlockMetadata() *primaryBlockMetadata {
+       v := primaryBlockMetadataPool.Get()
+       if v == nil {
+               return &primaryBlockMetadata{
+                       tagsBlocks: make(map[string]dataBlock),
+               }
+       }
+       return v
+}
+
+// releasePrimaryBlockMetadata returns primaryBlockMetadata to pool after 
reset.
+func releasePrimaryBlockMetadata(pbm *primaryBlockMetadata) {
+       if pbm == nil {
+               return
+       }
+       pbm.reset()
+       primaryBlockMetadataPool.Put(pbm)
+}
+
+// reset clears partMetadata for reuse in object pool.
+func (pm *partMetadata) reset() {
+       pm.CompressedSizeBytes = 0
+       pm.UncompressedSizeBytes = 0
+       pm.TotalCount = 0
+       pm.BlocksCount = 0
+       pm.MinKey = 0
+       pm.MaxKey = 0
+       pm.ID = 0
+}
+
+// reset clears primaryBlockMetadata for reuse in object pool.
+func (pbm *primaryBlockMetadata) reset() {
+       pbm.seriesID = 0
+       pbm.minKey = 0
+       pbm.maxKey = 0
+       pbm.dataBlock = dataBlock{}
+       pbm.keysBlock = dataBlock{}
+       // Clear the map instead of creating a new one
+       for k := range pbm.tagsBlocks {
+               delete(pbm.tagsBlocks, k)
+       }
+}
+
+// validate validates the partMetadata for consistency.
+func (pm *partMetadata) validate() error {
+       if pm.MinKey > pm.MaxKey {
+               return fmt.Errorf("invalid key range: MinKey (%d) > MaxKey 
(%d)", pm.MinKey, pm.MaxKey)
+       }
+       if pm.CompressedSizeBytes > pm.UncompressedSizeBytes {
+               return fmt.Errorf("invalid size: compressed (%d) > uncompressed 
(%d)",
+                       pm.CompressedSizeBytes, pm.UncompressedSizeBytes)
+       }
+       if pm.BlocksCount == 0 && pm.TotalCount > 0 {
+               return fmt.Errorf("invalid counts: no blocks but has %d 
elements", pm.TotalCount)
+       }
+       return nil
+}
+
+// validate validates the primaryBlockMetadata for consistency.
+func (pbm *primaryBlockMetadata) validate() error {
+       if pbm.minKey > pbm.maxKey {
+               return fmt.Errorf("invalid block key range: minKey (%d) > 
maxKey (%d)", pbm.minKey, pbm.maxKey)
+       }
+       if pbm.seriesID == 0 {
+               return fmt.Errorf("invalid seriesID: cannot be zero")
+       }
+       if pbm.dataBlock.size == 0 {
+               return fmt.Errorf("invalid data block: size cannot be zero")
+       }
+       if pbm.keysBlock.size == 0 {
+               return fmt.Errorf("invalid keys block: size cannot be zero")
+       }
+       return nil
+}
+
+// validatePrimaryBlockMetadata validates ordering of blocks within a part.
+func validatePrimaryBlockMetadata(blocks []primaryBlockMetadata) error {
+       if len(blocks) == 0 {
+               return nil
+       }
+
+       for i := 1; i < len(blocks); i++ {
+               prev := &blocks[i-1]
+               curr := &blocks[i]
+
+               // Validate individual blocks
+               if err := prev.validate(); err != nil {
+                       return fmt.Errorf("block %d validation failed: %w", 
i-1, err)
+               }
+
+               // Check ordering: seriesID first, then minKey
+               if curr.seriesID < prev.seriesID {
+                       return fmt.Errorf("blocks not ordered by seriesID: 
block %d seriesID (%d) < block %d seriesID (%d)",
+                               i, curr.seriesID, i-1, prev.seriesID)
+               }
+
+               // For same seriesID, check key ordering
+               if curr.seriesID == prev.seriesID && curr.minKey < prev.minKey {
+                       return fmt.Errorf("blocks not ordered by key: block %d 
minKey (%d) < block %d minKey (%d) for seriesID %d",
+                               i, curr.minKey, i-1, prev.minKey, curr.seriesID)
+               }
+
+               // Check for overlapping key ranges within same seriesID
+               if curr.seriesID == prev.seriesID && curr.minKey <= prev.maxKey 
{
+                       return fmt.Errorf("overlapping key ranges: block %d 
[%d, %d] overlaps with block %d [%d, %d] for seriesID %d",
+                               i, curr.minKey, curr.maxKey, i-1, prev.minKey, 
prev.maxKey, curr.seriesID)
+               }
+       }
+
+       // Validate the last block
+       if err := blocks[len(blocks)-1].validate(); err != nil {
+               return fmt.Errorf("block %d validation failed: %w", 
len(blocks)-1, err)
+       }
+
+       return nil
+}
+
+// marshal serializes partMetadata to JSON bytes.
+func (pm *partMetadata) marshal() ([]byte, error) {
+       data, err := json.Marshal(pm)
+       if err != nil {
+               return nil, fmt.Errorf("failed to marshal partMetadata to JSON: 
%w", err)
+       }
+
+       return data, nil
+}
+
+// unmarshalPartMetadata deserializes partMetadata from JSON bytes.
+func unmarshalPartMetadata(data []byte) (*partMetadata, error) {
+       if len(data) == 0 {
+               return nil, fmt.Errorf("empty data provided")
+       }
+
+       pm := generatePartMetadata()
+
+       if err := json.Unmarshal(data, pm); err != nil {
+               releasePartMetadata(pm)
+               return nil, fmt.Errorf("failed to unmarshal partMetadata from 
JSON: %w", err)
+       }
+
+       // Validate the metadata
+       if err := pm.validate(); err != nil {
+               releasePartMetadata(pm)
+               return nil, fmt.Errorf("metadata validation failed: %w", err)
+       }
+
+       return pm, nil
+}
+
+// marshal serializes primaryBlockMetadata to bytes.
+func (pbm *primaryBlockMetadata) marshal() ([]byte, error) {
+       buf := &bytes.Buffer{}
+
+       // Write seriesID
+       if err := binary.Write(buf, binary.LittleEndian, pbm.seriesID); err != 
nil {
+               return nil, fmt.Errorf("failed to write seriesID: %w", err)
+       }
+
+       // Write key range
+       if err := binary.Write(buf, binary.LittleEndian, pbm.minKey); err != 
nil {
+               return nil, fmt.Errorf("failed to write minKey: %w", err)
+       }
+       if err := binary.Write(buf, binary.LittleEndian, pbm.maxKey); err != 
nil {
+               return nil, fmt.Errorf("failed to write maxKey: %w", err)
+       }
+
+       // Write data block
+       if err := binary.Write(buf, binary.LittleEndian, pbm.dataBlock.offset); 
err != nil {
+               return nil, fmt.Errorf("failed to write data block offset: %w", 
err)
+       }
+       if err := binary.Write(buf, binary.LittleEndian, pbm.dataBlock.size); 
err != nil {
+               return nil, fmt.Errorf("failed to write data block size: %w", 
err)
+       }
+
+       // Write keys block
+       if err := binary.Write(buf, binary.LittleEndian, pbm.keysBlock.offset); 
err != nil {
+               return nil, fmt.Errorf("failed to write keys block offset: %w", 
err)
+       }
+       if err := binary.Write(buf, binary.LittleEndian, pbm.keysBlock.size); 
err != nil {
+               return nil, fmt.Errorf("failed to write keys block size: %w", 
err)
+       }
+
+       // Write tag blocks count
+       if err := binary.Write(buf, binary.LittleEndian, 
uint32(len(pbm.tagsBlocks))); err != nil {
+               return nil, fmt.Errorf("failed to write tag blocks count: %w", 
err)
+       }
+
+       // Write tag blocks
+       for tagName, tagBlock := range pbm.tagsBlocks {
+               // Write tag name length and name
+               nameBytes := []byte(tagName)
+               if err := binary.Write(buf, binary.LittleEndian, 
uint32(len(nameBytes))); err != nil {
+                       return nil, fmt.Errorf("failed to write tag name 
length: %w", err)
+               }
+               if _, err := buf.Write(nameBytes); err != nil {
+                       return nil, fmt.Errorf("failed to write tag name: %w", 
err)
+               }
+
+               // Write tag block
+               if err := binary.Write(buf, binary.LittleEndian, 
tagBlock.offset); err != nil {
+                       return nil, fmt.Errorf("failed to write tag block 
offset: %w", err)
+               }
+               if err := binary.Write(buf, binary.LittleEndian, 
tagBlock.size); err != nil {
+                       return nil, fmt.Errorf("failed to write tag block size: 
%w", err)
+               }
+       }
+
+       return buf.Bytes(), nil
+}
+
+// unmarshalPrimaryBlockMetadata deserializes primaryBlockMetadata from bytes.
+func unmarshalPrimaryBlockMetadata(data []byte) (*primaryBlockMetadata, error) 
{
+       pbm := generatePrimaryBlockMetadata()
+       buf := bytes.NewReader(data)
+
+       // Read seriesID
+       if err := binary.Read(buf, binary.LittleEndian, &pbm.seriesID); err != 
nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("failed to read seriesID: %w", err)
+       }
+
+       // Read key range
+       if err := binary.Read(buf, binary.LittleEndian, &pbm.minKey); err != 
nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("failed to read minKey: %w", err)
+       }
+       if err := binary.Read(buf, binary.LittleEndian, &pbm.maxKey); err != 
nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("failed to read maxKey: %w", err)
+       }
+
+       // Read data block
+       if err := binary.Read(buf, binary.LittleEndian, &pbm.dataBlock.offset); 
err != nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("failed to read data block offset: %w", 
err)
+       }
+       if err := binary.Read(buf, binary.LittleEndian, &pbm.dataBlock.size); 
err != nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("failed to read data block size: %w", 
err)
+       }
+
+       // Read keys block
+       if err := binary.Read(buf, binary.LittleEndian, &pbm.keysBlock.offset); 
err != nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("failed to read keys block offset: %w", 
err)
+       }
+       if err := binary.Read(buf, binary.LittleEndian, &pbm.keysBlock.size); 
err != nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("failed to read keys block size: %w", 
err)
+       }
+
+       // Read tag blocks count
+       var tagBlocksCount uint32
+       if err := binary.Read(buf, binary.LittleEndian, &tagBlocksCount); err 
!= nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("failed to read tag blocks count: %w", 
err)
+       }
+
+       // Read tag blocks
+       for i := uint32(0); i < tagBlocksCount; i++ {
+               // Read tag name
+               var nameLen uint32
+               if err := binary.Read(buf, binary.LittleEndian, &nameLen); err 
!= nil {
+                       releasePrimaryBlockMetadata(pbm)
+                       return nil, fmt.Errorf("failed to read tag name length: 
%w", err)
+               }
+               nameBytes := make([]byte, nameLen)
+               if _, err := io.ReadFull(buf, nameBytes); err != nil {
+                       releasePrimaryBlockMetadata(pbm)
+                       return nil, fmt.Errorf("failed to read tag name: %w", 
err)
+               }
+               tagName := string(nameBytes)
+
+               // Read tag block
+               var tagBlock dataBlock
+               if err := binary.Read(buf, binary.LittleEndian, 
&tagBlock.offset); err != nil {
+                       releasePrimaryBlockMetadata(pbm)
+                       return nil, fmt.Errorf("failed to read tag block 
offset: %w", err)
+               }
+               if err := binary.Read(buf, binary.LittleEndian, 
&tagBlock.size); err != nil {
+                       releasePrimaryBlockMetadata(pbm)
+                       return nil, fmt.Errorf("failed to read tag block size: 
%w", err)
+               }
+
+               pbm.tagsBlocks[tagName] = tagBlock
+       }
+
+       // Validate the metadata
+       if err := pbm.validate(); err != nil {
+               releasePrimaryBlockMetadata(pbm)
+               return nil, fmt.Errorf("block metadata validation failed: %w", 
err)
+       }
+
+       return pbm, nil
+}
+
// SeriesID returns the seriesID of the block.
func (pbm *primaryBlockMetadata) SeriesID() common.SeriesID {
	return pbm.seriesID
}

// MinKey returns the minimum user key in the block.
func (pbm *primaryBlockMetadata) MinKey() int64 {
	return pbm.minKey
}

// MaxKey returns the maximum user key in the block.
func (pbm *primaryBlockMetadata) MaxKey() int64 {
	return pbm.maxKey
}

// DataBlock returns the data block reference (location in data.bin).
func (pbm *primaryBlockMetadata) DataBlock() dataBlock {
	return pbm.dataBlock
}

// KeysBlock returns the keys block reference (location in keys.bin).
func (pbm *primaryBlockMetadata) KeysBlock() dataBlock {
	return pbm.keysBlock
}

// TagsBlocks returns the tag blocks references.
// NOTE(review): this exposes the internal map, not a copy — mutations by the
// caller are visible to the metadata. Confirm callers treat it as read-only.
func (pbm *primaryBlockMetadata) TagsBlocks() map[string]dataBlock {
	return pbm.tagsBlocks
}
+
// setSeriesID sets the seriesID of the block.
func (pbm *primaryBlockMetadata) setSeriesID(seriesID common.SeriesID) {
	pbm.seriesID = seriesID
}

// setKeyRange sets the key range of the block.
// No minKey <= maxKey check happens here; validate() enforces it later.
func (pbm *primaryBlockMetadata) setKeyRange(minKey, maxKey int64) {
	pbm.minKey = minKey
	pbm.maxKey = maxKey
}

// setDataBlock sets the data block reference.
func (pbm *primaryBlockMetadata) setDataBlock(offset, size uint64) {
	pbm.dataBlock = dataBlock{offset: offset, size: size}
}

// setKeysBlock sets the keys block reference.
func (pbm *primaryBlockMetadata) setKeysBlock(offset, size uint64) {
	pbm.keysBlock = dataBlock{offset: offset, size: size}
}

// addTagBlock adds a tag block reference for the named tag.
// Writes into tagsBlocks directly; a zero-value primaryBlockMetadata (nil
// map) would panic here — instances from generatePrimaryBlockMetadata
// always have the map initialized.
func (pbm *primaryBlockMetadata) addTagBlock(tagName string, offset, size uint64) {
	pbm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size}
}
diff --git a/banyand/internal/sidx/metadata_test.go 
b/banyand/internal/sidx/metadata_test.go
new file mode 100644
index 00000000..99f6c205
--- /dev/null
+++ b/banyand/internal/sidx/metadata_test.go
@@ -0,0 +1,641 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "encoding/json"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+)
+
// TestPartMetadata_Validation exercises partMetadata.validate with a valid
// configuration, the MinKey == MaxKey boundary, and one case per individual
// validation rule (key range, size ordering, count consistency).
func TestPartMetadata_Validation(t *testing.T) {
	tests := []struct {
		metadata  *partMetadata
		name      string
		errMsg    string // substring expected in the error when expectErr
		expectErr bool
	}{
		{
			name: "valid metadata",
			metadata: &partMetadata{
				CompressedSizeBytes:   100,
				UncompressedSizeBytes: 200,
				TotalCount:            10,
				BlocksCount:           2,
				MinKey:                1,
				MaxKey:                100,
				ID:                    1,
			},
			expectErr: false,
		},
		{
			name: "invalid key range - MinKey > MaxKey",
			metadata: &partMetadata{
				CompressedSizeBytes:   100,
				UncompressedSizeBytes: 200,
				TotalCount:            10,
				BlocksCount:           2,
				MinKey:                100,
				MaxKey:                1,
				ID:                    1,
			},
			expectErr: true,
			errMsg:    "invalid key range",
		},
		{
			name: "invalid size - compressed > uncompressed",
			metadata: &partMetadata{
				CompressedSizeBytes:   300,
				UncompressedSizeBytes: 200,
				TotalCount:            10,
				BlocksCount:           2,
				MinKey:                1,
				MaxKey:                100,
				ID:                    1,
			},
			expectErr: true,
			errMsg:    "invalid size",
		},
		{
			name: "invalid counts - no blocks but has elements",
			metadata: &partMetadata{
				CompressedSizeBytes:   100,
				UncompressedSizeBytes: 200,
				TotalCount:            10,
				BlocksCount:           0,
				MinKey:                1,
				MaxKey:                100,
				ID:                    1,
			},
			expectErr: true,
			errMsg:    "invalid counts",
		},
		{
			// Boundary: a single-key part (MinKey == MaxKey) is valid.
			name: "equal min and max keys",
			metadata: &partMetadata{
				CompressedSizeBytes:   100,
				UncompressedSizeBytes: 200,
				TotalCount:            1,
				BlocksCount:           1,
				MinKey:                50,
				MaxKey:                50,
				ID:                    1,
			},
			expectErr: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := tt.metadata.validate()
			if tt.expectErr {
				require.Error(t, err)
				assert.Contains(t, err.Error(), tt.errMsg)
			} else {
				require.NoError(t, err)
			}
		})
	}
}
+
// TestPrimaryBlockMetadata_Validation exercises primaryBlockMetadata.validate
// with a valid block and one case per rule: key ordering, non-zero seriesID,
// and non-zero data/keys block sizes.
func TestPrimaryBlockMetadata_Validation(t *testing.T) {
	tests := []struct {
		metadata  *primaryBlockMetadata
		name      string
		errMsg    string // substring expected in the error when expectErr
		expectErr bool
	}{
		{
			name: "valid block metadata",
			metadata: &primaryBlockMetadata{
				seriesID:   1,
				minKey:     1,
				maxKey:     100,
				dataBlock:  dataBlock{offset: 0, size: 1024},
				keysBlock:  dataBlock{offset: 1024, size: 256},
				tagsBlocks: map[string]dataBlock{"tag1": {offset: 1280, size: 512}},
			},
			expectErr: false,
		},
		{
			name: "invalid key range - minKey > maxKey",
			metadata: &primaryBlockMetadata{
				seriesID:   1,
				minKey:     100,
				maxKey:     1,
				dataBlock:  dataBlock{offset: 0, size: 1024},
				keysBlock:  dataBlock{offset: 1024, size: 256},
				tagsBlocks: map[string]dataBlock{},
			},
			expectErr: true,
			errMsg:    "invalid block key range",
		},
		{
			name: "invalid seriesID - zero",
			metadata: &primaryBlockMetadata{
				seriesID:   0,
				minKey:     1,
				maxKey:     100,
				dataBlock:  dataBlock{offset: 0, size: 1024},
				keysBlock:  dataBlock{offset: 1024, size: 256},
				tagsBlocks: map[string]dataBlock{},
			},
			expectErr: true,
			errMsg:    "invalid seriesID",
		},
		{
			name: "invalid data block - zero size",
			metadata: &primaryBlockMetadata{
				seriesID:   1,
				minKey:     1,
				maxKey:     100,
				dataBlock:  dataBlock{offset: 0, size: 0},
				keysBlock:  dataBlock{offset: 1024, size: 256},
				tagsBlocks: map[string]dataBlock{},
			},
			expectErr: true,
			errMsg:    "invalid data block",
		},
		{
			name: "invalid keys block - zero size",
			metadata: &primaryBlockMetadata{
				seriesID:   1,
				minKey:     1,
				maxKey:     100,
				dataBlock:  dataBlock{offset: 0, size: 1024},
				keysBlock:  dataBlock{offset: 1024, size: 0},
				tagsBlocks: map[string]dataBlock{},
			},
			expectErr: true,
			errMsg:    "invalid keys block",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := tt.metadata.validate()
			if tt.expectErr {
				require.Error(t, err)
				assert.Contains(t, err.Error(), tt.errMsg)
			} else {
				require.NoError(t, err)
			}
		})
	}
}
+
+func TestValidatePrimaryBlockMetadata(t *testing.T) {
+       tests := []struct {
+               name      string
+               errMsg    string
+               blocks    []primaryBlockMetadata
+               expectErr bool
+       }{
+               {
+                       name:      "empty blocks",
+                       blocks:    []primaryBlockMetadata{},
+                       expectErr: false,
+               },
+               {
+                       name: "single valid block",
+                       blocks: []primaryBlockMetadata{
+                               {
+                                       seriesID:   1,
+                                       minKey:     1,
+                                       maxKey:     100,
+                                       dataBlock:  dataBlock{offset: 0, size: 
1024},
+                                       keysBlock:  dataBlock{offset: 1024, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                       },
+                       expectErr: false,
+               },
+               {
+                       name: "properly ordered blocks by seriesID",
+                       blocks: []primaryBlockMetadata{
+                               {
+                                       seriesID:   1,
+                                       minKey:     1,
+                                       maxKey:     100,
+                                       dataBlock:  dataBlock{offset: 0, size: 
1024},
+                                       keysBlock:  dataBlock{offset: 1024, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                               {
+                                       seriesID:   2,
+                                       minKey:     1,
+                                       maxKey:     50,
+                                       dataBlock:  dataBlock{offset: 1280, 
size: 1024},
+                                       keysBlock:  dataBlock{offset: 2304, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                       },
+                       expectErr: false,
+               },
+               {
+                       name: "properly ordered blocks by key within same 
seriesID",
+                       blocks: []primaryBlockMetadata{
+                               {
+                                       seriesID:   1,
+                                       minKey:     1,
+                                       maxKey:     50,
+                                       dataBlock:  dataBlock{offset: 0, size: 
1024},
+                                       keysBlock:  dataBlock{offset: 1024, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                               {
+                                       seriesID:   1,
+                                       minKey:     51,
+                                       maxKey:     100,
+                                       dataBlock:  dataBlock{offset: 1280, 
size: 1024},
+                                       keysBlock:  dataBlock{offset: 2304, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                       },
+                       expectErr: false,
+               },
+               {
+                       name: "improperly ordered blocks by seriesID",
+                       blocks: []primaryBlockMetadata{
+                               {
+                                       seriesID:   2,
+                                       minKey:     1,
+                                       maxKey:     100,
+                                       dataBlock:  dataBlock{offset: 0, size: 
1024},
+                                       keysBlock:  dataBlock{offset: 1024, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                               {
+                                       seriesID:   1,
+                                       minKey:     1,
+                                       maxKey:     50,
+                                       dataBlock:  dataBlock{offset: 1280, 
size: 1024},
+                                       keysBlock:  dataBlock{offset: 2304, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                       },
+                       expectErr: true,
+                       errMsg:    "blocks not ordered by seriesID",
+               },
+               {
+                       name: "improperly ordered blocks by key within same 
seriesID",
+                       blocks: []primaryBlockMetadata{
+                               {
+                                       seriesID:   1,
+                                       minKey:     51,
+                                       maxKey:     100,
+                                       dataBlock:  dataBlock{offset: 0, size: 
1024},
+                                       keysBlock:  dataBlock{offset: 1024, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                               {
+                                       seriesID:   1,
+                                       minKey:     1,
+                                       maxKey:     50,
+                                       dataBlock:  dataBlock{offset: 1280, 
size: 1024},
+                                       keysBlock:  dataBlock{offset: 2304, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                       },
+                       expectErr: true,
+                       errMsg:    "blocks not ordered by key",
+               },
+               {
+                       name: "overlapping key ranges within same seriesID",
+                       blocks: []primaryBlockMetadata{
+                               {
+                                       seriesID:   1,
+                                       minKey:     1,
+                                       maxKey:     50,
+                                       dataBlock:  dataBlock{offset: 0, size: 
1024},
+                                       keysBlock:  dataBlock{offset: 1024, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                               {
+                                       seriesID:   1,
+                                       minKey:     40,
+                                       maxKey:     80,
+                                       dataBlock:  dataBlock{offset: 1280, 
size: 1024},
+                                       keysBlock:  dataBlock{offset: 2304, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                       },
+                       expectErr: true,
+                       errMsg:    "overlapping key ranges",
+               },
+               {
+                       name: "adjacent key ranges within same seriesID 
(valid)",
+                       blocks: []primaryBlockMetadata{
+                               {
+                                       seriesID:   1,
+                                       minKey:     1,
+                                       maxKey:     50,
+                                       dataBlock:  dataBlock{offset: 0, size: 
1024},
+                                       keysBlock:  dataBlock{offset: 1024, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                               {
+                                       seriesID:   1,
+                                       minKey:     51,
+                                       maxKey:     100,
+                                       dataBlock:  dataBlock{offset: 1280, 
size: 1024},
+                                       keysBlock:  dataBlock{offset: 2304, 
size: 256},
+                                       tagsBlocks: map[string]dataBlock{},
+                               },
+                       },
+                       expectErr: false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       err := validatePrimaryBlockMetadata(tt.blocks)
+                       if tt.expectErr {
+                               require.Error(t, err)
+                               assert.Contains(t, err.Error(), tt.errMsg)
+                       } else {
+                               require.NoError(t, err)
+                       }
+               })
+       }
+}
+
+func TestPartMetadata_Serialization(t *testing.T) {
+       original := &partMetadata{
+               CompressedSizeBytes:   1000,
+               UncompressedSizeBytes: 2000,
+               TotalCount:            50,
+               BlocksCount:           5,
+               MinKey:                10,
+               MaxKey:                1000,
+               ID:                    12345,
+       }
+
+       // Test marshaling
+       data, err := original.marshal()
+       require.NoError(t, err)
+       assert.NotEmpty(t, data)
+
+       // Test unmarshaling
+       restored, err := unmarshalPartMetadata(data)
+       require.NoError(t, err)
+       defer releasePartMetadata(restored)
+
+       // Verify all fields match
+       assert.Equal(t, original.CompressedSizeBytes, 
restored.CompressedSizeBytes)
+       assert.Equal(t, original.UncompressedSizeBytes, 
restored.UncompressedSizeBytes)
+       assert.Equal(t, original.TotalCount, restored.TotalCount)
+       assert.Equal(t, original.BlocksCount, restored.BlocksCount)
+       assert.Equal(t, original.MinKey, restored.MinKey)
+       assert.Equal(t, original.MaxKey, restored.MaxKey)
+       assert.Equal(t, original.ID, restored.ID)
+}
+
+func TestPrimaryBlockMetadata_Serialization(t *testing.T) {
+       original := &primaryBlockMetadata{
+               seriesID:  common.SeriesID(123),
+               minKey:    10,
+               maxKey:    100,
+               dataBlock: dataBlock{offset: 1000, size: 2048},
+               keysBlock: dataBlock{offset: 3048, size: 512},
+               tagsBlocks: map[string]dataBlock{
+                       "service_id": {offset: 3560, size: 256},
+                       "endpoint":   {offset: 3816, size: 512},
+                       "status":     {offset: 4328, size: 128},
+               },
+       }
+
+       // Test marshaling
+       data, err := original.marshal()
+       require.NoError(t, err)
+       assert.NotEmpty(t, data)
+
+       // Test unmarshaling
+       restored, err := unmarshalPrimaryBlockMetadata(data)
+       require.NoError(t, err)
+       defer releasePrimaryBlockMetadata(restored)
+
+       // Verify all fields match
+       assert.Equal(t, original.seriesID, restored.seriesID)
+       assert.Equal(t, original.minKey, restored.minKey)
+       assert.Equal(t, original.maxKey, restored.maxKey)
+       assert.Equal(t, original.dataBlock, restored.dataBlock)
+       assert.Equal(t, original.keysBlock, restored.keysBlock)
+       assert.Equal(t, len(original.tagsBlocks), len(restored.tagsBlocks))
+
+       // Verify tag blocks match
+       for tagName, originalBlock := range original.tagsBlocks {
+               restoredBlock, exists := restored.tagsBlocks[tagName]
+               assert.True(t, exists, "Tag block %s should exist", tagName)
+               assert.Equal(t, originalBlock, restoredBlock)
+       }
+}
+
+func TestPartMetadata_CorruptionDetection(t *testing.T) {
+       tests := []struct {
+               name        string
+               errMsg      string
+               corruptData []byte
+       }{
+               {
+                       name:        "empty data",
+                       corruptData: []byte{},
+                       errMsg:      "empty data provided",
+               },
+               {
+                       name:        "invalid JSON",
+                       corruptData: []byte("{invalid json"),
+                       errMsg:      "failed to unmarshal partMetadata from 
JSON",
+               },
+               {
+                       name:        "invalid key range in JSON",
+                       corruptData: 
[]byte(`{"compressedSizeBytes":1000,"uncompressedSizeBytes":2000,"totalCount":50,"blocksCount":5,"minKey":1000,"maxKey":10,"id":12345}`),
+                       errMsg:      "metadata validation failed",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       _, err := unmarshalPartMetadata(tt.corruptData)
+                       require.Error(t, err)
+                       assert.Contains(t, err.Error(), tt.errMsg)
+               })
+       }
+}
+
+func TestPartMetadata_JSONFormat(t *testing.T) {
+       // Test that the JSON format is human-readable and contains expected 
fields
+       original := &partMetadata{
+               CompressedSizeBytes:   1000,
+               UncompressedSizeBytes: 2000,
+               TotalCount:            50,
+               BlocksCount:           5,
+               MinKey:                10,
+               MaxKey:                1000,
+               ID:                    12345,
+       }
+
+       data, err := original.marshal()
+       require.NoError(t, err)
+
+       // Verify it's valid JSON
+       var parsed map[string]interface{}
+       err = json.Unmarshal(data, &parsed)
+       require.NoError(t, err)
+
+       // Verify expected fields are present
+       assert.Equal(t, float64(1000), parsed["compressedSizeBytes"])
+       assert.Equal(t, float64(2000), parsed["uncompressedSizeBytes"])
+       assert.Equal(t, float64(50), parsed["totalCount"])
+       assert.Equal(t, float64(5), parsed["blocksCount"])
+       assert.Equal(t, float64(10), parsed["minKey"])
+       assert.Equal(t, float64(1000), parsed["maxKey"])
+       assert.Equal(t, float64(12345), parsed["id"])
+
+       // Verify JSON is human-readable (contains field names)
+       jsonStr := string(data)
+       assert.Contains(t, jsonStr, "compressedSizeBytes")
+       assert.Contains(t, jsonStr, "minKey")
+       assert.Contains(t, jsonStr, "maxKey")
+}
+
+func TestPrimaryBlockMetadata_AccessorMethods(t *testing.T) {
+       pbm := &primaryBlockMetadata{
+               seriesID:  common.SeriesID(123),
+               minKey:    10,
+               maxKey:    100,
+               dataBlock: dataBlock{offset: 1000, size: 2048},
+               keysBlock: dataBlock{offset: 3048, size: 512},
+               tagsBlocks: map[string]dataBlock{
+                       "tag1": {offset: 3560, size: 256},
+               },
+       }
+
+       // Test accessor methods
+       assert.Equal(t, common.SeriesID(123), pbm.SeriesID())
+       assert.Equal(t, int64(10), pbm.MinKey())
+       assert.Equal(t, int64(100), pbm.MaxKey())
+       assert.Equal(t, dataBlock{offset: 1000, size: 2048}, pbm.DataBlock())
+       assert.Equal(t, dataBlock{offset: 3048, size: 512}, pbm.KeysBlock())
+       assert.Equal(t, map[string]dataBlock{"tag1": {offset: 3560, size: 
256}}, pbm.TagsBlocks())
+}
+
+func TestPrimaryBlockMetadata_SetterMethods(t *testing.T) {
+       pbm := generatePrimaryBlockMetadata()
+       defer releasePrimaryBlockMetadata(pbm)
+
+       // Test setter methods
+       pbm.setSeriesID(common.SeriesID(456))
+       assert.Equal(t, common.SeriesID(456), pbm.seriesID)
+
+       pbm.setKeyRange(20, 200)
+       assert.Equal(t, int64(20), pbm.minKey)
+       assert.Equal(t, int64(200), pbm.maxKey)
+
+       pbm.setDataBlock(2000, 4096)
+       assert.Equal(t, dataBlock{offset: 2000, size: 4096}, pbm.dataBlock)
+
+       pbm.setKeysBlock(6096, 1024)
+       assert.Equal(t, dataBlock{offset: 6096, size: 1024}, pbm.keysBlock)
+
+       pbm.addTagBlock("test_tag", 7120, 512)
+       expected := dataBlock{offset: 7120, size: 512}
+       assert.Equal(t, expected, pbm.tagsBlocks["test_tag"])
+}
+
+func TestMetadata_Pooling(t *testing.T) {
+       // Test partMetadata pooling
+       pm1 := generatePartMetadata()
+       pm1.ID = 123
+       pm1.MinKey = 10
+       pm1.MaxKey = 100
+
+       releasePartMetadata(pm1)
+
+       pm2 := generatePartMetadata()
+       // pm2 should be the same instance as pm1, but reset
+       assert.Equal(t, uint64(0), pm2.ID)
+       assert.Equal(t, int64(0), pm2.MinKey)
+       assert.Equal(t, int64(0), pm2.MaxKey)
+
+       releasePartMetadata(pm2)
+
+       // Test primaryBlockMetadata pooling
+       pbm1 := generatePrimaryBlockMetadata()
+       pbm1.seriesID = 456
+       pbm1.minKey = 20
+       pbm1.tagsBlocks["test"] = dataBlock{offset: 100, size: 200}
+
+       releasePrimaryBlockMetadata(pbm1)
+
+       pbm2 := generatePrimaryBlockMetadata()
+       // pbm2 should be the same instance as pbm1, but reset
+       assert.Equal(t, common.SeriesID(0), pbm2.seriesID)
+       assert.Equal(t, int64(0), pbm2.minKey)
+       assert.Equal(t, 0, len(pbm2.tagsBlocks))
+
+       releasePrimaryBlockMetadata(pbm2)
+}
+
+func TestMetadata_Reset(t *testing.T) {
+       // Test partMetadata reset
+       pm := &partMetadata{
+               CompressedSizeBytes:   1000,
+               UncompressedSizeBytes: 2000,
+               TotalCount:            50,
+               BlocksCount:           5,
+               MinKey:                10,
+               MaxKey:                1000,
+               ID:                    12345,
+       }
+
+       pm.reset()
+
+       assert.Equal(t, uint64(0), pm.CompressedSizeBytes)
+       assert.Equal(t, uint64(0), pm.UncompressedSizeBytes)
+       assert.Equal(t, uint64(0), pm.TotalCount)
+       assert.Equal(t, uint64(0), pm.BlocksCount)
+       assert.Equal(t, int64(0), pm.MinKey)
+       assert.Equal(t, int64(0), pm.MaxKey)
+       assert.Equal(t, uint64(0), pm.ID)
+
+       // Test primaryBlockMetadata reset
+       pbm := &primaryBlockMetadata{
+               seriesID:  123,
+               minKey:    10,
+               maxKey:    100,
+               dataBlock: dataBlock{offset: 1000, size: 2048},
+               keysBlock: dataBlock{offset: 3048, size: 512},
+               tagsBlocks: map[string]dataBlock{
+                       "tag1": {offset: 3560, size: 256},
+               },
+       }
+
+       pbm.reset()
+
+       assert.Equal(t, common.SeriesID(0), pbm.seriesID)
+       assert.Equal(t, int64(0), pbm.minKey)
+       assert.Equal(t, int64(0), pbm.maxKey)
+       assert.Equal(t, dataBlock{}, pbm.dataBlock)
+       assert.Equal(t, dataBlock{}, pbm.keysBlock)
+       assert.Equal(t, 0, len(pbm.tagsBlocks))
+}


Reply via email to