This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 513610c3 Fix trace syncer flush handling (#855)
513610c3 is described below

commit 513610c397ad8fa2b8fee635d06a01c5f1cc1106
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Nov 20 21:01:58 2025 +0800

    Fix trace syncer flush handling (#855)
---
 banyand/trace/introducer.go | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/banyand/trace/introducer.go b/banyand/trace/introducer.go
index 4bf4b33a..be2940c2 100644
--- a/banyand/trace/introducer.go
+++ b/banyand/trace/introducer.go
@@ -198,7 +198,7 @@ func (tst *tsTable) introducerLoopWithSync(flushCh chan *flusherIntroduction, me
                        epoch++
                case next := <-flushCh:
                        tst.incTotalIntroduceLoopStarted("flush")
-                       tst.introduceFlushed(next, epoch)
+                       tst.introduceFlushedForSync(next, epoch)
                        tst.incTotalIntroduceLoopFinished("flush")
                        tst.gc.clean()
                        epoch++
@@ -261,6 +261,28 @@ func (tst *tsTable) introduceFlushed(nextIntroduction *flusherIntroduction, epoc
        }
 }
 
+// introduceFlushedForSync introduces the flushed trace parts for syncing.
+// The SIDX parts are flushed before the trace parts so the syncer can always find
+// the corresponding index on disk once a flushed trace part becomes visible.
+func (tst *tsTable) introduceFlushedForSync(nextIntroduction *flusherIntroduction, epoch uint64) {
+       for name, sidxFlusherIntroduced := range nextIntroduction.sidxFlusherIntroduced {
+               tst.mustGetSidx(name).IntroduceFlushed(sidxFlusherIntroduced)
+       }
+       cur := tst.currentSnapshot()
+       if cur == nil {
+               tst.l.Panic().Msg("current snapshot is nil")
+       }
+       defer cur.decRef()
+       nextSnp := cur.merge(epoch, nextIntroduction.flushed)
+       nextSnp.creator = snapshotCreatorFlusher
+       tst.replaceSnapshot(&nextSnp)
+       tst.persistSnapshot(&nextSnp)
+
+       if nextIntroduction.applied != nil {
+               close(nextIntroduction.applied)
+       }
+}
+
 func (tst *tsTable) introduceMerged(nextIntroduction *mergerIntroduction, epoch uint64) {
        cur := tst.currentSnapshot()
        if cur == nil {

Reply via email to