This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 838755fa Fix missing sidx data when taking a snapshot of trace data (#850)
838755fa is described below
commit 838755fa18de433661b4237f426146ce9041c759
Author: mrproliu <[email protected]>
AuthorDate: Thu Nov 20 11:33:13 2025 +0900
Fix missing sidx data when taking a snapshot of trace data (#850)
* Fix missing sidx data when taking a snapshot of trace data
---
banyand/internal/sidx/interfaces.go | 2 +
banyand/internal/sidx/introducer.go | 28 +++++++
banyand/internal/sidx/sidx_test.go | 131 +++++++++++++++++++++++++++++++
banyand/trace/snapshot.go | 7 ++
banyand/trace/streaming_pipeline_test.go | 2 +
5 files changed, 170 insertions(+)
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index cbe1e41b..c819b1e3 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -63,6 +63,8 @@ type SIDX interface {
PartPaths(partIDs map[uint64]struct{}) map[uint64]string
// IntroduceSynced introduces a synced map to the SIDX instance.
IntroduceSynced(partIDsToSync map[uint64]struct{}) func()
+ // TakeFileSnapshot creates a snapshot of the SIDX files at the specified destination path.
+ TakeFileSnapshot(dst string) error
}
// WriteRequest contains data for a single write operation within a batch.
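For readers of this change, a minimal caller-side sketch of the new interface method. The helper snapshotSIDXInto and its package wiring are illustrative only and not part of this commit; it assumes nothing beyond the SIDX interface shown above (the internal import path limits real callers to packages under banyand/).

    package example

    import (
        "fmt"

        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
    )

    // snapshotSIDXInto hard-links the on-disk part files of idx's current
    // snapshot into dstDir. Purely in-memory parts are skipped by the
    // implementation, so callers typically flush before snapshotting.
    func snapshotSIDXInto(idx sidx.SIDX, dstDir string) error {
        if err := idx.TakeFileSnapshot(dstDir); err != nil {
            return fmt.Errorf("take sidx file snapshot into %s: %w", dstDir, err)
        }
        return nil
    }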
diff --git a/banyand/internal/sidx/introducer.go b/banyand/internal/sidx/introducer.go
index 535d361a..9bdefde8 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -18,6 +18,9 @@
package sidx
import (
+ "fmt"
+ "path/filepath"
+
"github.com/apache/skywalking-banyandb/pkg/pool"
)
@@ -146,6 +149,31 @@ func (s *sidx) IntroduceSynced(partIDsToSync map[uint64]struct{}) func() {
return cur.decRef
}
+func (s *sidx) TakeFileSnapshot(dst string) error {
+ currentSnapshot := s.currentSnapshot()
+ if currentSnapshot == nil {
+ return nil
+ }
+ defer currentSnapshot.decRef()
+ for _, pw := range currentSnapshot.parts {
+ if pw.mp != nil {
+ continue
+ }
+
+ part := pw.p
+ srcPath := part.path
+ destPath := filepath.Join(dst, filepath.Base(srcPath))
+
+ if err := s.fileSystem.CreateHardLink(srcPath, destPath, nil); err != nil {
+ return fmt.Errorf("failed to take file snapshot %s: %w", srcPath, err)
+ }
+ }
+
+ parent := filepath.Dir(dst)
+ s.fileSystem.SyncPath(parent)
+ return nil
+}
+
func (s *sidx) replaceSnapshot(next *snapshot) {
s.mu.Lock()
defer s.mu.Unlock()
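The implementation above snapshots by hard-linking each flushed part's files into the destination and then syncing the parent directory, rather than copying file contents. A standalone sketch of the same technique using only the Go standard library follows; hardLinkFiles, srcDir, and dstDir are hypothetical names, not code from this commit.

    package example

    import (
        "os"
        "path/filepath"
    )

    // hardLinkFiles links every regular file in srcDir into dstDir.
    // A hard link shares the underlying data blocks with the source file,
    // so the "copy" is near-instant and uses no additional disk space.
    func hardLinkFiles(srcDir, dstDir string) error {
        if err := os.MkdirAll(dstDir, 0o755); err != nil {
            return err
        }
        entries, err := os.ReadDir(srcDir)
        if err != nil {
            return err
        }
        for _, e := range entries {
            if e.IsDir() {
                continue
            }
            src := filepath.Join(srcDir, e.Name())
            dst := filepath.Join(dstDir, e.Name())
            if err := os.Link(src, dst); err != nil {
                return err
            }
        }
        return nil
    }

The hard-link approach presumes the linked part files are immutable once flushed; any later rewrite of a source file would also change the snapshot.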
diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go
index f727faa7..bd7f7bba 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -18,8 +18,14 @@
package sidx
import (
+ "bytes"
"context"
+ "crypto/sha256"
"fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
"sync"
"testing"
"time"
@@ -32,8 +38,11 @@ import (
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
const (
@@ -47,6 +56,88 @@ func waitForIntroducerLoop() {
time.Sleep(10 * time.Millisecond)
}
+// compareDirectories compares two directories recursively and returns whether they are identical.
+func compareDirectories(t *testing.T, originalDir, snapshotDir string) {
+ t.Helper()
+
+ // Get all files in both directories
+ originalFiles, err := collectFiles(originalDir)
+ require.NoError(t, err, "Failed to collect files from original
directory")
+
+ snapshotFiles, err := collectFiles(snapshotDir)
+ require.NoError(t, err, "Failed to collect files from snapshot
directory")
+
+ // Compare number of files
+ require.Equal(t, len(originalFiles), len(snapshotFiles),
+ "Number of files should be equal. Original: %d, Snapshot: %d",
+ len(originalFiles), len(snapshotFiles))
+
+ // Sort files for consistent comparison
+ sort.Strings(originalFiles)
+ sort.Strings(snapshotFiles)
+
+ // Compare each file
+ for idx, originalFile := range originalFiles {
+ snapshotFile := snapshotFiles[idx]
+
+ // Compare file sizes
+ originalInfo, err := os.Stat(originalFile)
+ require.NoError(t, err)
+
+ snapshotInfo, err := os.Stat(snapshotFile)
+ require.NoError(t, err)
+
+ require.Equal(t, originalInfo.Size(), snapshotInfo.Size(),
+ "File size mismatch at index %d", idx)
+
+ // Compare file contents using hash
+ originalHash, err := calculateFileHash(originalFile)
+ require.NoError(t, err)
+
+ snapshotHash, err := calculateFileHash(snapshotFile)
+ require.NoError(t, err)
+
+ require.True(t, bytes.Equal(originalHash, snapshotHash),
+ "File content mismatch at index %d", idx)
+ }
+}
+
+// collectFiles recursively collects all regular files in a directory.
+func collectFiles(rootDir string) ([]string, error) {
+ var files []string
+
+ err := filepath.Walk(rootDir, func(path string, info os.FileInfo, walkErr error) error {
+ if walkErr != nil {
+ return walkErr
+ }
+
+ // Skip directories, only collect regular files
+ if !info.IsDir() {
+ files = append(files, path)
+ }
+
+ return nil
+ })
+
+ return files, err
+}
+
+// calculateFileHash calculates SHA-256 hash of a file.
+func calculateFileHash(filePath string) ([]byte, error) {
+ file, err := os.Open(filePath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open file %s: %w", filePath,
err)
+ }
+ defer file.Close()
+
+ hash := sha256.New()
+ if _, err := io.Copy(hash, file); err != nil {
+ return nil, fmt.Errorf("failed to calculate hash for file %s:
%w", filePath, err)
+ }
+
+ return hash.Sum(nil), nil
+}
+
func createTestOptions(t *testing.T) *Options {
opts := NewDefaultOptions()
opts.Memory = protector.NewMemory(observability.NewBypassRegistry())
@@ -149,6 +240,46 @@ func TestSIDX_Write_BatchRequest(t *testing.T) {
assert.Equal(t, int64(1), stats.WriteCount.Load()) // One batch write
}
+func TestSIDX_Take_File_Snapshot(t *testing.T) {
+ logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })
+
+ t.Run("Take snapshot of existing sidx", func(t *testing.T) {
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+
+ idx := createTestSIDX(t)
+ defer func() {
+ assert.NoError(t, idx.Close())
+ }()
+
+ // Test batch write requests
+ reqs := []WriteRequest{
+ createTestWriteRequest(1, 100, "data1", createTestTag("tag1", "value1")),
+ createTestWriteRequest(1, 101, "data2", createTestTag("tag1", "value2")),
+ createTestWriteRequest(2, 200, "data3", createTestTag("tag2", "value3")),
+ }
+
+ writeTestData(t, idx, reqs, 2, 2) // Test with segmentID=2, partID=2
+
+ raw := idx.(*sidx)
+ originalDir := raw.root
+ flushIntro, err := raw.Flush(map[uint64]struct{}{2: {}})
+ require.NoError(t, err)
+ require.NotNil(t, flushIntro)
+ raw.IntroduceFlushed(flushIntro)
+ flushIntro.Release()
+
+ err = idx.TakeFileSnapshot(snapshotDir)
+ assert.NoError(t, err)
+ compareDirectories(t, originalDir, snapshotDir)
+ })
+}
+
func TestSIDX_Write_Validation(t *testing.T) {
sidx := createTestSIDX(t)
defer func() {
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 915ab951..03eed89f 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -204,6 +204,13 @@ func parseSnapshot(name string) (uint64, error) {
}
func (tst *tsTable) TakeFileSnapshot(dst string) error {
+ for k, v := range tst.sidxMap {
+ indexDir := filepath.Join(dst, k)
+ tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
+ if err := v.TakeFileSnapshot(indexDir); err != nil {
+ return fmt.Errorf("failed to take file snapshot for
index, %s: %w", k, err)
+ }
+ }
snapshot := tst.currentSnapshot()
if snapshot == nil {
return fmt.Errorf("no current snapshot available")
diff --git a/banyand/trace/streaming_pipeline_test.go b/banyand/trace/streaming_pipeline_test.go
index a36ae9e5..9e313f70 100644
--- a/banyand/trace/streaming_pipeline_test.go
+++ b/banyand/trace/streaming_pipeline_test.go
@@ -81,6 +81,7 @@ func (f *fakeSIDX) StreamingParts(map[uint64]struct{}, string, uint32, string) (
}
func (f *fakeSIDX) PartPaths(map[uint64]struct{}) map[uint64]string { return map[uint64]string{} }
func (f *fakeSIDX) IntroduceSynced(map[uint64]struct{}) func() { return func() {} }
+func (f *fakeSIDX) TakeFileSnapshot(_ string) error { return nil }
type fakeSIDXWithErr struct {
*fakeSIDX
@@ -653,6 +654,7 @@ func (f *fakeSIDXInfinite) PartPaths(map[uint64]struct{}) map[uint64]string {
return map[uint64]string{}
}
func (f *fakeSIDXInfinite) IntroduceSynced(map[uint64]struct{}) func() { return func() {} }
+func (f *fakeSIDXInfinite) TakeFileSnapshot(_ string) error { return nil }
// TestStreamSIDXTraceBatches_InfiniteChannelContinuesUntilCanceled verifies that
// the streaming pipeline continues streaming from an infinite channel until context is canceled.