This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 513610c3 Fix trace syncer flush handling (#855)
513610c3 is described below
commit 513610c397ad8fa2b8fee635d06a01c5f1cc1106
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Nov 20 21:01:58 2025 +0800
Fix trace syncer flush handling (#855)
---
banyand/trace/introducer.go | 24 +++++++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/banyand/trace/introducer.go b/banyand/trace/introducer.go
index 4bf4b33a..be2940c2 100644
--- a/banyand/trace/introducer.go
+++ b/banyand/trace/introducer.go
@@ -198,7 +198,7 @@ func (tst *tsTable) introducerLoopWithSync(flushCh chan
*flusherIntroduction, me
epoch++
case next := <-flushCh:
tst.incTotalIntroduceLoopStarted("flush")
- tst.introduceFlushed(next, epoch)
+ tst.introduceFlushedForSync(next, epoch)
tst.incTotalIntroduceLoopFinished("flush")
tst.gc.clean()
epoch++
@@ -261,6 +261,28 @@ func (tst *tsTable) introduceFlushed(nextIntroduction
*flusherIntroduction, epoc
}
}
+// introduceFlushedForSync introduces the flushed trace parts for syncing.
+// The SIDX parts are flushed before the trace parts so the syncer can always
find
+// the corresponding index on disk once a flushed trace part becomes visible.
+func (tst *tsTable) introduceFlushedForSync(nextIntroduction
*flusherIntroduction, epoch uint64) {
+ for name, sidxFlusherIntroduced := range
nextIntroduction.sidxFlusherIntroduced {
+ tst.mustGetSidx(name).IntroduceFlushed(sidxFlusherIntroduced)
+ }
+ cur := tst.currentSnapshot()
+ if cur == nil {
+ tst.l.Panic().Msg("current snapshot is nil")
+ }
+ defer cur.decRef()
+ nextSnp := cur.merge(epoch, nextIntroduction.flushed)
+ nextSnp.creator = snapshotCreatorFlusher
+ tst.replaceSnapshot(&nextSnp)
+ tst.persistSnapshot(&nextSnp)
+
+ if nextIntroduction.applied != nil {
+ close(nextIntroduction.applied)
+ }
+}
+
func (tst *tsTable) introduceMerged(nextIntroduction *mergerIntroduction,
epoch uint64) {
cur := tst.currentSnapshot()
if cur == nil {