This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 838755fa Fix missing sidx data when take snapshot of trace data (#850)
838755fa is described below

commit 838755fa18de433661b4237f426146ce9041c759
Author: mrproliu <[email protected]>
AuthorDate: Thu Nov 20 11:33:13 2025 +0900

    Fix missing sidx data when take snapshot of trace data (#850)
    
    * Fix missing sidx data when take snapshot of trace data
---
 banyand/internal/sidx/interfaces.go      |   2 +
 banyand/internal/sidx/introducer.go      |  28 +++++++
 banyand/internal/sidx/sidx_test.go       | 131 +++++++++++++++++++++++++++++++
 banyand/trace/snapshot.go                |   7 ++
 banyand/trace/streaming_pipeline_test.go |   2 +
 5 files changed, 170 insertions(+)

diff --git a/banyand/internal/sidx/interfaces.go 
b/banyand/internal/sidx/interfaces.go
index cbe1e41b..c819b1e3 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -63,6 +63,8 @@ type SIDX interface {
        PartPaths(partIDs map[uint64]struct{}) map[uint64]string
        // IntroduceSynced introduces a synced map to the SIDX instance.
        IntroduceSynced(partIDsToSync map[uint64]struct{}) func()
+       // TakeFileSnapshot creates a snapshot of the SIDX files at the 
specified destination path.
+       TakeFileSnapshot(dst string) error
 }
 
 // WriteRequest contains data for a single write operation within a batch.
diff --git a/banyand/internal/sidx/introducer.go 
b/banyand/internal/sidx/introducer.go
index 535d361a..9bdefde8 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -18,6 +18,9 @@
 package sidx
 
 import (
+       "fmt"
+       "path/filepath"
+
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
@@ -146,6 +149,31 @@ func (s *sidx) IntroduceSynced(partIDsToSync 
map[uint64]struct{}) func() {
        return cur.decRef
 }
 
+func (s *sidx) TakeFileSnapshot(dst string) error {
+       currentSnapshot := s.currentSnapshot()
+       if currentSnapshot == nil {
+               return nil
+       }
+       defer currentSnapshot.decRef()
+       for _, pw := range currentSnapshot.parts {
+               if pw.mp != nil {
+                       continue
+               }
+
+               part := pw.p
+               srcPath := part.path
+               destPath := filepath.Join(dst, filepath.Base(srcPath))
+
+               if err := s.fileSystem.CreateHardLink(srcPath, destPath, nil); 
err != nil {
+                       return fmt.Errorf("failed to take file snapshot %s: 
%w", srcPath, err)
+               }
+       }
+
+       parent := filepath.Dir(dst)
+       s.fileSystem.SyncPath(parent)
+       return nil
+}
+
 func (s *sidx) replaceSnapshot(next *snapshot) {
        s.mu.Lock()
        defer s.mu.Unlock()
diff --git a/banyand/internal/sidx/sidx_test.go 
b/banyand/internal/sidx/sidx_test.go
index f727faa7..bd7f7bba 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -18,8 +18,14 @@
 package sidx
 
 import (
+       "bytes"
        "context"
+       "crypto/sha256"
        "fmt"
+       "io"
+       "os"
+       "path/filepath"
+       "sort"
        "sync"
        "testing"
        "time"
@@ -32,8 +38,11 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 const (
@@ -47,6 +56,88 @@ func waitForIntroducerLoop() {
        time.Sleep(10 * time.Millisecond)
 }
 
+// compareDirectories compares two directories recursively and fails the test 
if they are not identical.
+func compareDirectories(t *testing.T, originalDir, snapshotDir string) {
+       t.Helper()
+
+       // Get all files in both directories
+       originalFiles, err := collectFiles(originalDir)
+       require.NoError(t, err, "Failed to collect files from original 
directory")
+
+       snapshotFiles, err := collectFiles(snapshotDir)
+       require.NoError(t, err, "Failed to collect files from snapshot 
directory")
+
+       // Compare number of files
+       require.Equal(t, len(originalFiles), len(snapshotFiles),
+               "Number of files should be equal. Original: %d, Snapshot: %d",
+               len(originalFiles), len(snapshotFiles))
+
+       // Sort files for consistent comparison
+       sort.Strings(originalFiles)
+       sort.Strings(snapshotFiles)
+
+       // Compare each file
+       for idx, originalFile := range originalFiles {
+               snapshotFile := snapshotFiles[idx]
+
+               // Compare file sizes
+               originalInfo, err := os.Stat(originalFile)
+               require.NoError(t, err)
+
+               snapshotInfo, err := os.Stat(snapshotFile)
+               require.NoError(t, err)
+
+               require.Equal(t, originalInfo.Size(), snapshotInfo.Size(),
+                       "File size mismatch at index %d", idx)
+
+               // Compare file contents using hash
+               originalHash, err := calculateFileHash(originalFile)
+               require.NoError(t, err)
+
+               snapshotHash, err := calculateFileHash(snapshotFile)
+               require.NoError(t, err)
+
+               require.True(t, bytes.Equal(originalHash, snapshotHash),
+                       "File content mismatch at index %d", idx)
+       }
+}
+
+// collectFiles recursively collects the paths of all non-directory entries under a directory.
+func collectFiles(rootDir string) ([]string, error) {
+       var files []string
+
+       err := filepath.Walk(rootDir, func(path string, info os.FileInfo, 
walkErr error) error {
+               if walkErr != nil {
+                       return walkErr
+               }
+
+               // Skip directories; collect every other entry
+               if !info.IsDir() {
+                       files = append(files, path)
+               }
+
+               return nil
+       })
+
+       return files, err
+}
+
+// calculateFileHash calculates SHA-256 hash of a file.
+func calculateFileHash(filePath string) ([]byte, error) {
+       file, err := os.Open(filePath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to open file %s: %w", filePath, 
err)
+       }
+       defer file.Close()
+
+       hash := sha256.New()
+       if _, err := io.Copy(hash, file); err != nil {
+               return nil, fmt.Errorf("failed to calculate hash for file %s: 
%w", filePath, err)
+       }
+
+       return hash.Sum(nil), nil
+}
+
 func createTestOptions(t *testing.T) *Options {
        opts := NewDefaultOptions()
        opts.Memory = protector.NewMemory(observability.NewBypassRegistry())
@@ -149,6 +240,46 @@ func TestSIDX_Write_BatchRequest(t *testing.T) {
        assert.Equal(t, int64(1), stats.WriteCount.Load()) // One batch write
 }
 
+func TestSIDX_Take_File_Snapshot(t *testing.T) {
+       logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })
+
+       t.Run("Take snapshot of existing sidx", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               snapshotDir := filepath.Join(dir, "snapshot")
+
+               idx := createTestSIDX(t)
+               defer func() {
+                       assert.NoError(t, idx.Close())
+               }()
+
+               // Test batch write requests
+               reqs := []WriteRequest{
+                       createTestWriteRequest(1, 100, "data1", 
createTestTag("tag1", "value1")),
+                       createTestWriteRequest(1, 101, "data2", 
createTestTag("tag1", "value2")),
+                       createTestWriteRequest(2, 200, "data3", 
createTestTag("tag2", "value3")),
+               }
+
+               writeTestData(t, idx, reqs, 2, 2) // Test with segmentID=2, 
partID=2
+
+               raw := idx.(*sidx)
+               originalDir := raw.root
+               flushIntro, err := raw.Flush(map[uint64]struct{}{2: {}})
+               require.NoError(t, err)
+               require.NotNil(t, flushIntro)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               err = idx.TakeFileSnapshot(snapshotDir)
+               assert.NoError(t, err)
+               compareDirectories(t, originalDir, snapshotDir)
+       })
+}
+
 func TestSIDX_Write_Validation(t *testing.T) {
        sidx := createTestSIDX(t)
        defer func() {
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 915ab951..03eed89f 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -204,6 +204,13 @@ func parseSnapshot(name string) (uint64, error) {
 }
 
 func (tst *tsTable) TakeFileSnapshot(dst string) error {
+       for k, v := range tst.sidxMap {
+               indexDir := filepath.Join(dst, k)
+               tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
+               if err := v.TakeFileSnapshot(indexDir); err != nil {
+                       return fmt.Errorf("failed to take file snapshot for 
index, %s: %w", k, err)
+               }
+       }
        snapshot := tst.currentSnapshot()
        if snapshot == nil {
                return fmt.Errorf("no current snapshot available")
diff --git a/banyand/trace/streaming_pipeline_test.go 
b/banyand/trace/streaming_pipeline_test.go
index a36ae9e5..9e313f70 100644
--- a/banyand/trace/streaming_pipeline_test.go
+++ b/banyand/trace/streaming_pipeline_test.go
@@ -81,6 +81,7 @@ func (f *fakeSIDX) StreamingParts(map[uint64]struct{}, 
string, uint32, string) (
 }
 func (f *fakeSIDX) PartPaths(map[uint64]struct{}) map[uint64]string { return 
map[uint64]string{} }
 func (f *fakeSIDX) IntroduceSynced(map[uint64]struct{}) func()      { return 
func() {} }
+func (f *fakeSIDX) TakeFileSnapshot(_ string) error                 { return 
nil }
 
 type fakeSIDXWithErr struct {
        *fakeSIDX
@@ -653,6 +654,7 @@ func (f *fakeSIDXInfinite) PartPaths(map[uint64]struct{}) 
map[uint64]string {
        return map[uint64]string{}
 }
 func (f *fakeSIDXInfinite) IntroduceSynced(map[uint64]struct{}) func() { 
return func() {} }
+func (f *fakeSIDXInfinite) TakeFileSnapshot(_ string) error            { 
return nil }
 
 // TestStreamSIDXTraceBatches_InfiniteChannelContinuesUntilCanceled verifies 
that
 // the streaming pipeline continues streaming from an infinite channel until 
context is canceled.

Reply via email to