This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/snapshot
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 2ef0390599f19cfbcc8531f117cca5f8a8c7269d
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Aug 21 15:30:14 2025 +0700

    Update introducer test cases
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/internal/sidx/TODO.md            |  18 +-
 banyand/internal/sidx/introducer_test.go | 410 +++++++++++++++++++++++++++++++
 2 files changed, 419 insertions(+), 9 deletions(-)

diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
index 506f6dc8..22f082bf 100644
--- a/banyand/internal/sidx/TODO.md
+++ b/banyand/internal/sidx/TODO.md
@@ -279,15 +279,15 @@ This document tracks the implementation progress of the 
Secondary Index File Sys
   - [x] Graceful shutdown handling
   - [x] No deadlocks in channel communication
 
-### 5.3 Introduction Types (`introducer.go`)
-- [ ] memIntroduction, flusherIntroduction, mergerIntroduction
-- [ ] Object pooling for introduction structures
-- [ ] Channel synchronization with applied notifications
-- [ ] **Test Cases**:
-  - [ ] Introduction pooling reduces allocations
-  - [ ] Channel synchronization correctness
-  - [ ] Applied notifications work reliably
-  - [ ] Introduction reset for reuse
+### 5.3 Introduction Types (`introducer.go`) ✅
+- [x] memIntroduction, flusherIntroduction, mergerIntroduction
+- [x] Object pooling for introduction structures
+- [x] Channel synchronization with applied notifications
+- [x] **Test Cases**:
+  - [x] Introduction pooling reduces allocations
+  - [x] Channel synchronization correctness
+  - [x] Applied notifications work reliably
+  - [x] Introduction reset for reuse
 
 ### 5.4 Snapshot Replacement (`snapshot.go`)
 - [ ] Atomic updates with reference counting
diff --git a/banyand/internal/sidx/introducer_test.go 
b/banyand/internal/sidx/introducer_test.go
new file mode 100644
index 00000000..df4fb04c
--- /dev/null
+++ b/banyand/internal/sidx/introducer_test.go
@@ -0,0 +1,410 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestIntroductionPooling(t *testing.T) {
+       t.Run("introduction pooling reduces allocations", func(t *testing.T) {
+               // Test that the pool works by verifying reuse
+               intro1 := generateIntroduction()
+               releaseIntroduction(intro1)
+
+               intro2 := generateIntroduction()
+               // With pooling, we might get the same instance back
+               require.NotNil(t, intro2, "pool should provide introduction 
instance")
+               releaseIntroduction(intro2)
+
+               // Test multiple allocations work correctly
+               var intros []*introduction
+               for i := 0; i < 10; i++ {
+                       intro := generateIntroduction()
+                       intro.memPart = &memPart{}
+                       intro.applied = make(chan struct{})
+                       intros = append(intros, intro)
+               }
+
+               for _, intro := range intros {
+                       releaseIntroduction(intro)
+               }
+       })
+
+       t.Run("flusher introduction pooling reduces allocations", func(t 
*testing.T) {
+               intro1 := generateFlusherIntroduction()
+               releaseFlusherIntroduction(intro1)
+
+               intro2 := generateFlusherIntroduction()
+               require.NotNil(t, intro2, "pool should provide flusher 
introduction instance")
+               require.NotNil(t, intro2.flushed, "flushed map should be 
initialized")
+               releaseFlusherIntroduction(intro2)
+
+               // Test multiple allocations
+               var intros []*flusherIntroduction
+               for i := 0; i < 10; i++ {
+                       intro := generateFlusherIntroduction()
+                       intro.flushed[uint64(i)] = &part{}
+                       intro.applied = make(chan struct{})
+                       intros = append(intros, intro)
+               }
+
+               for _, intro := range intros {
+                       releaseFlusherIntroduction(intro)
+               }
+       })
+
+       t.Run("merger introduction pooling reduces allocations", func(t 
*testing.T) {
+               intro1 := generateMergerIntroduction()
+               releaseMergerIntroduction(intro1)
+
+               intro2 := generateMergerIntroduction()
+               require.NotNil(t, intro2, "pool should provide merger 
introduction instance")
+               require.NotNil(t, intro2.merged, "merged map should be 
initialized")
+               releaseMergerIntroduction(intro2)
+
+               // Test multiple allocations
+               var intros []*mergerIntroduction
+               for i := 0; i < 10; i++ {
+                       intro := generateMergerIntroduction()
+                       intro.merged[uint64(i)] = struct{}{}
+                       intro.newPart = &part{}
+                       intro.applied = make(chan struct{})
+                       intros = append(intros, intro)
+               }
+
+               for _, intro := range intros {
+                       releaseMergerIntroduction(intro)
+               }
+       })
+}
+
+func TestIntroductionReset(t *testing.T) {
+       t.Run("introduction reset for reuse", func(t *testing.T) {
+               intro := generateIntroduction()
+
+               // Set up introduction with data
+               intro.memPart = &memPart{}
+               intro.applied = make(chan struct{})
+
+               // Reset the introduction
+               intro.reset()
+
+               // Verify all fields are cleared
+               assert.Nil(t, intro.memPart, "memPart should be nil")
+               assert.Nil(t, intro.applied, "applied channel should be nil")
+
+               releaseIntroduction(intro)
+       })
+
+       t.Run("flusher introduction reset for reuse", func(t *testing.T) {
+               intro := generateFlusherIntroduction()
+
+               // Set up flusher introduction with data
+               intro.flushed[1] = &part{}
+               intro.flushed[2] = &part{}
+               intro.applied = make(chan struct{})
+
+               // Reset the flusher introduction
+               intro.reset()
+
+               // Verify all fields are cleared
+               assert.Len(t, intro.flushed, 0, "flushed map should be empty")
+               assert.Nil(t, intro.applied, "applied channel should be nil")
+
+               releaseFlusherIntroduction(intro)
+       })
+
+       t.Run("merger introduction reset for reuse", func(t *testing.T) {
+               intro := generateMergerIntroduction()
+
+               // Set up merger introduction with data
+               intro.merged[1] = struct{}{}
+               intro.merged[2] = struct{}{}
+               intro.newPart = &part{}
+               intro.applied = make(chan struct{})
+
+               // Reset the merger introduction
+               intro.reset()
+
+               // Verify all fields are cleared
+               assert.Len(t, intro.merged, 0, "merged map should be empty")
+               assert.Nil(t, intro.newPart, "newPart should be nil")
+               assert.Nil(t, intro.applied, "applied channel should be nil")
+
+               releaseMergerIntroduction(intro)
+       })
+}
+
+func TestChannelSynchronization(t *testing.T) {
+       t.Run("applied notifications work reliably", func(t *testing.T) {
+               intro := generateIntroduction()
+               intro.applied = make(chan struct{})
+
+               // Start a goroutine that waits for the notification
+               done := make(chan bool)
+               go func() {
+                       select {
+                       case <-intro.applied:
+                               done <- true
+                       case <-time.After(100 * time.Millisecond):
+                               done <- false
+                       }
+               }()
+
+               // Close the applied channel to simulate completion
+               close(intro.applied)
+
+               // Check that the notification was received
+               result := <-done
+               assert.True(t, result, "applied notification should be 
received")
+
+               releaseIntroduction(intro)
+       })
+
+       t.Run("multiple waiters on applied channel", func(t *testing.T) {
+               intro := generateFlusherIntroduction()
+               intro.applied = make(chan struct{})
+
+               const numWaiters = 5
+               var wg sync.WaitGroup
+               results := make(chan bool, numWaiters)
+
+               // Start multiple waiters
+               for i := 0; i < numWaiters; i++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               select {
+                               case <-intro.applied:
+                                       results <- true
+                               case <-time.After(100 * time.Millisecond):
+                                       results <- false
+                               }
+                       }()
+               }
+
+               // Close the applied channel
+               close(intro.applied)
+               wg.Wait()
+               close(results)
+
+               // All waiters should receive the notification
+               successCount := 0
+               for result := range results {
+                       if result {
+                               successCount++
+                       }
+               }
+               assert.Equal(t, numWaiters, successCount, "all waiters should 
receive notification")
+
+               releaseFlusherIntroduction(intro)
+       })
+
+       t.Run("channel synchronization correctness", func(t *testing.T) {
+               intro := generateMergerIntroduction()
+               intro.applied = make(chan struct{})
+
+               // Test that reading from a closed channel doesn't block
+               close(intro.applied)
+
+               // Multiple reads should work
+               for i := 0; i < 3; i++ {
+                       select {
+                       case <-intro.applied:
+                               // Should not block
+                       default:
+                               t.Errorf("reading from closed channel should 
not block")
+                       }
+               }
+
+               releaseMergerIntroduction(intro)
+       })
+}
+
+func TestIntroductionGeneration(t *testing.T) {
+       t.Run("introduction generation creates valid instances", func(t 
*testing.T) {
+               intro := generateIntroduction()
+               require.NotNil(t, intro, "generated introduction should not be 
nil")
+               assert.Nil(t, intro.memPart, "initial memPart should be nil")
+               assert.Nil(t, intro.applied, "initial applied channel should be 
nil")
+               releaseIntroduction(intro)
+       })
+
+       t.Run("flusher introduction generation creates valid instances", func(t 
*testing.T) {
+               intro := generateFlusherIntroduction()
+               require.NotNil(t, intro, "generated flusher introduction should 
not be nil")
+               require.NotNil(t, intro.flushed, "flushed map should be 
initialized")
+               assert.Len(t, intro.flushed, 0, "flushed map should be empty")
+               assert.Nil(t, intro.applied, "initial applied channel should be 
nil")
+               releaseFlusherIntroduction(intro)
+       })
+
+       t.Run("merger introduction generation creates valid instances", func(t 
*testing.T) {
+               intro := generateMergerIntroduction()
+               require.NotNil(t, intro, "generated merger introduction should 
not be nil")
+               require.NotNil(t, intro.merged, "merged map should be 
initialized")
+               assert.Len(t, intro.merged, 0, "merged map should be empty")
+               assert.Nil(t, intro.newPart, "initial newPart should be nil")
+               assert.Nil(t, intro.applied, "initial applied channel should be 
nil")
+               releaseMergerIntroduction(intro)
+       })
+}
+
+func TestConcurrentPoolAccess(t *testing.T) {
+       t.Run("concurrent pool access is safe", func(t *testing.T) {
+               const numGoroutines = 10
+               const operationsPerGoroutine = 100
+
+               var wg sync.WaitGroup
+
+               // Test concurrent access to introduction pool
+               for i := 0; i < numGoroutines; i++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               for j := 0; j < operationsPerGoroutine; j++ {
+                                       intro := generateIntroduction()
+                                       intro.memPart = &memPart{}
+                                       intro.applied = make(chan struct{})
+                                       releaseIntroduction(intro)
+                               }
+                       }()
+               }
+
+               wg.Wait()
+               // Test should complete without data races
+       })
+
+       t.Run("concurrent flusher introduction pool access", func(t *testing.T) 
{
+               const numGoroutines = 10
+               const operationsPerGoroutine = 100
+
+               var wg sync.WaitGroup
+
+               for i := 0; i < numGoroutines; i++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               for j := 0; j < operationsPerGoroutine; j++ {
+                                       intro := generateFlusherIntroduction()
+                                       intro.flushed[uint64(j)] = &part{}
+                                       intro.applied = make(chan struct{})
+                                       releaseFlusherIntroduction(intro)
+                               }
+                       }()
+               }
+
+               wg.Wait()
+       })
+
+       t.Run("concurrent merger introduction pool access", func(t *testing.T) {
+               const numGoroutines = 10
+               const operationsPerGoroutine = 100
+
+               var wg sync.WaitGroup
+
+               for i := 0; i < numGoroutines; i++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               for j := 0; j < operationsPerGoroutine; j++ {
+                                       intro := generateMergerIntroduction()
+                                       intro.merged[uint64(j)] = struct{}{}
+                                       intro.newPart = &part{}
+                                       intro.applied = make(chan struct{})
+                                       releaseMergerIntroduction(intro)
+                               }
+                       }()
+               }
+
+               wg.Wait()
+       })
+}
+
+func TestIntroductionMapOperations(t *testing.T) {
+       t.Run("flusher introduction map operations", func(t *testing.T) {
+               intro := generateFlusherIntroduction()
+
+               // Add parts to flushed map
+               part1 := &part{}
+               part2 := &part{}
+               intro.flushed[1] = part1
+               intro.flushed[2] = part2
+
+               // Verify map contents
+               assert.Equal(t, part1, intro.flushed[1], "part1 should be 
retrievable")
+               assert.Equal(t, part2, intro.flushed[2], "part2 should be 
retrievable")
+               assert.Len(t, intro.flushed, 2, "flushed map should have 2 
entries")
+
+               // Reset should clear the map
+               intro.reset()
+               assert.Len(t, intro.flushed, 0, "flushed map should be empty 
after reset")
+
+               releaseFlusherIntroduction(intro)
+       })
+
+       t.Run("merger introduction map operations", func(t *testing.T) {
+               intro := generateMergerIntroduction()
+
+               // Add IDs to merged map
+               intro.merged[1] = struct{}{}
+               intro.merged[2] = struct{}{}
+               intro.merged[3] = struct{}{}
+
+               // Verify map contents
+               _, exists1 := intro.merged[1]
+               _, exists2 := intro.merged[2]
+               _, exists3 := intro.merged[3]
+               _, exists4 := intro.merged[4]
+
+               assert.True(t, exists1, "ID 1 should exist in merged map")
+               assert.True(t, exists2, "ID 2 should exist in merged map")
+               assert.True(t, exists3, "ID 3 should exist in merged map")
+               assert.False(t, exists4, "ID 4 should not exist in merged map")
+               assert.Len(t, intro.merged, 3, "merged map should have 3 
entries")
+
+               // Reset should clear the map
+               intro.reset()
+               assert.Len(t, intro.merged, 0, "merged map should be empty 
after reset")
+
+               releaseMergerIntroduction(intro)
+       })
+}
+
+func TestNilSafetyForIntroductions(t *testing.T) {
+       t.Run("release nil introduction", func(_ *testing.T) {
+               // Should not panic
+               releaseIntroduction(nil)
+       })
+
+       t.Run("release nil flusher introduction", func(_ *testing.T) {
+               // Should not panic
+               releaseFlusherIntroduction(nil)
+       })
+
+       t.Run("release nil merger introduction", func(_ *testing.T) {
+               // Should not panic
+               releaseMergerIntroduction(nil)
+       })
+}
\ No newline at end of file

Reply via email to