This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new f23caf08 Fix the unset partID (#799)
f23caf08 is described below
commit f23caf08871b963bbb07b44141457660ff487568
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Oct 1 14:38:07 2025 +0800
Fix the unset partID (#799)
---
banyand/internal/sidx/block_reader_test.go | 2 +-
banyand/internal/sidx/interfaces.go | 4 +--
banyand/internal/sidx/iter_test.go | 14 +++++-----
banyand/internal/sidx/merge.go | 24 ++++++++--------
banyand/internal/sidx/merge_test.go | 3 +-
banyand/internal/sidx/multi_sidx_query_test.go | 8 +++---
banyand/internal/sidx/part.go | 3 +-
banyand/internal/sidx/part_iter_test.go | 16 +++++------
banyand/internal/sidx/part_test.go | 4 +--
banyand/internal/sidx/part_wrapper.go | 28 +++++++++++++++----
banyand/internal/sidx/part_wrapper_test.go | 14 +++++++++-
banyand/internal/sidx/sidx.go | 6 ++--
banyand/trace/flusher.go | 2 +-
banyand/trace/merger.go | 38 +++++++++++++++++++++-----
14 files changed, 108 insertions(+), 58 deletions(-)
diff --git a/banyand/internal/sidx/block_reader_test.go
b/banyand/internal/sidx/block_reader_test.go
index 83660e61..06ef81b3 100644
--- a/banyand/internal/sidx/block_reader_test.go
+++ b/banyand/internal/sidx/block_reader_test.go
@@ -206,7 +206,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
mpp = append(mpp, mp)
mp.mustInitFromElements(elems)
mp.mustFlush(fileSystem,
partPath(tmpPath, uint64(i)))
- filePW := newPartWrapper(nil,
mustOpenPart(partPath(tmpPath, uint64(i)), fileSystem))
+ filePW := newPartWrapper(nil,
mustOpenPart(uint64(i), partPath(tmpPath, uint64(i)), fileSystem))
filePW.p.partMetadata.ID = uint64(i)
fpp = append(fpp, filePW)
pp = append(pp, filePW.p)
diff --git a/banyand/internal/sidx/interfaces.go
b/banyand/internal/sidx/interfaces.go
index 411c9e2a..f514886f 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -52,9 +52,9 @@ type SIDX interface {
// Flush flushes the SIDX instance to disk.
Flush() error
// Merge merges the specified parts into a new part.
- Merge(closeCh <-chan struct{}) error
+ Merge(closeCh <-chan struct{}) (uint64, error)
// MergeMemPart merges the mem parts into a new part.
- MergeMemParts(closeCh <-chan struct{}) error
+ MergeMemParts(closeCh <-chan struct{}) (uint64, error)
// PartsToSync returns the parts to sync.
PartsToSync() []*part
// StreamingParts returns the streaming parts.
diff --git a/banyand/internal/sidx/iter_test.go
b/banyand/internal/sidx/iter_test.go
index 53f34e6f..24d61235 100644
--- a/banyand/internal/sidx/iter_test.go
+++ b/banyand/internal/sidx/iter_test.go
@@ -233,7 +233,7 @@ func TestIterComprehensive(t *testing.T) {
if partType == "file_based" {
partDir :=
filepath.Join(tempDir, fmt.Sprintf("%s_%s_part%d", partType, tc.name, i))
mp.mustFlush(testFS,
partDir)
- testPart =
mustOpenPart(partDir, testFS)
+ testPart =
mustOpenPart(uint64(i), partDir, testFS)
} else {
testPart =
openMemPart(mp)
}
@@ -278,7 +278,7 @@ func TestIterEdgeCases(t *testing.T) {
partDir := filepath.Join(tempDir, "empty_series")
mp.mustFlush(testFS, partDir)
- testPart := mustOpenPart(partDir, testFS)
+ testPart := mustOpenPart(1, partDir, testFS)
defer testPart.close()
bma := generateBlockMetadataArray()
@@ -317,9 +317,9 @@ func TestIterEdgeCases(t *testing.T) {
mp1.mustFlush(testFS, partDir1)
mp2.mustFlush(testFS, partDir2)
- testPart1 := mustOpenPart(partDir1, testFS)
+ testPart1 := mustOpenPart(1, partDir1, testFS)
defer testPart1.close()
- testPart2 := mustOpenPart(partDir2, testFS)
+ testPart2 := mustOpenPart(2, partDir2, testFS)
defer testPart2.close()
bma := generateBlockMetadataArray()
@@ -346,7 +346,7 @@ func TestIterEdgeCases(t *testing.T) {
partDir := filepath.Join(tempDir, "single_part")
mp.mustFlush(testFS, partDir)
- testPart := mustOpenPart(partDir, testFS)
+ testPart := mustOpenPart(1, partDir, testFS)
defer testPart.close()
bma := generateBlockMetadataArray()
@@ -427,9 +427,9 @@ func TestIterOrdering(t *testing.T) {
mp1.mustFlush(testFS, partDir1)
mp2.mustFlush(testFS, partDir2)
- testPart1 := mustOpenPart(partDir1, testFS)
+ testPart1 := mustOpenPart(1, partDir1, testFS)
defer testPart1.close()
- testPart2 := mustOpenPart(partDir2, testFS)
+ testPart2 := mustOpenPart(2, partDir2, testFS)
defer testPart2.close()
bma := generateBlockMetadataArray()
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index 9acbdcd3..e384bd95 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -31,11 +31,11 @@ var (
)
// Merge implements Merger interface.
-func (s *sidx) Merge(closeCh <-chan struct{}) error {
+func (s *sidx) Merge(closeCh <-chan struct{}) (uint64, error) {
// Get current snapshot
snap := s.currentSnapshot()
if snap == nil {
- return nil
+ return 0, nil
}
defer snap.decRef()
@@ -53,7 +53,7 @@ func (s *sidx) Merge(closeCh <-chan struct{}) error {
}
if len(partsToMerge) < 2 {
- return nil
+ return 0, nil
}
// Mark parts for merging
@@ -67,7 +67,7 @@ func (s *sidx) Merge(closeCh <-chan struct{}) error {
// Create new merged part
newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge,
newPartID, s.root)
if err != nil {
- return err
+ return 0, err
}
mergeIntro.newPart = newPart
@@ -77,13 +77,13 @@ func (s *sidx) Merge(closeCh <-chan struct{}) error {
// Wait for merge to complete
<-mergeIntro.applied
- return nil
+ return uint64(len(partsToMerge)), nil
}
-func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error {
+func (s *sidx) MergeMemParts(closeCh <-chan struct{}) (uint64, error) {
snap := s.currentSnapshot()
if snap == nil {
- return nil
+ return 0, nil
}
defer snap.decRef()
@@ -92,7 +92,7 @@ func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error {
defer releaseMergerIntroduction(mergeIntro)
mergeIntro.applied = make(chan struct{})
- // Select parts to merge (all active non-memory parts)
+ // Select parts to merge (all active memory parts)
var partsToMerge []*partWrapper
for _, pw := range snap.parts {
if pw.isActive() && pw.isMemPart() {
@@ -101,7 +101,7 @@ func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error
{
}
if len(partsToMerge) < 2 {
- return nil
+ return 0, nil
}
// Mark parts for merging
@@ -115,7 +115,7 @@ func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error
{
// Create new merged part
newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge,
newPartID, s.root)
if err != nil {
- return err
+ return 0, err
}
mergeIntro.newPart = newPart
@@ -125,7 +125,7 @@ func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error
{
// Wait for merge to complete
<-mergeIntro.applied
- return nil
+ return uint64(len(partsToMerge)), nil
}
func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{},
parts []*partWrapper, partID uint64, root string) (*partWrapper, error) {
@@ -158,7 +158,7 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh
<-chan struct{}, par
}
pm.mustWriteMetadata(fileSystem, dstPath)
fileSystem.SyncPath(dstPath)
- p := mustOpenPart(dstPath, fileSystem)
+ p := mustOpenPart(partID, dstPath, fileSystem)
return newPartWrapper(nil, p), nil
}
diff --git a/banyand/internal/sidx/merge_test.go
b/banyand/internal/sidx/merge_test.go
index bf6eba95..3bdaa78e 100644
--- a/banyand/internal/sidx/merge_test.go
+++ b/banyand/internal/sidx/merge_test.go
@@ -418,9 +418,8 @@ func Test_mergeParts(t *testing.T) {
mp.mustInitFromElements(es)
partPath := filepath.Join(tmpPath,
"part_"+string(rune('0'+i)))
mp.mustFlush(fileSystem, partPath)
- filePart := mustOpenPart(partPath,
fileSystem)
+ filePart := mustOpenPart(uint64(i),
partPath, fileSystem)
filePW := newPartWrapper(nil, filePart)
- filePW.p.partMetadata.ID = uint64(i)
fpp = append(fpp, filePW)
ReleaseMemPart(mp)
}
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go
b/banyand/internal/sidx/multi_sidx_query_test.go
index 3249a50a..853e2c16 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -62,12 +62,12 @@ func (m *mockSIDX) Flush() error {
return nil
}
-func (m *mockSIDX) Merge(_ <-chan struct{}) error {
- return nil
+func (m *mockSIDX) Merge(_ <-chan struct{}) (uint64, error) {
+ return 0, nil
}
-func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) error {
- return nil
+func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) (uint64, error) {
+ return 0, nil
}
func (m *mockSIDX) PartsToSync() []*part {
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 5b02efbb..3413812a 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -104,7 +104,7 @@ func (p *part) ID() uint64 {
// mustOpenPart opens a part from the specified path using the given file system.
// It opens all standard files and discovers tag files automatically.
// Panics if any required file cannot be opened.
-func mustOpenPart(path string, fileSystem fs.FileSystem) *part {
+func mustOpenPart(partID uint64, path string, fileSystem fs.FileSystem) *part {
p := &part{
path: path,
fileSystem: fileSystem,
@@ -121,6 +121,7 @@ func mustOpenPart(path string, fileSystem fs.FileSystem)
*part {
p.close()
logger.GetLogger().Panic().Err(err).Str("path",
path).Msg("failed to load part metadata")
}
+ p.partMetadata.ID = partID
// Load primary block metadata from primary.bin.
p.loadPrimaryBlockMetadata()
diff --git a/banyand/internal/sidx/part_iter_test.go
b/banyand/internal/sidx/part_iter_test.go
index eb67ab1d..a64fdde5 100644
--- a/banyand/internal/sidx/part_iter_test.go
+++ b/banyand/internal/sidx/part_iter_test.go
@@ -314,7 +314,7 @@ func TestPartIterVerification(t *testing.T) {
partDir := filepath.Join(tempDir,
fmt.Sprintf("part_%s", tt.name))
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Run the test case
@@ -376,7 +376,7 @@ func TestPartIterEdgeCases(t *testing.T) {
partDir := filepath.Join(tempDir, "empty_series_test")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Test with empty series list
@@ -417,7 +417,7 @@ func TestPartIterEdgeCases(t *testing.T) {
partDir := filepath.Join(tempDir, "no_match_key_range")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Test with non-overlapping key range
@@ -458,7 +458,7 @@ func TestPartIterEdgeCases(t *testing.T) {
partDir := filepath.Join(tempDir, "no_match_series")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Test with different series ID
@@ -504,7 +504,7 @@ func TestPartIterBlockFilter(t *testing.T) {
partDir := filepath.Join(tempDir, "nil_filter")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Test with nil blockFilter
@@ -546,7 +546,7 @@ func TestPartIterBlockFilter(t *testing.T) {
partDir := filepath.Join(tempDir, "allow_all_filter")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Create a mock filter that allows all blocks
@@ -591,7 +591,7 @@ func TestPartIterBlockFilter(t *testing.T) {
partDir := filepath.Join(tempDir, "skip_all_filter")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Create a mock filter that skips all blocks
@@ -636,7 +636,7 @@ func TestPartIterBlockFilter(t *testing.T) {
partDir := filepath.Join(tempDir, "error_filter")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Create a mock filter that returns an error
diff --git a/banyand/internal/sidx/part_test.go
b/banyand/internal/sidx/part_test.go
index 728a92ca..d89f8e89 100644
--- a/banyand/internal/sidx/part_test.go
+++ b/banyand/internal/sidx/part_test.go
@@ -139,7 +139,7 @@ func TestPartStringRepresentation(t *testing.T) {
require.NoError(t, err)
}
- part := mustOpenPart(tempDir, testFS)
+ part := mustOpenPart(pm.ID, tempDir, testFS)
expectedString := fmt.Sprintf("sidx part %d at %s", pm.ID, tempDir)
assert.Equal(t, expectedString, part.String())
@@ -390,7 +390,7 @@ func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) {
mp.mustFlush(testFS, partDir)
// Step 3: Open the flushed part from disk
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Step 4: Read all elements back from part
diff --git a/banyand/internal/sidx/part_wrapper.go
b/banyand/internal/sidx/part_wrapper.go
index cd589f99..42244f80 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -168,11 +168,7 @@ func (pw *partWrapper) markForRemoval() {
}
// ID returns the unique identifier of the part.
-// Returns 0 if the part is nil.
func (pw *partWrapper) ID() uint64 {
- if pw.p == nil || pw.p.partMetadata == nil {
- return 0
- }
return pw.p.partMetadata.ID
}
@@ -218,12 +214,32 @@ func (pw *partWrapper) String() string {
}
if pw.mp != nil {
+ var id uint64
+ if pw.mp.partMetadata != nil {
+ id = pw.mp.partMetadata.ID
+ }
return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, memPart=true}",
- pw.ID(), state, refCount)
+ id, state, refCount)
+ }
+
+ if pw.p == nil {
+ return fmt.Sprintf("partWrapper{id=nil, state=%s, ref=%d, part=nil}", state, refCount)
+ }
+
+ // Handle case where p.partMetadata might be nil after cleanup
+ var id uint64
+ var path string
+ if pw.p.partMetadata != nil {
+ id = pw.p.partMetadata.ID
+ }
+ if pw.p.path != "" {
+ path = pw.p.path
+ } else {
+ path = "unknown"
}
return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, path=%s}",
- pw.ID(), state, refCount, pw.p.path)
+ id, state, refCount, path)
}
// overlapsKeyRange checks if the part overlaps with the given key range.
diff --git a/banyand/internal/sidx/part_wrapper_test.go
b/banyand/internal/sidx/part_wrapper_test.go
index 4ca6af43..9a1a0971 100644
--- a/banyand/internal/sidx/part_wrapper_test.go
+++ b/banyand/internal/sidx/part_wrapper_test.go
@@ -229,7 +229,19 @@ func TestPartWrapper_MultipleReleases(t *testing.T) {
pw := newPartWrapper(nil, p)
- // Release once (should reach 0)
+ // Test multiple releases before cleanup
+ pw.acquire() // ref count = 2
+ pw.acquire() // ref count = 3
+
+ pw.release() // ref count = 2
+ assert.Equal(t, int32(2), pw.refCount())
+ assert.True(t, pw.isActive())
+
+ pw.release() // ref count = 1
+ assert.Equal(t, int32(1), pw.refCount())
+ assert.True(t, pw.isActive())
+
+ // Final release should trigger cleanup and set ref to 0
pw.release()
assert.Equal(t, int32(0), pw.refCount())
assert.True(t, pw.isRemoved())
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 64777d6b..2436b379 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -459,8 +459,7 @@ func (s *sidx) Flush() error {
Str("part_path", partPath).
Msg("flushing sidx part")
}
- newPW := newPartWrapper(nil, mustOpenPart(partPath,
s.fileSystem))
- newPW.p.partMetadata.ID = pw.ID()
+ newPW := newPartWrapper(nil, mustOpenPart(pw.ID(), partPath,
s.fileSystem))
flushIntro.flushed[newPW.ID()] = newPW
}
@@ -868,8 +867,7 @@ func (s *sidx) loadSnapshot(epoch uint64, loadedParts
[]uint64) {
continue
}
partPath := partPath(s.root, id)
- part := mustOpenPart(partPath, s.fileSystem)
- part.partMetadata.ID = id
+ part := mustOpenPart(id, partPath, s.fileSystem)
pw := newPartWrapper(nil, part)
snp.addPart(pw)
if s.curPartID < id {
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 76096b4b..ce8338af 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -192,7 +192,7 @@ func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh
chan *mergerIntroductio
// merge memory must not be closed by the tsTable.close
closeCh := make(chan struct{})
newPart, err :=
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
- currentMergedIDs, mergeCh, closeCh, "mem")
+ currentMergedIDs, mergeCh, closeCh, mergeTypeMem)
close(closeCh)
if err != nil {
if errors.Is(err, errClosed) {
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 148aa6fa..fe2fb194 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -33,6 +33,11 @@ import (
var mergeMaxConcurrencyCh = make(chan struct{}, cgroups.CPUs())
+var (
+ mergeTypeMem = "mem"
+ mergeTypeFile = "file"
+)
+
func (tst *tsTable) mergeLoop(merges chan *mergerIntroduction, flusherNotifier
watcher.Channel) {
defer tst.loopCloser.Done()
@@ -95,7 +100,7 @@ func (tst *tsTable) mergeSnapshot(curSnapshot *snapshot,
merges chan *mergerIntr
return nil, nil
}
if _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger,
dst,
- toBeMerged, merges, tst.loopCloser.CloseNotify(), "file"); err
!= nil {
+ toBeMerged, merges, tst.loopCloser.CloseNotify(),
mergeTypeFile); err != nil {
return dst, err
}
return dst, nil
@@ -112,12 +117,6 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator
snapshotCreator, part
if err != nil {
return nil, err
}
- for _, sidxInstance := range tst.getAllSidx() {
- if err := sidxInstance.MergeMemParts(closeCh); err != nil {
- tst.l.Warn().Err(err).Msg("sidx merge failed")
- return nil, err
- }
- }
elapsed := time.Since(start)
tst.incTotalMergeLatency(elapsed.Seconds(), typ)
tst.incTotalMerged(1, typ)
@@ -157,6 +156,31 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator
snapshotCreator, part
Msg("background merger merges unbalanced parts")
}
}
+ for sidxName, sidxInstance := range tst.getAllSidx() {
+ start = time.Now()
+ var mergedPartsCount uint64
+ var err error
+ if typ == mergeTypeMem {
+ mergedPartsCount, err =
sidxInstance.MergeMemParts(closeCh)
+ if err != nil {
+ tst.l.Warn().Err(err).Msg("sidx merge mem parts failed")
+ return nil, err
+ }
+ } else {
+ mergedPartsCount, err = sidxInstance.Merge(closeCh)
+ if err != nil {
+ tst.l.Warn().Err(err).Msg("sidx merge file parts failed")
+ return nil, err
+ }
+ }
+ elapsed = time.Since(start)
+ tst.incTotalMergeLatency(elapsed.Seconds(),
fmt.Sprintf("%s_%s", typ, sidxName))
+ tst.incTotalMerged(1, fmt.Sprintf("%s_%s", typ, sidxName))
+ tst.incTotalMergedParts(int(mergedPartsCount),
fmt.Sprintf("%s_%s", typ, sidxName))
+ if elapsed > 30*time.Second {
+ tst.l.Warn().Uint64("mergedPartsCount", mergedPartsCount).Str("sidxName", sidxName).Dur("elapsed", elapsed).Msg("sidx merge parts took too long")
+ }
+ }
mi := generateMergerIntroduction()
defer releaseMergerIntroduction(mi)