This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/element in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ec817ff17f398531fd0a790d95e85d9f633b3620 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Mon Aug 18 22:15:22 2025 +0700 Add partWrapper structure and comprehensive tests for reference counting and lifecycle management - Introduced `part_wrapper.go` and `part_wrapper_test.go` files to implement the `partWrapper` structure, which provides thread-safe reference counting and manages the lifecycle of parts. --- banyand/internal/sidx/part_wrapper.go | 217 +++++++++++++++++++ banyand/internal/sidx/part_wrapper_test.go | 333 +++++++++++++++++++++++++++++ 2 files changed, 550 insertions(+) diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go new file mode 100644 index 00000000..d808449a --- /dev/null +++ b/banyand/internal/sidx/part_wrapper.go @@ -0,0 +1,217 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "fmt" + "sync/atomic" + + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// partWrapperState represents the state of a partWrapper in its lifecycle. +type partWrapperState int32 + +const ( + // partStateActive indicates the part is active and available for use. + partStateActive partWrapperState = iota + // partStateRemoving indicates the part is being removed but may still have references. + partStateRemoving + // partStateRemoved indicates the part has been fully removed and cleaned up. + partStateRemoved +) + +// String returns a string representation of the part state. +func (s partWrapperState) String() string { + switch s { + case partStateActive: + return "active" + case partStateRemoving: + return "removing" + case partStateRemoved: + return "removed" + default: + return fmt.Sprintf("unknown(%d)", int32(s)) + } +} + +// partWrapper provides thread-safe reference counting for parts. +// It enables safe concurrent access to parts while managing their lifecycle. +// When the reference count reaches zero, the underlying part is cleaned up. +type partWrapper struct { + // p is the underlying part. It can be nil for memory parts. + p *part + + // ref is the atomic reference counter. + // It starts at 1 when the wrapper is created. + ref int32 + + // state tracks the lifecycle state of the part. + // State transitions: active -> removing -> removed + state int32 + + // removable indicates if the part should be removed from disk when dereferenced. + // This is typically true for parts that have been merged or are no longer needed. + removable atomic.Bool +} + +// newPartWrapper creates a new partWrapper with an initial reference count of 1. +// The part starts in the active state. +func newPartWrapper(p *part) *partWrapper { + return &partWrapper{ + p: p, + ref: 1, + state: int32(partStateActive), + } +} + +// acquire increments the reference count atomically. +// Returns true if the reference was successfully acquired (part is still active), +// false if the part is being removed or has been removed. +func (pw *partWrapper) acquire() bool { + // Check state first to avoid unnecessary atomic operations + if atomic.LoadInt32(&pw.state) != int32(partStateActive) { + return false + } + + // Try to increment reference count + for { + oldRef := atomic.LoadInt32(&pw.ref) + if oldRef <= 0 { + // Reference count is already zero or negative, cannot acquire + return false + } + + // Double-check state hasn't changed + if atomic.LoadInt32(&pw.state) != int32(partStateActive) { + return false + } + + // Try to atomically increment + if atomic.CompareAndSwapInt32(&pw.ref, oldRef, oldRef+1) { + return true + } + // Retry if CAS failed + } +} + +// release decrements the reference count atomically. +// When the reference count reaches zero, the part is cleaned up. +// This method is safe to call multiple times. +func (pw *partWrapper) release() { + newRef := atomic.AddInt32(&pw.ref, -1) + if newRef > 0 { + return + } + + if newRef < 0 { + // This shouldn't happen in correct usage, but log it for debugging + logger.GetLogger().Warn(). + Int32("ref", newRef). + Str("part", pw.String()). + Msg("partWrapper reference count went negative") + return + } + + // Reference count reached zero, perform cleanup + pw.cleanup() +} + +// cleanup performs the actual cleanup when reference count reaches zero. +// This includes closing the part and potentially removing files from disk. +func (pw *partWrapper) cleanup() { + // Mark as removed + atomic.StoreInt32(&pw.state, int32(partStateRemoved)) + + if pw.p == nil { + return + } + + // Close the part to release file handles + pw.p.close() + + // Remove from disk if marked as removable + if pw.removable.Load() && pw.p.fileSystem != nil { + go func(partPath string, fileSystem interface{}) { + // Use a goroutine for potentially slow disk operations + // to avoid blocking the caller + if fs, ok := fileSystem.(interface{ MustRMAll(string) }); ok { + fs.MustRMAll(partPath) + } + }(pw.p.path, pw.p.fileSystem) + } +} + +// markForRemoval marks the part as removable and transitions it to the removing state. +// Once marked for removal, no new references can be acquired. +// The part will be physically removed when the reference count reaches zero. +func (pw *partWrapper) markForRemoval() { + // Transition to removing state + atomic.StoreInt32(&pw.state, int32(partStateRemoving)) + // Mark as removable for cleanup + pw.removable.Store(true) +} + +// ID returns the unique identifier of the part. +// Returns 0 if the part is nil. +func (pw *partWrapper) ID() uint64 { + if pw.p == nil || pw.p.partMetadata == nil { + return 0 + } + return pw.p.partMetadata.ID +} + +// refCount returns the current reference count. +// This is primarily for testing and debugging. +func (pw *partWrapper) refCount() int32 { + return atomic.LoadInt32(&pw.ref) +} + +// getState returns the current state of the part. +func (pw *partWrapper) getState() partWrapperState { + return partWrapperState(atomic.LoadInt32(&pw.state)) +} + +// isActive returns true if the part is in the active state. +func (pw *partWrapper) isActive() bool { + return atomic.LoadInt32(&pw.state) == int32(partStateActive) +} + +// isRemoving returns true if the part is in the removing state. +func (pw *partWrapper) isRemoving() bool { + return atomic.LoadInt32(&pw.state) == int32(partStateRemoving) +} + +// isRemoved returns true if the part is in the removed state. +func (pw *partWrapper) isRemoved() bool { + return atomic.LoadInt32(&pw.state) == int32(partStateRemoved) +} + +// String returns a string representation of the partWrapper. +func (pw *partWrapper) String() string { + state := pw.getState() + refCount := pw.refCount() + + if pw.p == nil { + return fmt.Sprintf("partWrapper{id=nil, state=%s, ref=%d}", state, refCount) + } + + return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, path=%s}", + pw.ID(), state, refCount, pw.p.path) +} + diff --git a/banyand/internal/sidx/part_wrapper_test.go b/banyand/internal/sidx/part_wrapper_test.go new file mode 100644 index 00000000..ff0d1d7d --- /dev/null +++ b/banyand/internal/sidx/part_wrapper_test.go @@ -0,0 +1,333 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPartWrapper_BasicLifecycle(t *testing.T) { + // Create a mock part + p := &part{ + path: "/test/part/001", + partMetadata: &partMetadata{ID: 1}, + } + + // Create wrapper + pw := newPartWrapper(p) + require.NotNil(t, pw) + assert.Equal(t, int32(1), pw.refCount()) + assert.True(t, pw.isActive()) + assert.Equal(t, uint64(1), pw.ID()) + + // Test acquire + assert.True(t, pw.acquire()) + assert.Equal(t, int32(2), pw.refCount()) + + // Test multiple acquires + assert.True(t, pw.acquire()) + assert.True(t, pw.acquire()) + assert.Equal(t, int32(4), pw.refCount()) + + // Test releases + pw.release() + assert.Equal(t, int32(3), pw.refCount()) + pw.release() + assert.Equal(t, int32(2), pw.refCount()) + pw.release() + assert.Equal(t, int32(1), pw.refCount()) + + // Final release should trigger cleanup + pw.release() + assert.Equal(t, int32(0), pw.refCount()) + assert.True(t, pw.isRemoved()) +} + +func TestPartWrapper_StateTransitions(t *testing.T) { + p := &part{ + path: "/test/part/002", + partMetadata: &partMetadata{ID: 2}, + } + + pw := newPartWrapper(p) + + // Initial state should be active + assert.True(t, pw.isActive()) + assert.False(t, pw.isRemoving()) + assert.False(t, pw.isRemoved()) + assert.Equal(t, partStateActive, pw.getState()) + + // Mark for removal + pw.markForRemoval() + assert.False(t, pw.isActive()) + assert.True(t, pw.isRemoving()) + assert.False(t, pw.isRemoved()) + assert.Equal(t, partStateRemoving, pw.getState()) + + // Should not be able to acquire new references + assert.False(t, pw.acquire()) + assert.Equal(t, int32(1), pw.refCount()) + + // Release should transition to removed + pw.release() + assert.False(t, pw.isActive()) + assert.False(t, pw.isRemoving()) + assert.True(t, pw.isRemoved()) + assert.Equal(t, partStateRemoved, pw.getState()) +} + +func TestPartWrapper_ConcurrentReferenceCounting(t *testing.T) { + p := &part{ + path: "/test/part/003", + partMetadata: &partMetadata{ID: 3}, + } + + pw := newPartWrapper(p) + + const numGoroutines = 100 + const operationsPerGoroutine = 1000 + + var wg sync.WaitGroup + var successfulAcquires int64 + var successfulReleases int64 + + // Start multiple goroutines that acquire and release references + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < operationsPerGoroutine; j++ { + if pw.acquire() { + atomic.AddInt64(&successfulAcquires, 1) + // Hold the reference briefly + time.Sleep(time.Microsecond) + pw.release() + atomic.AddInt64(&successfulReleases, 1) + } + } + }() + } + + wg.Wait() + + // All successful acquires should have corresponding releases + assert.Equal(t, successfulAcquires, successfulReleases) + + // Reference count should be back to 1 (initial reference) + assert.Equal(t, int32(1), pw.refCount()) + + // Final cleanup + pw.release() + assert.Equal(t, int32(0), pw.refCount()) +} + +func TestPartWrapper_ConcurrentAcquireWithMarkForRemoval(t *testing.T) { + p := &part{ + path: "/test/part/004", + partMetadata: &partMetadata{ID: 4}, + } + + pw := newPartWrapper(p) + + const numGoroutines = 20 + var wg sync.WaitGroup + var successfulAcquires int64 + var failedAcquires int64 + var startBarrier sync.WaitGroup + + startBarrier.Add(1) // Barrier to synchronize goroutine start + + // Start goroutines trying to acquire references + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + startBarrier.Wait() // Wait for all goroutines to be ready + for j := 0; j < 500; j++ { + if pw.acquire() { + atomic.AddInt64(&successfulAcquires, 1) + time.Sleep(time.Microsecond) + pw.release() + } else { + atomic.AddInt64(&failedAcquires, 1) + } + } + }() + } + + // Wait a bit, then mark for removal while goroutines are running + time.Sleep(5 * time.Millisecond) + pw.markForRemoval() + startBarrier.Done() // Release all goroutines to start working + + wg.Wait() + + t.Logf("Successful acquires: %d, Failed acquires: %d", + successfulAcquires, failedAcquires) + + // Should have some failed acquires after marking for removal + // Note: This may be 0 if all goroutines acquired before markForRemoval, + // which is acceptable behavior + t.Logf("Failed acquires: %d (may be 0 due to timing)", failedAcquires) + + // Reference count should be 1 (initial reference) + assert.Equal(t, int32(1), pw.refCount()) + assert.True(t, pw.isRemoving()) + + // Final cleanup + pw.release() + assert.Equal(t, int32(0), pw.refCount()) + assert.True(t, pw.isRemoved()) +} + +func TestPartWrapper_NilPart(t *testing.T) { + pw := newPartWrapper(nil) + require.NotNil(t, pw) + + assert.Equal(t, int32(1), pw.refCount()) + assert.Equal(t, uint64(0), pw.ID()) // Should return 0 for nil part + assert.True(t, pw.isActive()) + + // Test acquire/release with nil part + assert.True(t, pw.acquire()) + assert.Equal(t, int32(2), pw.refCount()) + + pw.release() + assert.Equal(t, int32(1), pw.refCount()) + + pw.release() + assert.Equal(t, int32(0), pw.refCount()) + assert.True(t, pw.isRemoved()) +} + +func TestPartWrapper_MultipleReleases(t *testing.T) { + p := &part{ + path: "/test/part/005", + partMetadata: &partMetadata{ID: 5}, + } + + pw := newPartWrapper(p) + + // Release once (should reach 0) + pw.release() + assert.Equal(t, int32(0), pw.refCount()) + assert.True(t, pw.isRemoved()) + + // Additional releases should not cause issues (though they log warnings) + pw.release() + pw.release() + assert.Equal(t, int32(-2), pw.refCount()) // Goes negative but doesn't break +} + +func TestPartWrapper_StringRepresentation(t *testing.T) { + // Test with nil part + pw1 := newPartWrapper(nil) + str1 := pw1.String() + assert.Contains(t, str1, "id=nil") + assert.Contains(t, str1, "state=active") + assert.Contains(t, str1, "ref=1") + + // Test with real part + p := &part{ + path: "/test/part/006", + partMetadata: &partMetadata{ID: 6}, + } + pw2 := newPartWrapper(p) + str2 := pw2.String() + assert.Contains(t, str2, "id=6") + assert.Contains(t, str2, "state=active") + assert.Contains(t, str2, "ref=1") + assert.Contains(t, str2, "path=/test/part/006") + + // Test state changes in string + pw2.markForRemoval() + str3 := pw2.String() + assert.Contains(t, str3, "state=removing") + + pw2.release() + str4 := pw2.String() + assert.Contains(t, str4, "state=removed") +} + +func TestPartWrapper_StateStringRepresentation(t *testing.T) { + assert.Equal(t, "active", partStateActive.String()) + assert.Equal(t, "removing", partStateRemoving.String()) + assert.Equal(t, "removed", partStateRemoved.String()) + assert.Contains(t, partWrapperState(999).String(), "unknown") +} + +func TestPartWrapper_CleanupWithRemovableFlag(t *testing.T) { + p := &part{ + path: "/test/part/007", + partMetadata: &partMetadata{ID: 7}, + } + + pw := newPartWrapper(p) + + // Test that removable flag is initially false + assert.False(t, pw.removable.Load()) + + // Mark for removal sets the flag + pw.markForRemoval() + assert.True(t, pw.removable.Load()) + assert.True(t, pw.isRemoving()) + + // Release should trigger cleanup + pw.release() + assert.True(t, pw.isRemoved()) +} + +// Benchmark tests. +func BenchmarkPartWrapper_AcquireRelease(b *testing.B) { + p := &part{ + path: "/bench/part", + partMetadata: &partMetadata{ID: 1}, + } + pw := newPartWrapper(p) + defer pw.release() // cleanup + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if pw.acquire() { + pw.release() + } + } + }) +} + +func BenchmarkPartWrapper_StateCheck(b *testing.B) { + p := &part{ + path: "/bench/part", + partMetadata: &partMetadata{ID: 1}, + } + pw := newPartWrapper(p) + defer pw.release() // cleanup + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = pw.isActive() + } +} +