This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/trace-merge
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2dde7da2c04608fcd3fa005e9c11ad6799086c61
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Sep 8 04:51:56 2025 +0000

    refactor: simplify SIDX interface and improve part handling

    - Updated SIDX and Merger interfaces to remove unnecessary parameters, enhancing clarity and usability.
    - Refactored related implementations and tests to align with the new interface signatures.
    - Introduced partWrapper for better part management in the flusherIntroduction structure.
    - Improved validation logic in WriteRequest to streamline error handling.
---
 banyand/internal/sidx/interfaces.go            |  7 ++-----
 banyand/internal/sidx/introducer.go            |  4 ++--
 banyand/internal/sidx/introducer_test.go       | 12 ++++++------
 banyand/internal/sidx/merge.go                 | 11 +++++++----
 banyand/internal/sidx/multi_sidx_query_test.go |  4 ++--
 banyand/internal/sidx/part_wrapper.go          |  2 +-
 banyand/internal/sidx/sidx.go                  | 11 +++++++++--
 banyand/internal/sidx/sidx_test.go             | 26 +++++++++++++-------------
 banyand/internal/sidx/snapshot.go              | 25 +++++++++++--------------
 banyand/internal/sidx/snapshot_test.go         | 10 +++++-----
 banyand/trace/block_metadata.go                |  3 +++
 banyand/trace/block_reader.go                  |  2 +-
 banyand/trace/merger.go                        |  6 +-----
 banyand/trace/svc_standalone.go                |  2 ++
 banyand/trace/write_standalone.go              |  2 +-
 15 files changed, 66 insertions(+), 61 deletions(-)

diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index d02708a2..c130e04a 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -37,7 +37,7 @@ import (
 type SIDX interface {
     // Write performs batch write operations. All writes must be submitted as batches.
     // Elements within each batch should be pre-sorted by the caller for optimal performance.
-    Write(ctx context.Context, reqs []WriteRequest, partID uint64) error
+    Write(ctx context.Context, reqs []WriteRequest) error

     // Query executes a query with key range and tag filtering.
     // Returns a QueryResponse directly with all results loaded.
@@ -76,7 +76,7 @@ type Merger interface {
     // and coordinate with the introducer loop for snapshot updates.
     // This operation is user-controlled and synchronous.
     // Returns error if merge operation fails.
-    Merge(partIDs []uint64, newPartID uint64, closeCh <-chan struct{}) error
+    Merge(closeCh <-chan struct{}) error
 }

 // Writer handles write path operations for batch processing.
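
For orientation, here is a caller-side sketch of the trimmed-down interface as a consumer inside this module would use it. writeThenMerge is a hypothetical helper for illustration only, not part of this commit:

    package trace

    import (
        "context"

        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
    )

    // writeThenMerge shows the new call shape: no part IDs cross the API boundary.
    func writeThenMerge(ctx context.Context, idx sidx.SIDX, reqs []sidx.WriteRequest, closeCh <-chan struct{}) error {
        // Part IDs are now allocated inside the sidx instance, so Write no longer takes one.
        if err := idx.Write(ctx, reqs); err != nil {
            return err
        }
        // Merge selects the parts to compact on its own; the caller only supplies a
        // channel that can abort a long-running merge.
        return idx.Merge(closeCh)
    }
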
@@ -452,9 +452,6 @@ func (wr WriteRequest) Validate() error {
         if tag.Name == "" {
             return fmt.Errorf("tag[%d] name cannot be empty", i)
         }
-        if len(tag.Value) == 0 {
-            return fmt.Errorf("tag[%d] value cannot be empty", i)
-        }
     }
     return nil
 }
diff --git a/banyand/internal/sidx/introducer.go b/banyand/internal/sidx/introducer.go
index c75700ae..b3801a65 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -49,7 +49,7 @@ func releaseIntroduction(i *introduction) {
 }

 type flusherIntroduction struct {
-    flushed map[uint64]*part
+    flushed map[uint64]*partWrapper
     applied chan struct{}
 }

@@ -66,7 +66,7 @@ func generateFlusherIntroduction() *flusherIntroduction {
     v := flusherIntroductionPool.Get()
     if v == nil {
         return &flusherIntroduction{
-            flushed: make(map[uint64]*part),
+            flushed: make(map[uint64]*partWrapper),
         }
     }
     fi := v
diff --git a/banyand/internal/sidx/introducer_test.go b/banyand/internal/sidx/introducer_test.go
index 14b92d12..f601e995 100644
--- a/banyand/internal/sidx/introducer_test.go
+++ b/banyand/internal/sidx/introducer_test.go
@@ -64,7 +64,7 @@ func TestIntroductionPooling(t *testing.T) {
     var intros []*flusherIntroduction
     for i := 0; i < 10; i++ {
         intro := generateFlusherIntroduction()
-        intro.flushed[uint64(i)] = &part{}
+        intro.flushed[uint64(i)] = &partWrapper{}
         intro.applied = make(chan struct{})
         intros = append(intros, intro)
     }
@@ -121,8 +121,8 @@ func TestIntroductionReset(t *testing.T) {
     intro := generateFlusherIntroduction()

     // Set up flusher introduction with data
-    intro.flushed[1] = &part{}
-    intro.flushed[2] = &part{}
+    intro.flushed[1] = &partWrapper{}
+    intro.flushed[2] = &partWrapper{}
     intro.applied = make(chan struct{})

     // Reset the flusher introduction
@@ -308,7 +308,7 @@ func TestConcurrentPoolAccess(t *testing.T) {
             defer wg.Done()
             for j := 0; j < operationsPerGoroutine; j++ {
                 intro := generateFlusherIntroduction()
-                intro.flushed[uint64(j)] = &part{}
+                intro.flushed[uint64(j)] = &partWrapper{}
                 intro.applied = make(chan struct{})
                 releaseFlusherIntroduction(intro)
             }
@@ -347,8 +347,8 @@ func TestIntroductionMapOperations(t *testing.T) {
     intro := generateFlusherIntroduction()

     // Add parts to flushed map
-    part1 := &part{}
-    part2 := &part{}
+    part1 := &partWrapper{}
+    part2 := &partWrapper{}
     intro.flushed[1] = part1
     intro.flushed[2] = part2

diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index d1d3c608..a8d2733a 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -19,7 +19,7 @@ package sidx

 import (
     "fmt"
-    "slices"
+    "sync/atomic"

     "github.com/apache/skywalking-banyandb/pkg/encoding"
     "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -31,7 +31,7 @@ var (
 )

 // Merge implements Merger interface.
-func (s *sidx) Merge(partIDs []uint64, newPartID uint64, closeCh <-chan struct{}) error {
+func (s *sidx) Merge(closeCh <-chan struct{}) error {
     // Get current snapshot
     snap := s.currentSnapshot()
     if snap == nil {
@@ -44,10 +44,10 @@ func (s *sidx) Merge(partIDs []uint64, newPartID uint64, closeCh <-chan struct{}
     defer releaseMergerIntroduction(mergeIntro)
     mergeIntro.applied = make(chan struct{})

-    // Select parts to merge
+    // Select parts to merge (all active non-memory parts)
     var partsToMerge []*partWrapper
     for _, pw := range snap.parts {
-        if pw.isActive() && !pw.isMemPart() && slices.Contains(partIDs, pw.ID()) {
+        if pw.isActive() && !pw.isMemPart() {
             partsToMerge = append(partsToMerge, pw)
         }
     }
@@ -61,6 +61,9 @@ func (s *sidx) Merge(partIDs []uint64, newPartID uint64, closeCh <-chan struct{}
         mergeIntro.merged[pw.ID()] = struct{}{}
     }

+    // Generate new part ID using atomic increment
+    newPartID := atomic.AddUint64(&s.curPartID, 1)
+
     // Create new merged part
     newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, newPartID, s.root)
     if err != nil {
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go
index 2ff19b9c..e33ad7af 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -37,7 +37,7 @@ type mockSIDX struct {
     delay bool
 }

-func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest, _ uint64) error {
+func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest) error {
     return nil // Not implemented for tests
 }

@@ -60,7 +60,7 @@ func (m *mockSIDX) Flush() error {
     return nil
 }

-func (m *mockSIDX) Merge(_ []uint64, _ uint64, _ <-chan struct{}) error {
+func (m *mockSIDX) Merge(_ <-chan struct{}) error {
     return nil
 }
diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go
index 7a134a6d..75c2f503 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -217,7 +217,7 @@ func (pw *partWrapper) isRemoved() bool {
 // isMemPart returns true if this wrapper contains a memory part.
 func (pw *partWrapper) isMemPart() bool {
     // A memory part typically has no file system path or is stored in memory
-    return pw.mp != nil || (pw.p != nil && pw.p.path == "")
+    return pw.mp != nil
 }

 // String returns a string representation of the partWrapper.
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 7507e531..4515d983 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -60,6 +60,7 @@ type sidx struct {
     totalIntroduceLoopFinished atomic.Int64
     totalQueries               atomic.Int64
     totalWrites                atomic.Int64
+    curPartID                  uint64
     mu                         sync.RWMutex
 }

@@ -98,7 +99,7 @@ func NewSIDX(fileSystem fs.FileSystem, opts *Options) (SIDX, error) {
 }

 // Write implements SIDX interface.
-func (s *sidx) Write(ctx context.Context, reqs []WriteRequest, partID uint64) error {
+func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) error {
     // Validate requests
     for _, req := range reqs {
         if err := req.Validate(); err != nil {
@@ -121,6 +122,9 @@ func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) er
     mp := generateMemPart()
     mp.mustInitFromElements(es)

+    // Generate part ID using atomic increment
+    partID := atomic.AddUint64(&s.curPartID, 1)
+
     // Create introduction
     intro := generateIntroduction()
     intro.memPart = mp
@@ -418,7 +422,7 @@ func (s *sidx) Flush() error {
         partPath := partPath(s.root, pw.ID())
         pw.mp.mustFlush(s.fileSystem, partPath)
         p := mustOpenPart(partPath, s.fileSystem)
-        flushIntro.flushed[p.partMetadata.ID] = p
+        flushIntro.flushed[p.partMetadata.ID] = pw
     }

     if len(flushIntro.flushed) == 0 {
@@ -826,6 +830,9 @@ func (s *sidx) loadSnapshot(epoch uint64, loadedParts []uint64) {
         part.partMetadata.ID = id
         pw := newPartWrapper(nil, part)
         snp.addPart(pw)
+        if s.curPartID < id {
+            s.curPartID = id
+        }
     }
     s.gc.registerSnapshot(snp)
     s.gc.clean()
diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go
index 3dfd1f00..47d82f6e 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -100,7 +100,7 @@ func TestSIDX_Write_SingleRequest(t *testing.T) {
         createTestWriteRequest(1, 100, "data1", createTestTag("tag1", "value1")),
     }

-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     assert.NoError(t, err)

     // Verify stats
@@ -124,7 +124,7 @@ func TestSIDX_Write_BatchRequest(t *testing.T) {
         createTestWriteRequest(2, 200, "data3", createTestTag("tag2", "value3")),
     }

-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     assert.NoError(t, err)

     // Verify stats
@@ -170,7 +170,7 @@ func TestSIDX_Write_Validation(t *testing.T) {

     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            err := sidx.Write(ctx, []WriteRequest{tt.req}, partIDForTesting)
+            err := sidx.Write(ctx, []WriteRequest{tt.req})
             if tt.expectErr {
                 assert.Error(t, err)
             } else {
@@ -199,7 +199,7 @@ func TestSIDX_Write_WithTags(t *testing.T) {
         createTestWriteRequest(1, 100, "trace-data", tags...),
     }

-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     assert.NoError(t, err)
 }

@@ -218,7 +218,7 @@ func TestSIDX_Query_BasicQuery(t *testing.T) {
         createTestWriteRequest(1, 100, "data1"),
         createTestWriteRequest(1, 101, "data2"),
     }
-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     require.NoError(t, err)

     // Wait for introducer loop to process
@@ -268,7 +268,7 @@ func TestSIDX_Query_KeyRangeFilter(t *testing.T) {
         createTestWriteRequest(1, 150, "data150"),
         createTestWriteRequest(1, 200, "data200"),
     }
-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     require.NoError(t, err)

     // Wait for introducer loop to process
@@ -322,7 +322,7 @@ func TestSIDX_Query_Ordering(t *testing.T) {
         createTestWriteRequest(3, 75, "series3-data75"),
         createTestWriteRequest(3, 175, "series3-data175"),
     }
-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     require.NoError(t, err)

     // Wait for introducer loop to process
@@ -479,7 +479,7 @@ func TestSIDX_WriteQueryIntegration(t *testing.T) {
         createTestWriteRequest(2, 180, "series2-data2", createTestTag("env", "dev")),
     }

-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     require.NoError(t, err)

     // Test 1: Query single series
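
The sidx.go hunks above replace caller-supplied part IDs with an internal counter: Write and Merge mint IDs via atomic.AddUint64 on curPartID, and loadSnapshot fast-forwards the counter past IDs already on disk. A toy model of that scheme, with illustrative names only:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type idAllocator struct {
        cur uint64
    }

    // next returns a fresh, strictly increasing part ID.
    func (a *idAllocator) next() uint64 {
        return atomic.AddUint64(&a.cur, 1)
    }

    // observe fast-forwards the counter past an ID found on disk; it is called
    // while loading a snapshot, before concurrent writers start.
    func (a *idAllocator) observe(id uint64) {
        if a.cur < id {
            a.cur = id
        }
    }

    func main() {
        var a idAllocator
        for _, loaded := range []uint64{3, 7, 5} {
            a.observe(loaded)
        }
        fmt.Println(a.next(), a.next()) // prints 8 9: new parts never reuse a loaded ID
    }
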
@@ -540,7 +540,7 @@ func TestSIDX_DataConsistency(t *testing.T) {
         reqs = append(reqs, createTestWriteRequest(1, key, data))
     }

-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     require.NoError(t, err)

     // Query back and verify data integrity
@@ -587,7 +587,7 @@ func TestSIDX_LargeDataset(t *testing.T) {
         ))
     }

-    err := sidx.Write(ctx, reqs, partIDForTesting)
+    err := sidx.Write(ctx, reqs)
     require.NoError(t, err)

     // Query back and verify we can handle large result sets
@@ -634,7 +634,7 @@ func TestSIDX_ConcurrentWrites(t *testing.T) {
                 reqs = append(reqs, createTestWriteRequest(seriesID, key, data))
             }

-            if err := sidx.Write(ctx, reqs, partIDForTesting); err != nil {
+            if err := sidx.Write(ctx, reqs); err != nil {
                 errors <- err
             }
         }(g)
@@ -666,7 +666,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
     initialReqs := []WriteRequest{
         createTestWriteRequest(1, 100, "initial-data"),
     }
-    err := sidx.Write(ctx, initialReqs, partIDForTesting)
+    err := sidx.Write(ctx, initialReqs)
     require.NoError(t, err)

     var wg sync.WaitGroup
@@ -708,7 +708,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
                     int64(writeCount),
                     fmt.Sprintf("writer-%d-data-%d", writerID, writeCount),
                 )
-                sidx.Write(ctx, []WriteRequest{req}, partIDForTesting) // Ignore errors during concurrent stress
+                sidx.Write(ctx, []WriteRequest{req}) // Ignore errors during concurrent stress
                 writeCount++
             }
         }(i)
diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go
index 2edfd3b9..e06fcc41 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -370,22 +370,19 @@ func (s *snapshot) copyAllTo(epoch uint64) *snapshot {
 }

 // merge creates a new snapshot by merging flushed parts into the current snapshot.
-func (s *snapshot) merge(epoch uint64, flushed map[uint64]*part) *snapshot {
-    result := s.copyAllTo(epoch)
-
-    // Add flushed parts to the snapshot
-    for partID, part := range flushed {
-        // Set the part ID from the map key
-        if part != nil && part.partMetadata != nil {
-            part.partMetadata.ID = partID
+func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) *snapshot {
+    var result snapshot
+    result.epoch = nextEpoch
+    result.ref = 1
+    for i := 0; i < len(s.parts); i++ {
+        if n, ok := nextParts[s.parts[i].ID()]; ok {
+            result.parts = append(result.parts, n)
+            continue
         }
-        // Create part wrapper for the flushed part
-        pw := newPartWrapper(nil, part)
-        result.parts = append(result.parts, pw)
+        s.parts[i].acquire()
+        result.parts = append(result.parts, s.parts[i])
     }
-
-    result.sortPartsByEpoch()
-    return result
+    return &result
 }

 // remove creates a new snapshot by removing specified parts.
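
To spell out the reference-counting intent behind the rewritten snapshot.merge, here is an annotated paraphrase using the same package types as the hunk above; the comment on replacement wrappers is an assumption (that a wrapper arriving via nextParts already holds its own reference), not something the diff states:

    // buildNextSnapshot is an illustrative restatement of the new merge logic.
    func buildNextSnapshot(prev *snapshot, nextParts map[uint64]*partWrapper, nextEpoch uint64) *snapshot {
        next := &snapshot{epoch: nextEpoch, ref: 1}
        for i := 0; i < len(prev.parts); i++ {
            if replacement, ok := nextParts[prev.parts[i].ID()]; ok {
                // Swap in the flushed wrapper for this part ID; it is assumed to
                // carry its own reference already, so it is not acquired here.
                next.parts = append(next.parts, replacement)
                continue
            }
            // A carried-over part becomes shared by the old and the new snapshot,
            // so take an additional reference before adding it.
            prev.parts[i].acquire()
            next.parts = append(next.parts, prev.parts[i])
        }
        return next
    }

Unlike the old implementation, nothing is appended beyond the previous snapshot's part list, and the epoch-based re-sort is gone because every entry either replaces or re-uses a part that was already in order.
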
diff --git a/banyand/internal/sidx/snapshot_test.go b/banyand/internal/sidx/snapshot_test.go
index dc94d291..9029edaf 100644
--- a/banyand/internal/sidx/snapshot_test.go
+++ b/banyand/internal/sidx/snapshot_test.go
@@ -430,7 +430,7 @@ func TestSnapshotReplacement_Basic(t *testing.T) {
             Tags: []Tag{{Name: "test", Value: []byte("snapshot-replacement")}},
         }

-        if err := sidx.Write(ctx, []WriteRequest{req}, partIDForTesting); err != nil {
+        if err := sidx.Write(ctx, []WriteRequest{req}); err != nil {
             t.Errorf("write %d failed: %v", i, err)
         }

@@ -521,7 +521,7 @@ func TestSnapshotReplacement_ConcurrentReadsConsistentData(t *testing.T) {
             },
         }

-        if err := sidx.Write(ctx, reqs, partIDForTesting); err != nil {
+        if err := sidx.Write(ctx, reqs); err != nil {
             t.Errorf("write %d failed: %v", i, err)
         }

@@ -610,7 +610,7 @@ func TestSnapshotReplacement_NoDataRacesDuringReplacement(t *testing.T) {
                     },
                 },
             }
-            sidx.Write(ctx, reqs, partIDForTesting)
+            sidx.Write(ctx, reqs)
         case 1:
             // Stats operation - accesses current snapshot
             sidx.Stats(ctx)
@@ -670,7 +670,7 @@ func TestSnapshotReplacement_MemoryLeaksPrevention(t *testing.T) {
                 },
             }

-            if writeErr := sidx.Write(ctx, reqs, partIDForTesting); writeErr != nil {
+            if writeErr := sidx.Write(ctx, reqs); writeErr != nil {
                 t.Errorf("batch %d write %d failed: %v", i, j, writeErr)
             }
         }
@@ -731,7 +731,7 @@ func TestSnapshotReplacement_MemoryLeaksPrevention(t *testing.T) {
                 },
             }

-            if writeErr := sidx.Write(ctx, reqs, partIDForTesting); writeErr != nil {
+            if writeErr := sidx.Write(ctx, reqs); writeErr != nil {
                 t.Errorf("concurrent writer %d write %d failed: %v", writerID, j, writeErr)
             }

diff --git a/banyand/trace/block_metadata.go b/banyand/trace/block_metadata.go
index 3fbd0f22..eb0275b1 100644
--- a/banyand/trace/block_metadata.go
+++ b/banyand/trace/block_metadata.go
@@ -164,6 +164,9 @@ func (bm *blockMetadata) unmarshal(src []byte, tagType map[string]pbv1.ValueType
         return nil, fmt.Errorf("cannot unmarshal blockMetadata from less than %d bytes", traceIDLen)
     }
     bm.traceID = strings.TrimRight(string(src[:traceIDLen]), "\x00")
+    if len(tagType) == 0 {
+        fmt.Println("tagType is empty")
+    }
     bm.tagType = tagType
     src = src[traceIDLen:]
     src, n := encoding.BytesToVarUint64(src)
diff --git a/banyand/trace/block_reader.go b/banyand/trace/block_reader.go
index def4fe09..ad1ef4c1 100644
--- a/banyand/trace/block_reader.go
+++ b/banyand/trace/block_reader.go
@@ -188,9 +188,9 @@ func (br *blockReader) nextMetadata() error {
     for key, tv := range br.block.bm.tagType {
         tagType[key] = tv
     }
+    head.tagType = tagType
     if head.nextBlockMetadata() {
         heap.Fix(&br.pih, 0)
-        br.pih[0].block.bm.tagType = tagType
         br.block = &br.pih[0].block
         return nil
     }
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 095268cd..d6ba9411 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -112,12 +112,8 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, part
     if err != nil {
         return nil, err
     }
-    partIDs := make([]uint64, 0, len(parts))
-    for _, pw := range parts {
-        partIDs = append(partIDs, pw.ID())
-    }
     for sidxName, sidxInstance := range tst.sidxMap {
-        if err := sidxInstance.Merge(partIDs, newPartID, closeCh); err != nil {
+        if err := sidxInstance.Merge(closeCh); err != nil {
             tst.l.Warn().Err(err).Str("sidx", sidxName).Msg("sidx merge failed")
             return nil, err
         }
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 05e4e858..950c3b3d 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -73,6 +73,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
     fs.DurationVar(&s.option.flushTimeout, "trace-flush-timeout", defaultFlushTimeout, "the timeout for trace data flush")
     fs.IntVar(&s.maxDiskUsagePercent, "trace-max-disk-usage-percent", 95, "the maximum disk usage percentage")
     fs.IntVar(&s.maxFileSnapshotNum, "trace-max-file-snapshot-num", 2, "the maximum number of file snapshots")
+    s.option.mergePolicy = newDefaultMergePolicy()
+    fs.VarP(&s.option.mergePolicy.maxFanOutSize, "trace-max-fan-out-size", "", "the upper bound of a single file size after merge of trace")
     // Additional flags can be added here
     return fs
 }
diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go
index b2abd73b..c381eb72 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -362,7 +362,7 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.
             w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx instance")
             continue
         }
-        if err := sidxInstance.Write(ctx, sidxReqs, es.tsTable.curPartID); err != nil {
+        if err := sidxInstance.Write(ctx, sidxReqs); err != nil {
             w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary index")
         }
     }
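
The new trace-max-fan-out-size flag is registered through fs.VarP, which appears to follow the spf13/pflag Var convention: the value passed in must implement pflag.Value. A self-contained sketch of such a byte-size value (the actual type behind maxFanOutSize likely parses human-readable sizes and may differ in details):

    package main

    import (
        "fmt"
        "strconv"

        "github.com/spf13/pflag"
    )

    // byteSize is a minimal pflag.Value: String, Set, and Type are all that is required.
    type byteSize int64

    func (b *byteSize) String() string { return strconv.FormatInt(int64(*b), 10) }

    func (b *byteSize) Set(s string) error {
        v, err := strconv.ParseInt(s, 10, 64)
        if err != nil {
            return fmt.Errorf("invalid byte size %q: %w", s, err)
        }
        *b = byteSize(v)
        return nil
    }

    func (b *byteSize) Type() string { return "byteSize" }

    func main() {
        fs := pflag.NewFlagSet("trace", pflag.ContinueOnError)
        size := byteSize(1 << 30) // default: 1 GiB
        fs.VarP(&size, "trace-max-fan-out-size", "", "the upper bound of a single file size after merge of trace")
        _ = fs.Parse([]string{"--trace-max-fan-out-size", "4294967296"})
        fmt.Println(int64(size)) // 4294967296
    }
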