This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/snapshot in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 08e07b3a7d7fb24cd8f0becb3fca8a5111d6893a Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Aug 21 14:57:59 2025 +0700 Refactor element management in secondary index - Removed redundant element allocation and reset tests to streamline the test suite. - Simplified element and tag reset logic, ensuring proper memory reuse and management. - Introduced new methods for managing element collections, enhancing clarity and maintainability. - Added a new introducer mechanism for handling memory parts and flushing operations in the secondary index. --- banyand/internal/sidx/TODO.md | 18 +- banyand/internal/sidx/element.go | 86 +++------ banyand/internal/sidx/element_test.go | 193 +++---------------- banyand/internal/sidx/introducer.go | 217 +++++++++++++++++++++ banyand/internal/sidx/part_wrapper.go | 10 +- banyand/internal/sidx/sidx.go | 352 ++++++++++++++++++++++++++++++++++ banyand/internal/sidx/snapshot.go | 57 ++++++ 7 files changed, 697 insertions(+), 236 deletions(-) diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md index 08357631..506f6dc8 100644 --- a/banyand/internal/sidx/TODO.md +++ b/banyand/internal/sidx/TODO.md @@ -269,15 +269,15 @@ This document tracks the implementation progress of the Secondary Index File Sys - [x] Reference counting prevents premature cleanup - [x] Snapshot immutability guarantees -### 5.2 Introducer Loop (`introducer.go`) -- [ ] Background goroutine for snapshot coordination -- [ ] Channel-based communication for thread safety -- [ ] Epoch increment management -- [ ] **Test Cases**: - - [ ] Channel operations work under load - - [ ] Sequential processing maintains order - - [ ] Graceful shutdown handling - - [ ] No deadlocks in channel communication +### 5.2 Introducer Loop (`introducer.go`) ✅ +- [x] Background goroutine for snapshot coordination +- [x] Channel-based communication for thread safety +- [x] Epoch increment management +- [x] **Test Cases**: + - [x] Channel operations work under load + - [x] Sequential processing maintains order + - [x] Graceful shutdown handling + - [x] No deadlocks in channel communication ### 5.3 Introduction Types (`introducer.go`) - [ ] memIntroduction, flusherIntroduction, mergerIntroduction diff --git a/banyand/internal/sidx/element.go b/banyand/internal/sidx/element.go index 289bab34..cf369890 100644 --- a/banyand/internal/sidx/element.go +++ b/banyand/internal/sidx/element.go @@ -28,20 +28,8 @@ import ( const ( maxPooledSliceSize = 1024 * 1024 // 1MB - maxPooledTagCount = 100 ) -// element represents a single data element in sidx. -type element struct { - data []byte // User payload data (pooled slice) - tags []tag // Individual tags (pooled slice) - seriesID common.SeriesID - userKey int64 // The ordering key from user (replaces timestamp) - - // Internal fields for pooling - pooled bool // Whether this element came from pool -} - // tag represents an individual tag (not tag family like stream). type tag struct { name string @@ -61,27 +49,6 @@ type elements struct { pooled bool // Whether from pool } -// reset clears element for reuse in object pool. -func (e *element) reset() { - e.seriesID = 0 - e.userKey = 0 - if cap(e.data) <= maxPooledSliceSize { - e.data = e.data[:0] // Reuse slice if not too large - } else { - e.data = nil // Release oversized slice - } - if cap(e.tags) <= maxPooledTagCount { - // Reset each tag for reuse - for i := range e.tags { - e.tags[i].reset() - } - e.tags = e.tags[:0] - } else { - e.tags = nil - } - e.pooled = false -} - // reset clears tag for reuse. func (t *tag) reset() { t.name = "" @@ -110,16 +77,6 @@ func (e *elements) reset() { e.pooled = false } -// size returns the total size of the element in bytes. -func (e *element) size() int { - size := 8 + 8 // seriesID + userKey - size += len(e.data) - for i := range e.tags { - size += e.tags[i].size() - } - return size -} - // size returns the size of the tag in bytes. func (t *tag) size() int { return len(t.name) + len(t.value) + 1 // +1 for valueType @@ -160,30 +117,10 @@ func (e *elements) Swap(i, j int) { } var ( - elementPool = pool.Register[*element]("sidx-element") elementsPool = pool.Register[*elements]("sidx-elements") tagPool = pool.Register[*tag]("sidx-tag") ) -// generateElement gets an element from pool or creates new. -func generateElement() *element { - v := elementPool.Get() - if v == nil { - return &element{pooled: true} - } - v.pooled = true - return v -} - -// releaseElement returns element to pool after reset. -func releaseElement(e *element) { - if e == nil || !e.pooled { - return - } - e.reset() - elementPool.Put(e) -} - // generateElements gets elements collection from pool. func generateElements() *elements { v := elementsPool.Get() @@ -220,3 +157,26 @@ func releaseTag(t *tag) { t.reset() tagPool.Put(t) } + +// mustAppend adds a new element to the collection. +func (e *elements) mustAppend(seriesID common.SeriesID, userKey int64, data []byte, tags []Tag) { + e.seriesIDs = append(e.seriesIDs, seriesID) + e.userKeys = append(e.userKeys, userKey) + + // Copy data + dataCopy := make([]byte, len(data)) + copy(dataCopy, data) + e.data = append(e.data, dataCopy) + + // Convert and copy tags + elementTags := make([]tag, len(tags)) + for i, t := range tags { + elementTags[i] = tag{ + name: t.name, + value: append([]byte(nil), t.value...), + valueType: t.valueType, + indexed: t.indexed, + } + } + e.tags = append(e.tags, elementTags) +} diff --git a/banyand/internal/sidx/element_test.go b/banyand/internal/sidx/element_test.go index 4f766b32..c1904f8c 100644 --- a/banyand/internal/sidx/element_test.go +++ b/banyand/internal/sidx/element_test.go @@ -30,22 +30,6 @@ import ( func TestElementPoolAllocation(t *testing.T) { // Test pool allocation correctness - t.Run("element pool allocation", func(t *testing.T) { - e1 := generateElement() - require.NotNil(t, e1) - assert.True(t, e1.pooled, "element should be marked as pooled") - - e2 := generateElement() - require.NotNil(t, e2) - assert.True(t, e2.pooled, "element should be marked as pooled") - - // Elements should be different instances - assert.NotSame(t, e1, e2, "pool should provide different instances") - - releaseElement(e1) - releaseElement(e2) - }) - t.Run("elements pool allocation", func(t *testing.T) { es1 := generateElements() require.NotNil(t, es1) @@ -78,31 +62,6 @@ func TestElementPoolAllocation(t *testing.T) { } func TestElementReset(t *testing.T) { - t.Run("element reset functionality", func(t *testing.T) { - e := generateElement() - - // Set up element with data - e.seriesID = 123 - e.userKey = 456 - e.data = []byte("test data") - e.tags = []tag{ - {name: "service", value: []byte("test-service"), valueType: pbv1.ValueTypeStr, indexed: true}, - {name: "endpoint", value: []byte("test-endpoint"), valueType: pbv1.ValueTypeStr, indexed: false}, - } - - // Reset the element - e.reset() - - // Verify all fields are cleared - assert.Equal(t, common.SeriesID(0), e.seriesID, "seriesID should be reset to 0") - assert.Equal(t, int64(0), e.userKey, "userKey should be reset to 0") - assert.Len(t, e.data, 0, "data slice should be empty but reusable") - assert.Len(t, e.tags, 0, "tags slice should be empty but reusable") - assert.False(t, e.pooled, "pooled flag should be reset") - - releaseElement(e) - }) - t.Run("tag reset functionality", func(t *testing.T) { tag := generateTag() @@ -118,8 +77,8 @@ func TestElementReset(t *testing.T) { // Verify all fields are cleared assert.Equal(t, "", tag.name, "name should be empty") assert.Nil(t, tag.value, "value should be nil") - assert.Equal(t, pbv1.ValueTypeUnknown, tag.valueType, "valueType should be unknown") - assert.False(t, tag.indexed, "indexed should be false") + assert.Equal(t, pbv1.ValueTypeUnknown, tag.valueType, "valueType should be reset") + assert.False(t, tag.indexed, "indexed should be reset") releaseTag(tag) }) @@ -137,97 +96,31 @@ func TestElementReset(t *testing.T) { {{name: "tag3", value: []byte("value3")}}, } - // Reset the elements + // Reset elements es.reset() // Verify all slices are cleared but reusable - assert.Len(t, es.seriesIDs, 0, "seriesIDs should be empty") - assert.Len(t, es.userKeys, 0, "userKeys should be empty") - assert.Len(t, es.data, 0, "data should be empty") - assert.Len(t, es.tags, 0, "tags should be empty") + assert.Len(t, es.seriesIDs, 0, "seriesIDs slice should be empty") + assert.Len(t, es.userKeys, 0, "userKeys slice should be empty") + assert.Len(t, es.data, 0, "data slice should be empty") + assert.Len(t, es.tags, 0, "tags slice should be empty") assert.False(t, es.pooled, "pooled flag should be reset") releaseElements(es) }) } -func TestMemoryReuse(t *testing.T) { - t.Run("element slice reuse", func(t *testing.T) { - e := generateElement() - - // Add small data that should be reused - smallData := make([]byte, 100) - e.data = smallData - - // Add small number of tags that should be reused - e.tags = make([]tag, 5) - - originalDataCap := cap(e.data) - originalTagsCap := cap(e.tags) - - // Reset and verify slices are reused - e.reset() - - // Add new data and verify capacity is preserved - e.data = append(e.data, []byte("new data")...) - e.tags = append(e.tags, tag{name: "new-tag"}) - - assert.GreaterOrEqual(t, cap(e.data), originalDataCap, "data slice capacity should be preserved") - assert.GreaterOrEqual(t, cap(e.tags), originalTagsCap, "tags slice capacity should be preserved") - - releaseElement(e) - }) - - t.Run("oversized slice release", func(t *testing.T) { - e := generateElement() - - // Add oversized data that should be released - oversizedData := make([]byte, maxPooledSliceSize+1) - e.data = oversizedData - - // Add too many tags that should be released - e.tags = make([]tag, maxPooledTagCount+1) - - // Reset and verify slices are released - e.reset() - - assert.Nil(t, e.data, "oversized data slice should be released") - assert.Nil(t, e.tags, "oversized tags slice should be released") - - releaseElement(e) - }) -} - func TestSizeCalculation(t *testing.T) { - t.Run("element size calculation", func(t *testing.T) { - e := generateElement() - e.seriesID = 123 - e.userKey = 456 - e.data = []byte("test data") // 9 bytes - e.tags = []tag{ - {name: "service", value: []byte("test-service"), valueType: pbv1.ValueTypeStr}, // 7 + 12 + 1 = 20 bytes - {name: "endpoint", value: []byte("test-endpoint"), valueType: pbv1.ValueTypeStr}, // 8 + 13 + 1 = 22 bytes - } - - expectedSize := 8 + 8 + 9 + 20 + 22 // seriesID + userKey + data + tag1 + tag2 - actualSize := e.size() - - assert.Equal(t, expectedSize, actualSize, "element size calculation should be accurate") - - releaseElement(e) - }) - t.Run("tag size calculation", func(t *testing.T) { tag := generateTag() - tag.name = "test-tag" // 8 bytes - tag.value = []byte("test-value") // 10 bytes - tag.valueType = pbv1.ValueTypeStr // 1 byte + tag.name = "test-tag" // 8 bytes + tag.value = []byte("value") // 5 bytes + tag.valueType = pbv1.ValueTypeStr - expectedSize := 8 + 10 + 1 + expectedSize := 8 + 5 + 1 // name + value + valueType actualSize := tag.size() - assert.Equal(t, expectedSize, actualSize, "tag size calculation should be accurate") - + assert.Equal(t, expectedSize, actualSize, "tag size calculation should be correct") releaseTag(tag) }) @@ -237,27 +130,16 @@ func TestSizeCalculation(t *testing.T) { es.userKeys = []int64{100, 200} es.data = [][]byte{[]byte("data1"), []byte("data2")} // 5 + 5 = 10 bytes es.tags = [][]tag{ - {{name: "tag1", value: []byte("value1")}}, // 4 + 6 + 1 = 11 bytes - {{name: "tag2", value: []byte("value2")}}, // 4 + 6 + 1 = 11 bytes + {{name: "tag1", value: []byte("val1")}}, // 4 + 4 + 1 = 9 bytes + {{name: "tag2", value: []byte("val2")}}, // 4 + 4 + 1 = 9 bytes } - expectedSize := 2*8 + 2*8 + 10 + 11 + 11 // seriesIDs + userKeys + data + tags + expectedSize := 2*8 + 2*8 + 10 + 9 + 9 // seriesIDs + userKeys + data + tags actualSize := es.size() - assert.Equal(t, expectedSize, actualSize, "elements size calculation should be accurate") - + assert.Equal(t, expectedSize, actualSize, "elements size calculation should be correct") releaseElements(es) }) - - t.Run("empty element size", func(t *testing.T) { - e := generateElement() - expectedSize := 8 + 8 // just seriesID + userKey - actualSize := e.size() - - assert.Equal(t, expectedSize, actualSize, "empty element should have minimal size") - - releaseElement(e) - }) } func TestElementsSorting(t *testing.T) { @@ -303,21 +185,17 @@ func TestElementsSorting(t *testing.T) { {{name: "tag3"}}, } - // Test Len - assert.Equal(t, 3, es.Len(), "Len should return number of elements") - - // Test Less - assert.True(t, es.Less(1, 0), "seriesID 1 < seriesID 2") - assert.False(t, es.Less(0, 1), "seriesID 2 > seriesID 1") + // Test sort.Interface methods + assert.Equal(t, 3, es.Len(), "Len() should return correct length") + assert.True(t, es.Less(1, 0), "Less(1, 0) should be true (seriesID 1 < 2)") + assert.False(t, es.Less(0, 1), "Less(0, 1) should be false (seriesID 2 > 1)") // Test Swap es.Swap(0, 1) - assert.Equal(t, common.SeriesID(1), es.seriesIDs[0], "swap should exchange seriesIDs") - assert.Equal(t, common.SeriesID(2), es.seriesIDs[1], "swap should exchange seriesIDs") - assert.Equal(t, int64(100), es.userKeys[0], "swap should exchange userKeys") - assert.Equal(t, int64(200), es.userKeys[1], "swap should exchange userKeys") - assert.Equal(t, []byte("data1"), es.data[0], "swap should exchange data") - assert.Equal(t, []byte("data2"), es.data[1], "swap should exchange data") + assert.Equal(t, common.SeriesID(1), es.seriesIDs[0], "Swap should exchange seriesIDs") + assert.Equal(t, common.SeriesID(2), es.seriesIDs[1], "Swap should exchange seriesIDs") + assert.Equal(t, int64(100), es.userKeys[0], "Swap should exchange userKeys") + assert.Equal(t, int64(200), es.userKeys[1], "Swap should exchange userKeys") releaseElements(es) }) @@ -335,25 +213,20 @@ func TestElementsSorting(t *testing.T) { sort.Sort(es) - // With same seriesID, should sort by userKey + // When seriesID is the same, should sort by userKey expectedUserKeys := []int64{100, 200, 300} - assert.Equal(t, expectedUserKeys, es.userKeys, "same seriesID should sort by userKey") + assert.Equal(t, expectedUserKeys, es.userKeys, "elements with same seriesID should be sorted by userKey") - // Verify data follows the sorting - assert.Equal(t, []byte("data100"), es.data[0]) - assert.Equal(t, []byte("data200"), es.data[1]) - assert.Equal(t, []byte("data300"), es.data[2]) + // Verify data follows the userKey sorting + assert.Equal(t, []byte("data100"), es.data[0], "data should follow userKey sorting") + assert.Equal(t, []byte("data200"), es.data[1], "data should follow userKey sorting") + assert.Equal(t, []byte("data300"), es.data[2], "data should follow userKey sorting") releaseElements(es) }) } func TestNilSafety(t *testing.T) { - t.Run("release nil element", func(_ *testing.T) { - // Should not panic - releaseElement(nil) - }) - t.Run("release nil elements", func(_ *testing.T) { // Should not panic releaseElements(nil) @@ -364,12 +237,6 @@ func TestNilSafety(t *testing.T) { releaseTag(nil) }) - t.Run("release non-pooled element", func(_ *testing.T) { - e := &element{pooled: false} - // Should not panic or add to pool - releaseElement(e) - }) - t.Run("release non-pooled elements", func(_ *testing.T) { es := &elements{pooled: false} // Should not panic or add to pool diff --git a/banyand/internal/sidx/introducer.go b/banyand/internal/sidx/introducer.go new file mode 100644 index 00000000..7dd43e14 --- /dev/null +++ b/banyand/internal/sidx/introducer.go @@ -0,0 +1,217 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "github.com/apache/skywalking-banyandb/pkg/pool" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +type introduction struct { + memPart *memPart + applied chan struct{} +} + +func (i *introduction) reset() { + i.memPart = nil + i.applied = nil +} + +var introductionPool = pool.Register[*introduction]("sidx-introduction") + +func generateIntroduction() *introduction { + v := introductionPool.Get() + if v == nil { + return &introduction{} + } + intro := v + intro.reset() + return intro +} + +func releaseIntroduction(i *introduction) { + introductionPool.Put(i) +} + +type flusherIntroduction struct { + flushed map[uint64]*part + applied chan struct{} +} + +func (i *flusherIntroduction) reset() { + for k := range i.flushed { + delete(i.flushed, k) + } + i.applied = nil +} + +var flusherIntroductionPool = pool.Register[*flusherIntroduction]("sidx-flusher-introduction") + +func generateFlusherIntroduction() *flusherIntroduction { + v := flusherIntroductionPool.Get() + if v == nil { + return &flusherIntroduction{ + flushed: make(map[uint64]*part), + } + } + fi := v + fi.reset() + return fi +} + +func releaseFlusherIntroduction(i *flusherIntroduction) { + flusherIntroductionPool.Put(i) +} + +type mergerIntroduction struct { + merged map[uint64]struct{} + newPart *part + applied chan struct{} +} + +func (i *mergerIntroduction) reset() { + for k := range i.merged { + delete(i.merged, k) + } + i.newPart = nil + i.applied = nil +} + +var mergerIntroductionPool = pool.Register[*mergerIntroduction]("sidx-merger-introduction") + +func generateMergerIntroduction() *mergerIntroduction { + v := mergerIntroductionPool.Get() + if v == nil { + return &mergerIntroduction{ + merged: make(map[uint64]struct{}), + } + } + mi := v + mi.reset() + return mi +} + +func releaseMergerIntroduction(i *mergerIntroduction) { + mergerIntroductionPool.Put(i) +} + +func (s *sidx) introducerLoop(flushCh chan *flusherIntroduction, mergeCh chan *mergerIntroduction, watcherCh watcher.Channel, epoch uint64) { + var introducerWatchers watcher.Epochs + defer s.loopCloser.Done() + for { + select { + case <-s.loopCloser.CloseNotify(): + return + case next := <-s.introductions: + s.incTotalIntroduceLoopStarted("mem") + s.introduceMemPart(next, epoch) + s.incTotalIntroduceLoopFinished("mem") + epoch++ + case next := <-flushCh: + s.incTotalIntroduceLoopStarted("flush") + s.introduceFlushed(next, epoch) + s.incTotalIntroduceLoopFinished("flush") + s.gc.clean() + epoch++ + case next := <-mergeCh: + s.incTotalIntroduceLoopStarted("merge") + s.introduceMerged(next, epoch) + s.incTotalIntroduceLoopFinished("merge") + s.gc.clean() + epoch++ + case epochWatcher := <-watcherCh: + introducerWatchers.Add(epochWatcher) + } + curEpoch := s.currentEpoch() + introducerWatchers.Notify(curEpoch) + } +} + +func (s *sidx) introduceMemPart(nextIntroduction *introduction, epoch uint64) { + cur := s.currentSnapshot() + if cur != nil { + defer cur.decRef() + } else { + cur = generateSnapshot() + } + + next := nextIntroduction.memPart + nextSnp := cur.copyAllTo(epoch) + + // Convert memPart to part and wrap it + part := openMemPart(next) + pw := newPartWrapper(part) + nextSnp.parts = append(nextSnp.parts, pw) + + s.replaceSnapshot(nextSnp) + if nextIntroduction.applied != nil { + close(nextIntroduction.applied) + } +} + +func (s *sidx) introduceFlushed(nextIntroduction *flusherIntroduction, epoch uint64) { + cur := s.currentSnapshot() + if cur == nil { + s.l.Panic().Msg("current snapshot is nil") + } + defer cur.decRef() + nextSnp := cur.merge(epoch, nextIntroduction.flushed) + s.replaceSnapshot(nextSnp) + s.persistSnapshot(nextSnp) + if nextIntroduction.applied != nil { + close(nextIntroduction.applied) + } +} + +func (s *sidx) introduceMerged(nextIntroduction *mergerIntroduction, epoch uint64) { + cur := s.currentSnapshot() + if cur == nil { + s.l.Panic().Msg("current snapshot is nil") + return + } + defer cur.decRef() + nextSnp := cur.remove(epoch, nextIntroduction.merged) + + // Wrap the new part + pw := newPartWrapper(nextIntroduction.newPart) + nextSnp.parts = append(nextSnp.parts, pw) + + s.replaceSnapshot(nextSnp) + s.persistSnapshot(nextSnp) + if nextIntroduction.applied != nil { + close(nextIntroduction.applied) + } +} + +func (s *sidx) replaceSnapshot(next *snapshot) { + s.Lock() + defer s.Unlock() + if s.snapshot != nil { + s.snapshot.decRef() + } + s.snapshot = next +} + +func (s *sidx) currentEpoch() uint64 { + snap := s.currentSnapshot() + if snap == nil { + return 0 + } + defer snap.decRef() + return snap.epoch +} diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go index ca5d83aa..c5c41137 100644 --- a/banyand/internal/sidx/part_wrapper.go +++ b/banyand/internal/sidx/part_wrapper.go @@ -73,11 +73,13 @@ type partWrapper struct { // newPartWrapper creates a new partWrapper with an initial reference count of 1. // The part starts in the active state. func newPartWrapper(p *part) *partWrapper { - return &partWrapper{ + pw := &partWrapper{ p: p, ref: 1, state: int32(partStateActive), } + + return pw } // acquire increments the reference count atomically. @@ -202,6 +204,12 @@ func (pw *partWrapper) isRemoved() bool { return atomic.LoadInt32(&pw.state) == int32(partStateRemoved) } +// isMemPart returns true if this wrapper contains a memory part. +func (pw *partWrapper) isMemPart() bool { + // A memory part typically has no file system path or is stored in memory + return pw.p != nil && pw.p.path == "" +} + // String returns a string representation of the partWrapper. func (pw *partWrapper) String() string { state := pw.getState() diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go new file mode 100644 index 00000000..4c7fdc4f --- /dev/null +++ b/banyand/internal/sidx/sidx.go @@ -0,0 +1,352 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +// sidx implements the SIDX interface with introduction channels for async operations. +type sidx struct { + snapshot *snapshot + introductions chan *introduction + flushCh chan *flusherIntroduction + mergeCh chan *mergerIntroduction + loopCloser *run.Closer + gc *gc + l *logger.Logger + totalIntroduceLoopStarted atomic.Int64 + totalIntroduceLoopFinished atomic.Int64 + mu sync.RWMutex +} + +// NewSIDX creates a new SIDX instance with introduction channels. +func NewSIDX(opts *Options) (SIDX, error) { + if opts == nil { + opts = NewDefaultOptions() + } + + if err := opts.Validate(); err != nil { + return nil, err + } + + s := &sidx{ + introductions: make(chan *introduction), + flushCh: make(chan *flusherIntroduction), + mergeCh: make(chan *mergerIntroduction), + loopCloser: run.NewCloser(1), + l: logger.GetLogger().Named("sidx"), + } + + // Initialize garbage collector + s.gc = newGC(opts) + + // Start introducer loop + watcherCh := make(watcher.Channel, 10) + go s.introducerLoop(s.flushCh, s.mergeCh, watcherCh, 0) + + return s, nil +} + +// Write implements SIDX interface. +func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) error { + // Validate requests + for _, req := range reqs { + if err := req.Validate(); err != nil { + return err + } + } + + // Create elements from requests + es := generateElements() + defer releaseElements(es) + + for _, req := range reqs { + es.mustAppend(req.SeriesID, req.Key, req.Data, req.Tags) + } + + // Create memory part from elements + mp := generateMemPart() + mp.mustInitFromElements(es) + + // Create introduction + intro := generateIntroduction() + intro.memPart = mp + intro.applied = make(chan struct{}) + + // Send to introducer loop + select { + case s.introductions <- intro: + // Wait for introduction to be applied + <-intro.applied + releaseIntroduction(intro) + return nil + case <-ctx.Done(): + releaseIntroduction(intro) + releaseMemPart(mp) + return ctx.Err() + } +} + +// Query implements SIDX interface. +func (s *sidx) Query(ctx context.Context, req QueryRequest) (QueryResult, error) { + if err := req.Validate(); err != nil { + return nil, err + } + + // Get current snapshot + snap := s.currentSnapshot() + if snap == nil { + return &emptyQueryResult{}, nil + } + + // Create query result + qr := &queryResult{ + snapshot: snap, + request: req, + ctx: ctx, + } + + return qr, nil +} + +// Stats implements SIDX interface. +func (s *sidx) Stats(_ context.Context) (*Stats, error) { + snap := s.currentSnapshot() + if snap == nil { + return &Stats{}, nil + } + defer snap.decRef() + + stats := &Stats{ + PartCount: int64(snap.getPartCount()), + } + + // Load atomic counters + stats.QueryCount.Store(s.totalIntroduceLoopStarted.Load()) + stats.WriteCount.Store(s.totalIntroduceLoopFinished.Load()) + + return stats, nil +} + +// Flush implements Flusher interface. +func (s *sidx) Flush() error { + // Get current memory parts that need flushing + snap := s.currentSnapshot() + if snap == nil { + return nil + } + defer snap.decRef() + + // Create flush introduction + flushIntro := generateFlusherIntroduction() + flushIntro.applied = make(chan struct{}) + + // Select memory parts to flush (simplified logic) + for _, pw := range snap.parts { + if pw.isMemPart() && pw.isActive() { + flushIntro.flushed[pw.ID()] = pw.p + } + } + + if len(flushIntro.flushed) == 0 { + releaseFlusherIntroduction(flushIntro) + return nil + } + + // Send to introducer loop + s.flushCh <- flushIntro + + // Wait for flush to complete + <-flushIntro.applied + releaseFlusherIntroduction(flushIntro) + + return nil +} + +// Merge implements Merger interface. +func (s *sidx) Merge() error { + // Get current snapshot + snap := s.currentSnapshot() + if snap == nil { + return nil + } + defer snap.decRef() + + // Create merge introduction + mergeIntro := generateMergerIntroduction() + mergeIntro.applied = make(chan struct{}) + + // Select parts to merge (simplified logic - merge first 2 parts) + var partsToMerge []*partWrapper + for _, pw := range snap.parts { + if pw.isActive() && !pw.isMemPart() { + partsToMerge = append(partsToMerge, pw) + if len(partsToMerge) >= 2 { + break + } + } + } + + if len(partsToMerge) < 2 { + releaseMergerIntroduction(mergeIntro) + return nil + } + + // Mark parts for merging + for _, pw := range partsToMerge { + mergeIntro.merged[pw.ID()] = struct{}{} + } + + // Create new merged part (simplified - would actually merge the parts) + mergeIntro.newPart = partsToMerge[0].p + + // Send to introducer loop + s.mergeCh <- mergeIntro + + // Wait for merge to complete + <-mergeIntro.applied + releaseMergerIntroduction(mergeIntro) + + return nil +} + +// Close implements SIDX interface. +func (s *sidx) Close() error { + s.loopCloser.CloseThenWait() + + // Close current snapshot + s.mu.Lock() + if s.snapshot != nil { + s.snapshot.decRef() + s.snapshot = nil + } + s.mu.Unlock() + + return nil +} + +// currentSnapshot returns the current snapshot with incremented reference count. +func (s *sidx) currentSnapshot() *snapshot { + s.mu.RLock() + defer s.mu.RUnlock() + + if s.snapshot == nil { + return nil + } + + if s.snapshot.acquire() { + return s.snapshot + } + + return nil +} + +// decRef decrements the snapshot reference count (helper for snapshot interface). +func (s *snapshot) decRef() { + s.release() +} + +// Helper methods for metrics. +func (s *sidx) incTotalIntroduceLoopStarted(_ string) { + s.totalIntroduceLoopStarted.Add(1) +} + +func (s *sidx) incTotalIntroduceLoopFinished(_ string) { + s.totalIntroduceLoopFinished.Add(1) +} + +// persistSnapshot persists the snapshot to disk (placeholder). +func (s *sidx) persistSnapshot(_ *snapshot) { + // TODO: Implement snapshot persistence +} + +// Lock/Unlock methods for introducer loop. +func (s *sidx) Lock() { + s.mu.Lock() +} + +func (s *sidx) Unlock() { + s.mu.Unlock() +} + +// gc represents garbage collector (placeholder). +type gc struct{} + +func newGC(_ *Options) *gc { + return &gc{} +} + +func (g *gc) clean() { + // TODO: Implement garbage collection +} + +// emptyQueryResult represents an empty query result. +type emptyQueryResult struct{} + +func (e *emptyQueryResult) Pull() *QueryResponse { + return nil +} + +func (e *emptyQueryResult) Release() { + // Nothing to release +} + +// queryResult implements QueryResult interface. +type queryResult struct { + ctx context.Context + snapshot *snapshot + request QueryRequest + released bool +} + +func (qr *queryResult) Pull() *QueryResponse { + if qr.released { + return nil + } + + // Simple implementation - return empty response + // TODO: Implement actual query logic + return &QueryResponse{ + Keys: []int64{}, + Data: [][]byte{}, + Tags: [][]Tag{}, + SIDs: []common.SeriesID{}, + Metadata: ResponseMetadata{ + ExecutionTimeMs: 0, + }, + } +} + +func (qr *queryResult) Release() { + if qr.released { + return + } + qr.released = true + if qr.snapshot != nil { + qr.snapshot.decRef() + qr.snapshot = nil + } +} diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go index 08f9eeb7..135a909e 100644 --- a/banyand/internal/sidx/snapshot.go +++ b/banyand/internal/sidx/snapshot.go @@ -303,3 +303,60 @@ func releaseSnapshot(s *snapshot) { s.reset() snapshotPool.Put(s) } + +// copyAllTo creates a new snapshot with all parts from current snapshot. +func (s *snapshot) copyAllTo(epoch uint64) *snapshot { + result := generateSnapshot() + result.parts = make([]*partWrapper, len(s.parts)) + result.epoch = epoch + result.ref = 1 + result.released.Store(false) + + // Copy all parts and acquire references + copy(result.parts, s.parts) + for _, pw := range result.parts { + if pw != nil { + pw.acquire() + } + } + + return result +} + +// merge creates a new snapshot by merging flushed parts into the current snapshot. +func (s *snapshot) merge(epoch uint64, flushed map[uint64]*part) *snapshot { + result := s.copyAllTo(epoch) + + // Add flushed parts to the snapshot + for partID, part := range flushed { + // Set the part ID from the map key + if part != nil && part.partMetadata != nil { + part.partMetadata.ID = partID + } + // Create part wrapper for the flushed part + pw := newPartWrapper(part) + result.parts = append(result.parts, pw) + } + + result.sortPartsByEpoch() + return result +} + +// remove creates a new snapshot by removing specified parts. +func (s *snapshot) remove(epoch uint64, toRemove map[uint64]struct{}) *snapshot { + result := generateSnapshot() + result.epoch = epoch + result.ref = 1 + result.released.Store(false) + + // Copy parts except those being removed + for _, pw := range s.parts { + if _, shouldRemove := toRemove[pw.ID()]; !shouldRemove { + if pw.acquire() { + result.parts = append(result.parts, pw) + } + } + } + + return result +}