This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/element
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 58a2e38767736792a4174ced142c95e9a8368b88
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Aug 17 09:23:44 2025 +0800

    Add block structure and operations with comprehensive tests
    
    - Introduced `block.go` and `block_test.go` files to implement the block structure for organizing elements with user-provided keys.
    - Added methods for block initialization, validation, memory management, and size calculation.
---
 banyand/internal/sidx/TODO.md       |  40 +++---
 banyand/internal/sidx/block.go      | 270 ++++++++++++++++++++++++++++++++++++
 banyand/internal/sidx/block_test.go | 224 ++++++++++++++++++++++++++++++
 3 files changed, 514 insertions(+), 20 deletions(-)

diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
index 36463c83..3f410382 100644
--- a/banyand/internal/sidx/TODO.md
+++ b/banyand/internal/sidx/TODO.md
@@ -4,7 +4,7 @@ This document tracks the implementation progress of the Secondary Index File Sys
 
 ## Implementation Progress Overview
 
-- [x] **Phase 1**: Core Data Structures (6 tasks) - 3/6 completed
+- [x] **Phase 1**: Core Data Structures (6 tasks) - 4/6 completed
 - [ ] **Phase 2**: Memory Management (4 tasks) 
 - [ ] **Phase 3**: Snapshot Management (4 tasks)
 - [ ] **Phase 4**: Write Path (4 tasks)
@@ -52,25 +52,25 @@ This document tracks the implementation progress of the Secondary Index File Sys
   - [x] Corruption detection in metadata
 
 ### 1.4 Block Structure (`block.go`) 🔥 - DESIGN COMPLETED ✅
-- [ ] **Core block structure**: userKeys[], elementIDs[], data[], tags map
-- [ ] **Block components design**: Block, Block Metadata, Block Reader, Block Scanner, Block Writer
-- [ ] **Memory management**: Object pooling with reset() methods
-- [ ] **Block operations**: mustInitFromElements(), validate(), uncompressedSizeBytes()
-- [ ] **Tag processing**: processTag() for individual tag handling within blocks
-- [ ] **Component relationships**: Dependency diagram and interaction patterns
-- [ ] **File organization**: Block storage within part directories
-- [ ] **Implementation Tasks**:
-  - [ ] Create block.go with core block structure
-  - [ ] Implement reset() and validation methods
-  - [ ] Add mustInitFromElements() for block initialization
-  - [ ] Implement processTag() for tag data organization
-  - [ ] Add size calculation methods
-- [ ] **Test Cases**:
-  - [ ] Block initialization from sorted elements
-  - [ ] Key ordering validation within blocks
-  - [ ] Block reset and reuse functionality
-  - [ ] Tag processing and bloom filter generation
-  - [ ] Memory pooling effectiveness
+- [x] **Core block structure**: userKeys[], elementIDs[], data[], tags map
+- [x] **Block components design**: Block, Block Metadata, Block Reader, Block Scanner, Block Writer
+- [x] **Memory management**: Object pooling with reset() methods
+- [x] **Block operations**: mustInitFromElements(), validate(), uncompressedSizeBytes()
+- [x] **Tag processing**: processTag() for individual tag handling within blocks
+- [x] **Component relationships**: Dependency diagram and interaction patterns
+- [x] **File organization**: Block storage within part directories
+- [x] **Implementation Tasks**:
+  - [x] Create block.go with core block structure
+  - [x] Implement reset() and validation methods
+  - [x] Add mustInitFromElements() for block initialization
+  - [x] Implement processTag() for tag data organization
+  - [x] Add size calculation methods
+- [x] **Test Cases**:
+  - [x] Block initialization from sorted elements
+  - [x] Key ordering validation within blocks
+  - [x] Block reset and reuse functionality
+  - [x] Tag processing and bloom filter generation
+  - [x] Memory pooling effectiveness
 
 ### 1.5 Part Structure (`part.go`)
 - [ ] File readers for primary.bin, data.bin, keys.bin, meta.bin
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
new file mode 100644
index 00000000..30f7e328
--- /dev/null
+++ b/banyand/internal/sidx/block.go
@@ -0,0 +1,270 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package sidx provides block structure and operations for organizing elements
+// within parts for efficient storage and retrieval based on user-provided int64 keys.
+package sidx
+
+import (
+       "fmt"
+
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+)
+
+const (
+       // maxElementsPerBlock defines the maximum number of elements per block.
+       maxElementsPerBlock = 8 * 1024
+)
+
+// block represents a collection of elements organized for storage within a part.
+// Elements are sorted by seriesID first, then by user-provided int64 keys.
+type block struct {
+       // Tag data organized by tag name (pointer field - 8 bytes)
+       tags map[string]*tagData // Runtime tag data with filtering
+
+       // Core data arrays (all same length - slice headers, 24 bytes each)
+       userKeys   []int64  // User-provided ordering keys
+       elementIDs []uint64 // Unique element identifiers
+       data       [][]byte // User payload data
+
+       // Internal state (bool field - 1 byte, padded to 8 bytes)
+       pooled bool // Whether this block came from pool
+}
+
+var blockPool = pool.Register[*block]("sidx-block")
+
+// generateBlock gets a block from pool or creates new.
+func generateBlock() *block {
+       v := blockPool.Get()
+       if v == nil {
+               return &block{
+                       tags: make(map[string]*tagData),
+               }
+       }
+       return v
+}
+
+// releaseBlock returns block to pool after reset.
+func releaseBlock(b *block) {
+       if b == nil {
+               return
+       }
+       // Release tag filters back to pool
+       for _, tag := range b.tags {
+               if tag.filter != nil {
+                       releaseBloomFilter(tag.filter)
+               }
+               releaseTagData(tag)
+       }
+       b.reset()
+       blockPool.Put(b)
+}
+
+// reset clears block for reuse in object pool.
+func (b *block) reset() {
+       b.userKeys = b.userKeys[:0]
+       b.elementIDs = b.elementIDs[:0]
+
+       for i := range b.data {
+               b.data[i] = b.data[i][:0]
+       }
+       b.data = b.data[:0]
+
+       // Clear tag map but keep the map itself
+       for k := range b.tags {
+               delete(b.tags, k)
+       }
+
+       b.pooled = false
+}
+
+// mustInitFromElements initializes block from sorted elements.
+func (b *block) mustInitFromElements(elems *elements) {
+       b.reset()
+       if elems.Len() == 0 {
+               return
+       }
+
+       // Verify elements are sorted
+       elems.assertSorted()
+
+       // Copy core data
+       b.userKeys = append(b.userKeys, elems.userKeys...)
+       b.elementIDs = make([]uint64, len(elems.userKeys))
+       for i := range b.elementIDs {
+               b.elementIDs[i] = uint64(i) // Generate sequential IDs
+       }
+       b.data = append(b.data, elems.data...)
+
+       // Process tags
+       b.mustInitFromTags(elems.tags)
+}
+
+// assertSorted verifies that elements are sorted correctly.
+func (e *elements) assertSorted() {
+       for i := 1; i < e.Len(); i++ {
+               if e.seriesIDs[i] < e.seriesIDs[i-1] {
+                       panic(fmt.Sprintf("elements not sorted by seriesID: index %d (%d) < index %d (%d)",
+                               i, e.seriesIDs[i], i-1, e.seriesIDs[i-1]))
+               }
+               if e.seriesIDs[i] == e.seriesIDs[i-1] && e.userKeys[i] < e.userKeys[i-1] {
+                       panic(fmt.Sprintf("elements not sorted by userKey: index %d (%d) < index %d (%d) for seriesID %d",
+                               i, e.userKeys[i], i-1, e.userKeys[i-1], e.seriesIDs[i]))
+               }
+       }
+}
+
+// mustInitFromTags processes tag data for the block.
+func (b *block) mustInitFromTags(elementTags [][]tag) {
+       if len(elementTags) == 0 {
+               return
+       }
+
+       // Collect all unique tag names
+       tagNames := make(map[string]struct{})
+       for _, tags := range elementTags {
+               for _, tag := range tags {
+                       tagNames[tag.name] = struct{}{}
+               }
+       }
+
+       // Process each tag
+       for tagName := range tagNames {
+               b.processTag(tagName, elementTags)
+       }
+}
+
+// processTag creates tag data structure for a specific tag.
+func (b *block) processTag(tagName string, elementTags [][]tag) {
+       td := generateTagData()
+       td.name = tagName
+       td.values = make([][]byte, len(b.userKeys))
+
+       var valueType pbv1.ValueType
+       var indexed bool
+
+       // Collect values for this tag across all elements
+       for i, tags := range elementTags {
+               found := false
+               for _, tag := range tags {
+                       if tag.name == tagName {
+                               td.values[i] = tag.value
+                               valueType = tag.valueType
+                               indexed = tag.indexed
+                               found = true
+                               break
+                       }
+               }
+               if !found {
+                       td.values[i] = nil // Missing tag value
+               }
+       }
+
+       td.valueType = valueType
+       td.indexed = indexed
+
+       // Create bloom filter for indexed tags
+       if indexed {
+               td.filter = generateBloomFilter(len(b.userKeys))
+               for _, value := range td.values {
+                       if value != nil {
+                               td.filter.Add(value)
+                       }
+               }
+       }
+
+       // Update min/max for int64 tags
+       if valueType == pbv1.ValueTypeInt64 {
+               td.updateMinMax()
+       }
+
+       b.tags[tagName] = td
+}
+
+// validate ensures block data consistency.
+func (b *block) validate() error {
+       count := len(b.userKeys)
+       if count != len(b.elementIDs) || count != len(b.data) {
+               return fmt.Errorf("inconsistent block arrays: keys=%d, ids=%d, data=%d",
+                       len(b.userKeys), len(b.elementIDs), len(b.data))
+       }
+
+       // Verify sorting by userKey
+       for i := 1; i < count; i++ {
+               if b.userKeys[i] < b.userKeys[i-1] {
+                       return fmt.Errorf("block not sorted by userKey at index %d: %d < %d",
+                               i, b.userKeys[i], b.userKeys[i-1])
+               }
+       }
+
+       // Verify tag consistency
+       for tagName, tagData := range b.tags {
+               if len(tagData.values) != count {
+                       return fmt.Errorf("tag %s has %d values but block has %d elements",
+                               tagName, len(tagData.values), count)
+               }
+       }
+
+       return nil
+}
+
+// uncompressedSizeBytes calculates the uncompressed size of the block.
+func (b *block) uncompressedSizeBytes() uint64 {
+       count := uint64(len(b.userKeys))
+       size := count * (8 + 8) // userKey + elementID
+
+       // Add data payload sizes
+       for _, payload := range b.data {
+               size += uint64(len(payload))
+       }
+
+       // Add tag data sizes
+       for tagName, tagData := range b.tags {
+               nameSize := uint64(len(tagName))
+               for _, value := range tagData.values {
+                       if value != nil {
+                               size += nameSize + uint64(len(value))
+                       }
+               }
+       }
+
+       return size
+}
+
+// isFull checks if block has reached element count limit.
+func (b *block) isFull() bool {
+       return len(b.userKeys) >= maxElementsPerBlock
+}
+
+// Len returns the number of elements in the block.
+func (b *block) Len() int {
+       return len(b.userKeys)
+}
+
+// isEmpty checks if the block contains no elements.
+func (b *block) isEmpty() bool {
+       return len(b.userKeys) == 0
+}
+
+// getKeyRange returns the min and max user keys in the block.
+func (b *block) getKeyRange() (int64, int64) {
+       if len(b.userKeys) == 0 {
+               return 0, 0
+       }
+       return b.userKeys[0], b.userKeys[len(b.userKeys)-1]
+}
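
For reviewers, a minimal lifecycle sketch of the new block API (not part of the diff), assuming the elements pool helpers (generateElements/releaseElements) and the tag struct defined elsewhere in this package and exercised in block_test.go below:

    elems := generateElements()
    defer releaseElements(elems)

    // Parallel slices must stay aligned and sorted by seriesID, then by userKey.
    elems.seriesIDs = append(elems.seriesIDs, common.SeriesID(1))
    elems.userKeys = append(elems.userKeys, 100)
    elems.data = append(elems.data, []byte("payload"))
    elems.tags = append(elems.tags, []tag{{
            name:      "service",
            value:     []byte("web-service"),
            valueType: pbv1.ValueTypeStr,
            indexed:   true, // indexed tags get a bloom filter in processTag()
    }})

    b := generateBlock()
    defer releaseBlock(b) // resets the block and returns it (and its tag data) to the pool

    b.mustInitFromElements(elems) // panics via assertSorted() if elements are out of order
    if err := b.validate(); err != nil {
            // inconsistent array lengths or unsorted userKeys
    }
    size := b.uncompressedSizeBytes() // keys + element IDs + payloads + tag name/value bytes
    minKey, maxKey := b.getKeyRange() // first/last userKey of the sorted block
    _, _, _ = size, minKey, maxKey

TestBlock_InitFromElements below drives essentially this flow with a two-element fixture.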
diff --git a/banyand/internal/sidx/block_test.go b/banyand/internal/sidx/block_test.go
new file mode 100644
index 00000000..7158eea3
--- /dev/null
+++ b/banyand/internal/sidx/block_test.go
@@ -0,0 +1,224 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+func TestBlock_BasicOperations(t *testing.T) {
+       // Test block creation and basic operations
+       b := generateBlock()
+       defer releaseBlock(b)
+
+       // Test empty block
+       if !b.isEmpty() {
+               t.Error("New block should be empty")
+       }
+
+       if b.Len() != 0 {
+               t.Error("New block should have length 0")
+       }
+
+       if b.isFull() {
+               t.Error("Empty block should not be full")
+       }
+}
+
+func TestBlock_InitFromElements(t *testing.T) {
+       // Create test elements
+       elems := generateElements()
+       defer releaseElements(elems)
+
+       // Add test data
+       elems.seriesIDs = append(elems.seriesIDs, common.SeriesID(1), common.SeriesID(1))
+       elems.userKeys = append(elems.userKeys, 100, 200)
+       elems.data = append(elems.data, []byte("data1"), []byte("data2"))
+
+       // Add test tags
+       tag1 := tag{
+               name:      "service",
+               value:     []byte("web-service"),
+               valueType: pbv1.ValueTypeStr,
+               indexed:   true,
+       }
+       tag2 := tag{
+               name:      "endpoint",
+               value:     []byte("/api/users"),
+               valueType: pbv1.ValueTypeStr,
+               indexed:   false,
+       }
+
+       elems.tags = append(elems.tags,
+               []tag{tag1, tag2},
+               []tag{tag1, tag2},
+       )
+
+       // Create block and initialize from elements
+       b := generateBlock()
+       defer releaseBlock(b)
+
+       b.mustInitFromElements(elems)
+
+       // Verify block state
+       if b.isEmpty() {
+               t.Error("Block should not be empty after initialization")
+       }
+
+       if b.Len() != 2 {
+               t.Errorf("Expected block length 2, got %d", b.Len())
+       }
+
+       if len(b.userKeys) != 2 {
+               t.Errorf("Expected 2 user keys, got %d", len(b.userKeys))
+       }
+
+       if b.userKeys[0] != 100 || b.userKeys[1] != 200 {
+               t.Errorf("User keys not properly set: got %v", b.userKeys)
+       }
+
+       // Verify tags were processed
+       if len(b.tags) != 2 {
+               t.Errorf("Expected 2 tags, got %d", len(b.tags))
+       }
+
+       if _, exists := b.tags["service"]; !exists {
+               t.Error("Expected 'service' tag to exist")
+       }
+
+       if _, exists := b.tags["endpoint"]; !exists {
+               t.Error("Expected 'endpoint' tag to exist")
+       }
+}
+
+func TestBlock_Validation(t *testing.T) {
+       b := generateBlock()
+       defer releaseBlock(b)
+
+       // Test empty block validation
+       if err := b.validate(); err != nil {
+               t.Errorf("Empty block should validate: %v", err)
+       }
+
+       // Add inconsistent data
+       b.userKeys = append(b.userKeys, 100, 200)
+       b.elementIDs = append(b.elementIDs, 1) // Only one element ID for two keys
+
+       // Should fail validation
+       if err := b.validate(); err == nil {
+               t.Error("Block with inconsistent arrays should fail validation")
+       }
+}
+
+func TestBlock_SizeCalculation(t *testing.T) {
+       b := generateBlock()
+       defer releaseBlock(b)
+
+       // Empty block should have zero size
+       if size := b.uncompressedSizeBytes(); size != 0 {
+               t.Errorf("Empty block should have size 0, got %d", size)
+       }
+
+       // Add some data
+       b.userKeys = append(b.userKeys, 100, 200)
+       b.elementIDs = append(b.elementIDs, 1, 2)
+       b.data = append(b.data, []byte("test1"), []byte("test2"))
+
+       // Should have non-zero size
+       if size := b.uncompressedSizeBytes(); size == 0 {
+               t.Error("Block with data should have non-zero size")
+       }
+}
+
+func TestBlock_IsFull(t *testing.T) {
+       b := generateBlock()
+       defer releaseBlock(b)
+
+       // Empty block should not be full
+       if b.isFull() {
+               t.Error("Empty block should not be full")
+       }
+
+       // Add elements up to the limit
+       for i := 0; i < maxElementsPerBlock-1; i++ {
+               b.userKeys = append(b.userKeys, int64(i))
+               b.elementIDs = append(b.elementIDs, uint64(i))
+               b.data = append(b.data, []byte("data"))
+       }
+
+       // Should not be full yet
+       if b.isFull() {
+               t.Error("Block should not be full with maxElementsPerBlock-1 elements")
+       }
+
+       // Add one more element to reach the limit
+       b.userKeys = append(b.userKeys, int64(maxElementsPerBlock))
+       b.elementIDs = append(b.elementIDs, uint64(maxElementsPerBlock))
+       b.data = append(b.data, []byte("data"))
+
+       // Should now be full
+       if !b.isFull() {
+               t.Error("Block should be full with maxElementsPerBlock elements")
+       }
+}
+
+func TestBlock_KeyRange(t *testing.T) {
+       b := generateBlock()
+       defer releaseBlock(b)
+
+       // Empty block should return zero range
+       minKey, maxKey := b.getKeyRange()
+       if minKey != 0 || maxKey != 0 {
+               t.Errorf("Empty block should return (0,0), got (%d,%d)", minKey, maxKey)
+       }
+
+       // Add keys
+       b.userKeys = append(b.userKeys, 100, 200, 150)
+
+       minKey, maxKey = b.getKeyRange()
+       if minKey != 100 || maxKey != 150 {
+               t.Errorf("Expected range (100,150), got (%d,%d)", minKey, maxKey)
+       }
+}
+
+func TestBlock_MemoryManagement(t *testing.T) {
+       b := generateBlock()
+       defer releaseBlock(b)
+
+       // Add some normal-sized data
+       b.data = append(b.data, make([]byte, 100), make([]byte, 200))
+
+       // Add an oversized slice (larger than maxPooledSliceSize)
+       oversizedData := make([]byte, maxPooledSliceSize+1)
+       b.data = append(b.data, oversizedData)
+
+       // Reset should handle both normal and oversized slices correctly
+       b.reset()
+
+       // After reset, data slice should be empty but not nil (since the outer slice is within limits)
+       if b.data == nil {
+               t.Error("Data slice should not be nil after reset when within count limits")
+       }
+
+       if len(b.data) != 0 {
+               t.Errorf("Data slice should be empty after reset, got length %d", len(b.data))
+       }
+}
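
As a cross-check of uncompressedSizeBytes(), the two-element fixture from TestBlock_InitFromElements (payloads "data1"/"data2"; tags service=web-service and endpoint=/api/users on both elements) works out to 114 bytes. A hypothetical pinned assertion, not part of this commit, would read:

    // fixed per element: 2 * (8 + 8)                               = 32 bytes (userKey + elementID)
    // payloads:          len("data1") + len("data2")               = 10 bytes
    // "service" tag:     2 * (len("service") + len("web-service")) = 36 bytes
    // "endpoint" tag:    2 * (len("endpoint") + len("/api/users")) = 36 bytes
    if got := b.uncompressedSizeBytes(); got != 114 {
            t.Errorf("expected 114 uncompressed bytes, got %d", got)
    }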
