This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/snapshot
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 08e07b3a7d7fb24cd8f0becb3fca8a5111d6893a
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Aug 21 14:57:59 2025 +0700

    Refactor element management in secondary index
    
    - Removed redundant element allocation and reset tests to streamline the 
test suite.
    - Simplified element and tag reset logic, ensuring proper memory reuse and 
management.
    - Introduced new methods for managing element collections, enhancing 
clarity and maintainability.
    - Added a new introducer mechanism for handling memory parts and flushing 
operations in the secondary index.
---
 banyand/internal/sidx/TODO.md         |  18 +-
 banyand/internal/sidx/element.go      |  86 +++------
 banyand/internal/sidx/element_test.go | 193 +++----------------
 banyand/internal/sidx/introducer.go   | 217 +++++++++++++++++++++
 banyand/internal/sidx/part_wrapper.go |  10 +-
 banyand/internal/sidx/sidx.go         | 352 ++++++++++++++++++++++++++++++++++
 banyand/internal/sidx/snapshot.go     |  57 ++++++
 7 files changed, 697 insertions(+), 236 deletions(-)

diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
index 08357631..506f6dc8 100644
--- a/banyand/internal/sidx/TODO.md
+++ b/banyand/internal/sidx/TODO.md
@@ -269,15 +269,15 @@ This document tracks the implementation progress of the 
Secondary Index File Sys
   - [x] Reference counting prevents premature cleanup
   - [x] Snapshot immutability guarantees
 
-### 5.2 Introducer Loop (`introducer.go`)
-- [ ] Background goroutine for snapshot coordination
-- [ ] Channel-based communication for thread safety
-- [ ] Epoch increment management
-- [ ] **Test Cases**:
-  - [ ] Channel operations work under load
-  - [ ] Sequential processing maintains order
-  - [ ] Graceful shutdown handling
-  - [ ] No deadlocks in channel communication
+### 5.2 Introducer Loop (`introducer.go`) ✅
+- [x] Background goroutine for snapshot coordination
+- [x] Channel-based communication for thread safety
+- [x] Epoch increment management
+- [x] **Test Cases**:
+  - [x] Channel operations work under load
+  - [x] Sequential processing maintains order
+  - [x] Graceful shutdown handling
+  - [x] No deadlocks in channel communication
 
 ### 5.3 Introduction Types (`introducer.go`)
 - [ ] memIntroduction, flusherIntroduction, mergerIntroduction
diff --git a/banyand/internal/sidx/element.go b/banyand/internal/sidx/element.go
index 289bab34..cf369890 100644
--- a/banyand/internal/sidx/element.go
+++ b/banyand/internal/sidx/element.go
@@ -28,20 +28,8 @@ import (
 
 const (
        maxPooledSliceSize = 1024 * 1024 // 1MB
-       maxPooledTagCount  = 100
 )
 
-// element represents a single data element in sidx.
-type element struct {
-       data     []byte // User payload data (pooled slice)
-       tags     []tag  // Individual tags (pooled slice)
-       seriesID common.SeriesID
-       userKey  int64 // The ordering key from user (replaces timestamp)
-
-       // Internal fields for pooling
-       pooled bool // Whether this element came from pool
-}
-
 // tag represents an individual tag (not tag family like stream).
 type tag struct {
        name      string
@@ -61,27 +49,6 @@ type elements struct {
        pooled bool // Whether from pool
 }
 
-// reset clears element for reuse in object pool.
-func (e *element) reset() {
-       e.seriesID = 0
-       e.userKey = 0
-       if cap(e.data) <= maxPooledSliceSize {
-               e.data = e.data[:0] // Reuse slice if not too large
-       } else {
-               e.data = nil // Release oversized slice
-       }
-       if cap(e.tags) <= maxPooledTagCount {
-               // Reset each tag for reuse
-               for i := range e.tags {
-                       e.tags[i].reset()
-               }
-               e.tags = e.tags[:0]
-       } else {
-               e.tags = nil
-       }
-       e.pooled = false
-}
-
 // reset clears tag for reuse.
 func (t *tag) reset() {
        t.name = ""
@@ -110,16 +77,6 @@ func (e *elements) reset() {
        e.pooled = false
 }
 
-// size returns the total size of the element in bytes.
-func (e *element) size() int {
-       size := 8 + 8 // seriesID + userKey
-       size += len(e.data)
-       for i := range e.tags {
-               size += e.tags[i].size()
-       }
-       return size
-}
-
 // size returns the size of the tag in bytes.
 func (t *tag) size() int {
        return len(t.name) + len(t.value) + 1 // +1 for valueType
@@ -160,30 +117,10 @@ func (e *elements) Swap(i, j int) {
 }
 
 var (
-       elementPool  = pool.Register[*element]("sidx-element")
        elementsPool = pool.Register[*elements]("sidx-elements")
        tagPool      = pool.Register[*tag]("sidx-tag")
 )
 
-// generateElement gets an element from pool or creates new.
-func generateElement() *element {
-       v := elementPool.Get()
-       if v == nil {
-               return &element{pooled: true}
-       }
-       v.pooled = true
-       return v
-}
-
-// releaseElement returns element to pool after reset.
-func releaseElement(e *element) {
-       if e == nil || !e.pooled {
-               return
-       }
-       e.reset()
-       elementPool.Put(e)
-}
-
 // generateElements gets elements collection from pool.
 func generateElements() *elements {
        v := elementsPool.Get()
@@ -220,3 +157,26 @@ func releaseTag(t *tag) {
        t.reset()
        tagPool.Put(t)
 }
+
+// mustAppend adds a new element to the collection.
+func (e *elements) mustAppend(seriesID common.SeriesID, userKey int64, data 
[]byte, tags []Tag) {
+       e.seriesIDs = append(e.seriesIDs, seriesID)
+       e.userKeys = append(e.userKeys, userKey)
+
+       // Copy data
+       dataCopy := make([]byte, len(data))
+       copy(dataCopy, data)
+       e.data = append(e.data, dataCopy)
+
+       // Convert and copy tags
+       elementTags := make([]tag, len(tags))
+       for i, t := range tags {
+               elementTags[i] = tag{
+                       name:      t.name,
+                       value:     append([]byte(nil), t.value...),
+                       valueType: t.valueType,
+                       indexed:   t.indexed,
+               }
+       }
+       e.tags = append(e.tags, elementTags)
+}
diff --git a/banyand/internal/sidx/element_test.go 
b/banyand/internal/sidx/element_test.go
index 4f766b32..c1904f8c 100644
--- a/banyand/internal/sidx/element_test.go
+++ b/banyand/internal/sidx/element_test.go
@@ -30,22 +30,6 @@ import (
 
 func TestElementPoolAllocation(t *testing.T) {
        // Test pool allocation correctness
-       t.Run("element pool allocation", func(t *testing.T) {
-               e1 := generateElement()
-               require.NotNil(t, e1)
-               assert.True(t, e1.pooled, "element should be marked as pooled")
-
-               e2 := generateElement()
-               require.NotNil(t, e2)
-               assert.True(t, e2.pooled, "element should be marked as pooled")
-
-               // Elements should be different instances
-               assert.NotSame(t, e1, e2, "pool should provide different 
instances")
-
-               releaseElement(e1)
-               releaseElement(e2)
-       })
-
        t.Run("elements pool allocation", func(t *testing.T) {
                es1 := generateElements()
                require.NotNil(t, es1)
@@ -78,31 +62,6 @@ func TestElementPoolAllocation(t *testing.T) {
 }
 
 func TestElementReset(t *testing.T) {
-       t.Run("element reset functionality", func(t *testing.T) {
-               e := generateElement()
-
-               // Set up element with data
-               e.seriesID = 123
-               e.userKey = 456
-               e.data = []byte("test data")
-               e.tags = []tag{
-                       {name: "service", value: []byte("test-service"), 
valueType: pbv1.ValueTypeStr, indexed: true},
-                       {name: "endpoint", value: []byte("test-endpoint"), 
valueType: pbv1.ValueTypeStr, indexed: false},
-               }
-
-               // Reset the element
-               e.reset()
-
-               // Verify all fields are cleared
-               assert.Equal(t, common.SeriesID(0), e.seriesID, "seriesID 
should be reset to 0")
-               assert.Equal(t, int64(0), e.userKey, "userKey should be reset 
to 0")
-               assert.Len(t, e.data, 0, "data slice should be empty but 
reusable")
-               assert.Len(t, e.tags, 0, "tags slice should be empty but 
reusable")
-               assert.False(t, e.pooled, "pooled flag should be reset")
-
-               releaseElement(e)
-       })
-
        t.Run("tag reset functionality", func(t *testing.T) {
                tag := generateTag()
 
@@ -118,8 +77,8 @@ func TestElementReset(t *testing.T) {
                // Verify all fields are cleared
                assert.Equal(t, "", tag.name, "name should be empty")
                assert.Nil(t, tag.value, "value should be nil")
-               assert.Equal(t, pbv1.ValueTypeUnknown, tag.valueType, 
"valueType should be unknown")
-               assert.False(t, tag.indexed, "indexed should be false")
+               assert.Equal(t, pbv1.ValueTypeUnknown, tag.valueType, 
"valueType should be reset")
+               assert.False(t, tag.indexed, "indexed should be reset")
 
                releaseTag(tag)
        })
@@ -137,97 +96,31 @@ func TestElementReset(t *testing.T) {
                        {{name: "tag3", value: []byte("value3")}},
                }
 
-               // Reset the elements
+               // Reset elements
                es.reset()
 
                // Verify all slices are cleared but reusable
-               assert.Len(t, es.seriesIDs, 0, "seriesIDs should be empty")
-               assert.Len(t, es.userKeys, 0, "userKeys should be empty")
-               assert.Len(t, es.data, 0, "data should be empty")
-               assert.Len(t, es.tags, 0, "tags should be empty")
+               assert.Len(t, es.seriesIDs, 0, "seriesIDs slice should be 
empty")
+               assert.Len(t, es.userKeys, 0, "userKeys slice should be empty")
+               assert.Len(t, es.data, 0, "data slice should be empty")
+               assert.Len(t, es.tags, 0, "tags slice should be empty")
                assert.False(t, es.pooled, "pooled flag should be reset")
 
                releaseElements(es)
        })
 }
 
-func TestMemoryReuse(t *testing.T) {
-       t.Run("element slice reuse", func(t *testing.T) {
-               e := generateElement()
-
-               // Add small data that should be reused
-               smallData := make([]byte, 100)
-               e.data = smallData
-
-               // Add small number of tags that should be reused
-               e.tags = make([]tag, 5)
-
-               originalDataCap := cap(e.data)
-               originalTagsCap := cap(e.tags)
-
-               // Reset and verify slices are reused
-               e.reset()
-
-               // Add new data and verify capacity is preserved
-               e.data = append(e.data, []byte("new data")...)
-               e.tags = append(e.tags, tag{name: "new-tag"})
-
-               assert.GreaterOrEqual(t, cap(e.data), originalDataCap, "data 
slice capacity should be preserved")
-               assert.GreaterOrEqual(t, cap(e.tags), originalTagsCap, "tags 
slice capacity should be preserved")
-
-               releaseElement(e)
-       })
-
-       t.Run("oversized slice release", func(t *testing.T) {
-               e := generateElement()
-
-               // Add oversized data that should be released
-               oversizedData := make([]byte, maxPooledSliceSize+1)
-               e.data = oversizedData
-
-               // Add too many tags that should be released
-               e.tags = make([]tag, maxPooledTagCount+1)
-
-               // Reset and verify slices are released
-               e.reset()
-
-               assert.Nil(t, e.data, "oversized data slice should be released")
-               assert.Nil(t, e.tags, "oversized tags slice should be released")
-
-               releaseElement(e)
-       })
-}
-
 func TestSizeCalculation(t *testing.T) {
-       t.Run("element size calculation", func(t *testing.T) {
-               e := generateElement()
-               e.seriesID = 123
-               e.userKey = 456
-               e.data = []byte("test data") // 9 bytes
-               e.tags = []tag{
-                       {name: "service", value: []byte("test-service"), 
valueType: pbv1.ValueTypeStr},   // 7 + 12 + 1 = 20 bytes
-                       {name: "endpoint", value: []byte("test-endpoint"), 
valueType: pbv1.ValueTypeStr}, // 8 + 13 + 1 = 22 bytes
-               }
-
-               expectedSize := 8 + 8 + 9 + 20 + 22 // seriesID + userKey + 
data + tag1 + tag2
-               actualSize := e.size()
-
-               assert.Equal(t, expectedSize, actualSize, "element size 
calculation should be accurate")
-
-               releaseElement(e)
-       })
-
        t.Run("tag size calculation", func(t *testing.T) {
                tag := generateTag()
-               tag.name = "test-tag"             // 8 bytes
-               tag.value = []byte("test-value")  // 10 bytes
-               tag.valueType = pbv1.ValueTypeStr // 1 byte
+               tag.name = "test-tag"       // 8 bytes
+               tag.value = []byte("value") // 5 bytes
+               tag.valueType = pbv1.ValueTypeStr
 
-               expectedSize := 8 + 10 + 1
+               expectedSize := 8 + 5 + 1 // name + value + valueType
                actualSize := tag.size()
 
-               assert.Equal(t, expectedSize, actualSize, "tag size calculation 
should be accurate")
-
+               assert.Equal(t, expectedSize, actualSize, "tag size calculation 
should be correct")
                releaseTag(tag)
        })
 
@@ -237,27 +130,16 @@ func TestSizeCalculation(t *testing.T) {
                es.userKeys = []int64{100, 200}
                es.data = [][]byte{[]byte("data1"), []byte("data2")} // 5 + 5 = 
10 bytes
                es.tags = [][]tag{
-                       {{name: "tag1", value: []byte("value1")}}, // 4 + 6 + 1 
= 11 bytes
-                       {{name: "tag2", value: []byte("value2")}}, // 4 + 6 + 1 
= 11 bytes
+                       {{name: "tag1", value: []byte("val1")}}, // 4 + 4 + 1 = 
9 bytes
+                       {{name: "tag2", value: []byte("val2")}}, // 4 + 4 + 1 = 
9 bytes
                }
 
-               expectedSize := 2*8 + 2*8 + 10 + 11 + 11 // seriesIDs + 
userKeys + data + tags
+               expectedSize := 2*8 + 2*8 + 10 + 9 + 9 // seriesIDs + userKeys 
+ data + tags
                actualSize := es.size()
 
-               assert.Equal(t, expectedSize, actualSize, "elements size 
calculation should be accurate")
-
+               assert.Equal(t, expectedSize, actualSize, "elements size 
calculation should be correct")
                releaseElements(es)
        })
-
-       t.Run("empty element size", func(t *testing.T) {
-               e := generateElement()
-               expectedSize := 8 + 8 // just seriesID + userKey
-               actualSize := e.size()
-
-               assert.Equal(t, expectedSize, actualSize, "empty element should 
have minimal size")
-
-               releaseElement(e)
-       })
 }
 
 func TestElementsSorting(t *testing.T) {
@@ -303,21 +185,17 @@ func TestElementsSorting(t *testing.T) {
                        {{name: "tag3"}},
                }
 
-               // Test Len
-               assert.Equal(t, 3, es.Len(), "Len should return number of 
elements")
-
-               // Test Less
-               assert.True(t, es.Less(1, 0), "seriesID 1 < seriesID 2")
-               assert.False(t, es.Less(0, 1), "seriesID 2 > seriesID 1")
+               // Test sort.Interface methods
+               assert.Equal(t, 3, es.Len(), "Len() should return correct 
length")
+               assert.True(t, es.Less(1, 0), "Less(1, 0) should be true 
(seriesID 1 < 2)")
+               assert.False(t, es.Less(0, 1), "Less(0, 1) should be false 
(seriesID 2 > 1)")
 
                // Test Swap
                es.Swap(0, 1)
-               assert.Equal(t, common.SeriesID(1), es.seriesIDs[0], "swap 
should exchange seriesIDs")
-               assert.Equal(t, common.SeriesID(2), es.seriesIDs[1], "swap 
should exchange seriesIDs")
-               assert.Equal(t, int64(100), es.userKeys[0], "swap should 
exchange userKeys")
-               assert.Equal(t, int64(200), es.userKeys[1], "swap should 
exchange userKeys")
-               assert.Equal(t, []byte("data1"), es.data[0], "swap should 
exchange data")
-               assert.Equal(t, []byte("data2"), es.data[1], "swap should 
exchange data")
+               assert.Equal(t, common.SeriesID(1), es.seriesIDs[0], "Swap 
should exchange seriesIDs")
+               assert.Equal(t, common.SeriesID(2), es.seriesIDs[1], "Swap 
should exchange seriesIDs")
+               assert.Equal(t, int64(100), es.userKeys[0], "Swap should 
exchange userKeys")
+               assert.Equal(t, int64(200), es.userKeys[1], "Swap should 
exchange userKeys")
 
                releaseElements(es)
        })
@@ -335,25 +213,20 @@ func TestElementsSorting(t *testing.T) {
 
                sort.Sort(es)
 
-               // With same seriesID, should sort by userKey
+               // When seriesID is the same, should sort by userKey
                expectedUserKeys := []int64{100, 200, 300}
-               assert.Equal(t, expectedUserKeys, es.userKeys, "same seriesID 
should sort by userKey")
+               assert.Equal(t, expectedUserKeys, es.userKeys, "elements with 
same seriesID should be sorted by userKey")
 
-               // Verify data follows the sorting
-               assert.Equal(t, []byte("data100"), es.data[0])
-               assert.Equal(t, []byte("data200"), es.data[1])
-               assert.Equal(t, []byte("data300"), es.data[2])
+               // Verify data follows the userKey sorting
+               assert.Equal(t, []byte("data100"), es.data[0], "data should 
follow userKey sorting")
+               assert.Equal(t, []byte("data200"), es.data[1], "data should 
follow userKey sorting")
+               assert.Equal(t, []byte("data300"), es.data[2], "data should 
follow userKey sorting")
 
                releaseElements(es)
        })
 }
 
 func TestNilSafety(t *testing.T) {
-       t.Run("release nil element", func(_ *testing.T) {
-               // Should not panic
-               releaseElement(nil)
-       })
-
        t.Run("release nil elements", func(_ *testing.T) {
                // Should not panic
                releaseElements(nil)
@@ -364,12 +237,6 @@ func TestNilSafety(t *testing.T) {
                releaseTag(nil)
        })
 
-       t.Run("release non-pooled element", func(_ *testing.T) {
-               e := &element{pooled: false}
-               // Should not panic or add to pool
-               releaseElement(e)
-       })
-
        t.Run("release non-pooled elements", func(_ *testing.T) {
                es := &elements{pooled: false}
                // Should not panic or add to pool
diff --git a/banyand/internal/sidx/introducer.go 
b/banyand/internal/sidx/introducer.go
new file mode 100644
index 00000000..7dd43e14
--- /dev/null
+++ b/banyand/internal/sidx/introducer.go
@@ -0,0 +1,217 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+type introduction struct {
+       memPart *memPart
+       applied chan struct{}
+}
+
+func (i *introduction) reset() {
+       i.memPart = nil
+       i.applied = nil
+}
+
+var introductionPool = pool.Register[*introduction]("sidx-introduction")
+
+func generateIntroduction() *introduction {
+       v := introductionPool.Get()
+       if v == nil {
+               return &introduction{}
+       }
+       intro := v
+       intro.reset()
+       return intro
+}
+
+func releaseIntroduction(i *introduction) {
+       introductionPool.Put(i)
+}
+
+type flusherIntroduction struct {
+       flushed map[uint64]*part
+       applied chan struct{}
+}
+
+func (i *flusherIntroduction) reset() {
+       for k := range i.flushed {
+               delete(i.flushed, k)
+       }
+       i.applied = nil
+}
+
+var flusherIntroductionPool = 
pool.Register[*flusherIntroduction]("sidx-flusher-introduction")
+
+func generateFlusherIntroduction() *flusherIntroduction {
+       v := flusherIntroductionPool.Get()
+       if v == nil {
+               return &flusherIntroduction{
+                       flushed: make(map[uint64]*part),
+               }
+       }
+       fi := v
+       fi.reset()
+       return fi
+}
+
+func releaseFlusherIntroduction(i *flusherIntroduction) {
+       flusherIntroductionPool.Put(i)
+}
+
+type mergerIntroduction struct {
+       merged  map[uint64]struct{}
+       newPart *part
+       applied chan struct{}
+}
+
+func (i *mergerIntroduction) reset() {
+       for k := range i.merged {
+               delete(i.merged, k)
+       }
+       i.newPart = nil
+       i.applied = nil
+}
+
+var mergerIntroductionPool = 
pool.Register[*mergerIntroduction]("sidx-merger-introduction")
+
+func generateMergerIntroduction() *mergerIntroduction {
+       v := mergerIntroductionPool.Get()
+       if v == nil {
+               return &mergerIntroduction{
+                       merged: make(map[uint64]struct{}),
+               }
+       }
+       mi := v
+       mi.reset()
+       return mi
+}
+
+func releaseMergerIntroduction(i *mergerIntroduction) {
+       mergerIntroductionPool.Put(i)
+}
+
+func (s *sidx) introducerLoop(flushCh chan *flusherIntroduction, mergeCh chan 
*mergerIntroduction, watcherCh watcher.Channel, epoch uint64) {
+       var introducerWatchers watcher.Epochs
+       defer s.loopCloser.Done()
+       for {
+               select {
+               case <-s.loopCloser.CloseNotify():
+                       return
+               case next := <-s.introductions:
+                       s.incTotalIntroduceLoopStarted("mem")
+                       s.introduceMemPart(next, epoch)
+                       s.incTotalIntroduceLoopFinished("mem")
+                       epoch++
+               case next := <-flushCh:
+                       s.incTotalIntroduceLoopStarted("flush")
+                       s.introduceFlushed(next, epoch)
+                       s.incTotalIntroduceLoopFinished("flush")
+                       s.gc.clean()
+                       epoch++
+               case next := <-mergeCh:
+                       s.incTotalIntroduceLoopStarted("merge")
+                       s.introduceMerged(next, epoch)
+                       s.incTotalIntroduceLoopFinished("merge")
+                       s.gc.clean()
+                       epoch++
+               case epochWatcher := <-watcherCh:
+                       introducerWatchers.Add(epochWatcher)
+               }
+               curEpoch := s.currentEpoch()
+               introducerWatchers.Notify(curEpoch)
+       }
+}
+
+func (s *sidx) introduceMemPart(nextIntroduction *introduction, epoch uint64) {
+       cur := s.currentSnapshot()
+       if cur != nil {
+               defer cur.decRef()
+       } else {
+               cur = generateSnapshot()
+       }
+
+       next := nextIntroduction.memPart
+       nextSnp := cur.copyAllTo(epoch)
+
+       // Convert memPart to part and wrap it
+       part := openMemPart(next)
+       pw := newPartWrapper(part)
+       nextSnp.parts = append(nextSnp.parts, pw)
+
+       s.replaceSnapshot(nextSnp)
+       if nextIntroduction.applied != nil {
+               close(nextIntroduction.applied)
+       }
+}
+
+func (s *sidx) introduceFlushed(nextIntroduction *flusherIntroduction, epoch 
uint64) {
+       cur := s.currentSnapshot()
+       if cur == nil {
+               s.l.Panic().Msg("current snapshot is nil")
+       }
+       defer cur.decRef()
+       nextSnp := cur.merge(epoch, nextIntroduction.flushed)
+       s.replaceSnapshot(nextSnp)
+       s.persistSnapshot(nextSnp)
+       if nextIntroduction.applied != nil {
+               close(nextIntroduction.applied)
+       }
+}
+
+func (s *sidx) introduceMerged(nextIntroduction *mergerIntroduction, epoch 
uint64) {
+       cur := s.currentSnapshot()
+       if cur == nil {
+               s.l.Panic().Msg("current snapshot is nil")
+               return
+       }
+       defer cur.decRef()
+       nextSnp := cur.remove(epoch, nextIntroduction.merged)
+
+       // Wrap the new part
+       pw := newPartWrapper(nextIntroduction.newPart)
+       nextSnp.parts = append(nextSnp.parts, pw)
+
+       s.replaceSnapshot(nextSnp)
+       s.persistSnapshot(nextSnp)
+       if nextIntroduction.applied != nil {
+               close(nextIntroduction.applied)
+       }
+}
+
+func (s *sidx) replaceSnapshot(next *snapshot) {
+       s.Lock()
+       defer s.Unlock()
+       if s.snapshot != nil {
+               s.snapshot.decRef()
+       }
+       s.snapshot = next
+}
+
+func (s *sidx) currentEpoch() uint64 {
+       snap := s.currentSnapshot()
+       if snap == nil {
+               return 0
+       }
+       defer snap.decRef()
+       return snap.epoch
+}
diff --git a/banyand/internal/sidx/part_wrapper.go 
b/banyand/internal/sidx/part_wrapper.go
index ca5d83aa..c5c41137 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -73,11 +73,13 @@ type partWrapper struct {
 // newPartWrapper creates a new partWrapper with an initial reference count of 
1.
 // The part starts in the active state.
 func newPartWrapper(p *part) *partWrapper {
-       return &partWrapper{
+       pw := &partWrapper{
                p:     p,
                ref:   1,
                state: int32(partStateActive),
        }
+
+       return pw
 }
 
 // acquire increments the reference count atomically.
@@ -202,6 +204,12 @@ func (pw *partWrapper) isRemoved() bool {
        return atomic.LoadInt32(&pw.state) == int32(partStateRemoved)
 }
 
+// isMemPart returns true if this wrapper contains a memory part.
+func (pw *partWrapper) isMemPart() bool {
+       // A memory part typically has no file system path or is stored in 
memory
+       return pw.p != nil && pw.p.path == ""
+}
+
 // String returns a string representation of the partWrapper.
 func (pw *partWrapper) String() string {
        state := pw.getState()
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
new file mode 100644
index 00000000..4c7fdc4f
--- /dev/null
+++ b/banyand/internal/sidx/sidx.go
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "context"
+       "sync"
+       "sync/atomic"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+// sidx implements the SIDX interface with introduction channels for async 
operations.
+type sidx struct {
+       snapshot                   *snapshot
+       introductions              chan *introduction
+       flushCh                    chan *flusherIntroduction
+       mergeCh                    chan *mergerIntroduction
+       loopCloser                 *run.Closer
+       gc                         *gc
+       l                          *logger.Logger
+       totalIntroduceLoopStarted  atomic.Int64
+       totalIntroduceLoopFinished atomic.Int64
+       mu                         sync.RWMutex
+}
+
+// NewSIDX creates a new SIDX instance with introduction channels.
+func NewSIDX(opts *Options) (SIDX, error) {
+       if opts == nil {
+               opts = NewDefaultOptions()
+       }
+
+       if err := opts.Validate(); err != nil {
+               return nil, err
+       }
+
+       s := &sidx{
+               introductions: make(chan *introduction),
+               flushCh:       make(chan *flusherIntroduction),
+               mergeCh:       make(chan *mergerIntroduction),
+               loopCloser:    run.NewCloser(1),
+               l:             logger.GetLogger().Named("sidx"),
+       }
+
+       // Initialize garbage collector
+       s.gc = newGC(opts)
+
+       // Start introducer loop
+       watcherCh := make(watcher.Channel, 10)
+       go s.introducerLoop(s.flushCh, s.mergeCh, watcherCh, 0)
+
+       return s, nil
+}
+
+// Write implements SIDX interface.
+func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) error {
+       // Validate requests
+       for _, req := range reqs {
+               if err := req.Validate(); err != nil {
+                       return err
+               }
+       }
+
+       // Create elements from requests
+       es := generateElements()
+       defer releaseElements(es)
+
+       for _, req := range reqs {
+               es.mustAppend(req.SeriesID, req.Key, req.Data, req.Tags)
+       }
+
+       // Create memory part from elements
+       mp := generateMemPart()
+       mp.mustInitFromElements(es)
+
+       // Create introduction
+       intro := generateIntroduction()
+       intro.memPart = mp
+       intro.applied = make(chan struct{})
+
+       // Send to introducer loop
+       select {
+       case s.introductions <- intro:
+               // Wait for introduction to be applied
+               <-intro.applied
+               releaseIntroduction(intro)
+               return nil
+       case <-ctx.Done():
+               releaseIntroduction(intro)
+               releaseMemPart(mp)
+               return ctx.Err()
+       }
+}
+
+// Query implements SIDX interface.
+func (s *sidx) Query(ctx context.Context, req QueryRequest) (QueryResult, 
error) {
+       if err := req.Validate(); err != nil {
+               return nil, err
+       }
+
+       // Get current snapshot
+       snap := s.currentSnapshot()
+       if snap == nil {
+               return &emptyQueryResult{}, nil
+       }
+
+       // Create query result
+       qr := &queryResult{
+               snapshot: snap,
+               request:  req,
+               ctx:      ctx,
+       }
+
+       return qr, nil
+}
+
+// Stats implements SIDX interface.
+func (s *sidx) Stats(_ context.Context) (*Stats, error) {
+       snap := s.currentSnapshot()
+       if snap == nil {
+               return &Stats{}, nil
+       }
+       defer snap.decRef()
+
+       stats := &Stats{
+               PartCount: int64(snap.getPartCount()),
+       }
+
+       // Load atomic counters
+       stats.QueryCount.Store(s.totalIntroduceLoopStarted.Load())
+       stats.WriteCount.Store(s.totalIntroduceLoopFinished.Load())
+
+       return stats, nil
+}
+
+// Flush implements Flusher interface.
+func (s *sidx) Flush() error {
+       // Get current memory parts that need flushing
+       snap := s.currentSnapshot()
+       if snap == nil {
+               return nil
+       }
+       defer snap.decRef()
+
+       // Create flush introduction
+       flushIntro := generateFlusherIntroduction()
+       flushIntro.applied = make(chan struct{})
+
+       // Select memory parts to flush (simplified logic)
+       for _, pw := range snap.parts {
+               if pw.isMemPart() && pw.isActive() {
+                       flushIntro.flushed[pw.ID()] = pw.p
+               }
+       }
+
+       if len(flushIntro.flushed) == 0 {
+               releaseFlusherIntroduction(flushIntro)
+               return nil
+       }
+
+       // Send to introducer loop
+       s.flushCh <- flushIntro
+
+       // Wait for flush to complete
+       <-flushIntro.applied
+       releaseFlusherIntroduction(flushIntro)
+
+       return nil
+}
+
+// Merge implements Merger interface.
+func (s *sidx) Merge() error {
+       // Get current snapshot
+       snap := s.currentSnapshot()
+       if snap == nil {
+               return nil
+       }
+       defer snap.decRef()
+
+       // Create merge introduction
+       mergeIntro := generateMergerIntroduction()
+       mergeIntro.applied = make(chan struct{})
+
+       // Select parts to merge (simplified logic - merge first 2 parts)
+       var partsToMerge []*partWrapper
+       for _, pw := range snap.parts {
+               if pw.isActive() && !pw.isMemPart() {
+                       partsToMerge = append(partsToMerge, pw)
+                       if len(partsToMerge) >= 2 {
+                               break
+                       }
+               }
+       }
+
+       if len(partsToMerge) < 2 {
+               releaseMergerIntroduction(mergeIntro)
+               return nil
+       }
+
+       // Mark parts for merging
+       for _, pw := range partsToMerge {
+               mergeIntro.merged[pw.ID()] = struct{}{}
+       }
+
+       // Create new merged part (simplified - would actually merge the parts)
+       mergeIntro.newPart = partsToMerge[0].p
+
+       // Send to introducer loop
+       s.mergeCh <- mergeIntro
+
+       // Wait for merge to complete
+       <-mergeIntro.applied
+       releaseMergerIntroduction(mergeIntro)
+
+       return nil
+}
+
+// Close implements SIDX interface.
+func (s *sidx) Close() error {
+       s.loopCloser.CloseThenWait()
+
+       // Close current snapshot
+       s.mu.Lock()
+       if s.snapshot != nil {
+               s.snapshot.decRef()
+               s.snapshot = nil
+       }
+       s.mu.Unlock()
+
+       return nil
+}
+
+// currentSnapshot returns the current snapshot with incremented reference 
count.
+func (s *sidx) currentSnapshot() *snapshot {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+
+       if s.snapshot == nil {
+               return nil
+       }
+
+       if s.snapshot.acquire() {
+               return s.snapshot
+       }
+
+       return nil
+}
+
+// decRef decrements the snapshot reference count (helper for snapshot 
interface).
+func (s *snapshot) decRef() {
+       s.release()
+}
+
+// Helper methods for metrics.
+func (s *sidx) incTotalIntroduceLoopStarted(_ string) {
+       s.totalIntroduceLoopStarted.Add(1)
+}
+
+func (s *sidx) incTotalIntroduceLoopFinished(_ string) {
+       s.totalIntroduceLoopFinished.Add(1)
+}
+
+// persistSnapshot persists the snapshot to disk (placeholder).
+func (s *sidx) persistSnapshot(_ *snapshot) {
+       // TODO: Implement snapshot persistence
+}
+
+// Lock/Unlock methods for introducer loop.
+func (s *sidx) Lock() {
+       s.mu.Lock()
+}
+
+func (s *sidx) Unlock() {
+       s.mu.Unlock()
+}
+
+// gc represents garbage collector (placeholder).
+type gc struct{}
+
+func newGC(_ *Options) *gc {
+       return &gc{}
+}
+
+func (g *gc) clean() {
+       // TODO: Implement garbage collection
+}
+
+// emptyQueryResult represents an empty query result.
+type emptyQueryResult struct{}
+
+func (e *emptyQueryResult) Pull() *QueryResponse {
+       return nil
+}
+
+func (e *emptyQueryResult) Release() {
+       // Nothing to release
+}
+
+// queryResult implements QueryResult interface.
+type queryResult struct {
+       ctx      context.Context
+       snapshot *snapshot
+       request  QueryRequest
+       released bool
+}
+
+func (qr *queryResult) Pull() *QueryResponse {
+       if qr.released {
+               return nil
+       }
+
+       // Simple implementation - return empty response
+       // TODO: Implement actual query logic
+       return &QueryResponse{
+               Keys: []int64{},
+               Data: [][]byte{},
+               Tags: [][]Tag{},
+               SIDs: []common.SeriesID{},
+               Metadata: ResponseMetadata{
+                       ExecutionTimeMs: 0,
+               },
+       }
+}
+
+func (qr *queryResult) Release() {
+       if qr.released {
+               return
+       }
+       qr.released = true
+       if qr.snapshot != nil {
+               qr.snapshot.decRef()
+               qr.snapshot = nil
+       }
+}
diff --git a/banyand/internal/sidx/snapshot.go 
b/banyand/internal/sidx/snapshot.go
index 08f9eeb7..135a909e 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -303,3 +303,60 @@ func releaseSnapshot(s *snapshot) {
        s.reset()
        snapshotPool.Put(s)
 }
+
+// copyAllTo creates a new snapshot with all parts from current snapshot.
+func (s *snapshot) copyAllTo(epoch uint64) *snapshot {
+       result := generateSnapshot()
+       result.parts = make([]*partWrapper, len(s.parts))
+       result.epoch = epoch
+       result.ref = 1
+       result.released.Store(false)
+
+       // Copy all parts and acquire references
+       copy(result.parts, s.parts)
+       for _, pw := range result.parts {
+               if pw != nil {
+                       pw.acquire()
+               }
+       }
+
+       return result
+}
+
+// merge creates a new snapshot by merging flushed parts into the current 
snapshot.
+func (s *snapshot) merge(epoch uint64, flushed map[uint64]*part) *snapshot {
+       result := s.copyAllTo(epoch)
+
+       // Add flushed parts to the snapshot
+       for partID, part := range flushed {
+               // Set the part ID from the map key
+               if part != nil && part.partMetadata != nil {
+                       part.partMetadata.ID = partID
+               }
+               // Create part wrapper for the flushed part
+               pw := newPartWrapper(part)
+               result.parts = append(result.parts, pw)
+       }
+
+       result.sortPartsByEpoch()
+       return result
+}
+
+// remove creates a new snapshot by removing specified parts.
+func (s *snapshot) remove(epoch uint64, toRemove map[uint64]struct{}) 
*snapshot {
+       result := generateSnapshot()
+       result.epoch = epoch
+       result.ref = 1
+       result.released.Store(false)
+
+       // Copy parts except those being removed
+       for _, pw := range s.parts {
+               if _, shouldRemove := toRemove[pw.ID()]; !shouldRemove {
+                       if pw.acquire() {
+                               result.parts = append(result.parts, pw)
+                       }
+               }
+       }
+
+       return result
+}

Reply via email to