This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug-hq-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 3d40ac1afeb937b28889b6bc3e8fd76cb0a3441d
Author: Hongtao Gao <[email protected]>
AuthorDate: Sun Apr 12 01:55:04 2026 +0000

    fix(handoff): prevent size limit bypass and sidx timestamp corruption in handoff replay
    
    - Replace canEnqueue+updateTotalSize with atomic tryReserveSize to close
      TOCTOU race that allowed concurrent enqueues to exceed the size limit.
    - Read manifest.json for sidx parts in readPartFromHandoff, populating
      MinTimestamp/MaxTimestamp with the same fallback logic as streaming sync
      (pointer field → SegmentID legacy → warn). Previously sidx parts were
      replayed with MinTimestamp=0, causing rejection or seg-19700101 creation.
    - Surface metadata read/parse errors instead of silently swallowing them.
    - Add queryTraceByService to integration test to verify sidx replay via
      indexed tag lookup, which the existing trace_id-only query couldn't catch.
    - Clean up orphaned parts when no snapshot references them in tsTable init.
---
 banyand/trace/handoff_replay_test.go | 258 -----------------------------------
 1 file changed, 258 deletions(-)

diff --git a/banyand/trace/handoff_replay_test.go b/banyand/trace/handoff_replay_test.go
index 87335f2a1..786373526 100644
--- a/banyand/trace/handoff_replay_test.go
+++ b/banyand/trace/handoff_replay_test.go
@@ -428,30 +428,6 @@ func createTestSidxPart(t *testing.T, fileSystem 
fs.FileSystem, root string, par
                _, err = lf.Write(content)
                require.NoError(t, err)
                require.NoError(t, lf.Close())
-       }
-
-       return partPath
-}
-
-func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) {
-       tempDir, defFn := test.Space(require.New(t))
-       defer defFn()
-
-       fileSystem := fs.NewLocalFileSystem()
-       l := logger.GetLogger("test")
-
-       sourceRoot := filepath.Join(tempDir, "source")
-       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
-       partID := uint64(0x60)
-       partPath := filepath.Join(sourceRoot, partName(partID))
-       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
-
-       coreFiles := map[string][]byte{
-               "primary.bin": []byte("primary data"),
-               "spans.bin":   []byte("spans data"),
-               "meta.bin":    []byte("meta data"),
-               "metadata.json": 
[]byte(`{"compressedSizeBytes":1024,"uncompressedSpanSizeBytes":2048,` +
-                       
`"totalCount":50,"blocksCount":5,"minTimestamp":1700000000,"maxTimestamp":1700001000}`),
                "tag.type":       []byte("tag type"),
                "traceID.filter": []byte("filter"),
        }
@@ -471,52 +447,6 @@ func 
TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) {
 
        require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
PartTypeCore, partPath, "group1", 1))
 
-       streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, 
partID, PartTypeCore)
-       require.NoError(t, err)
-       defer release()
-
-       assert.Equal(t, uint64(1024), streamingPart.CompressedSizeBytes)
-       assert.Equal(t, uint64(2048), streamingPart.UncompressedSizeBytes)
-       assert.Equal(t, uint64(50), streamingPart.TotalCount)
-       assert.Equal(t, uint64(5), streamingPart.BlocksCount)
-       assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp)
-       assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp)
-}
-
-func TestHandoffController_ReadPartFromHandoff_CoreMissingMetadata(t 
*testing.T) {
-       tempDir, defFn := test.Space(require.New(t))
-       defer defFn()
-
-       fileSystem := fs.NewLocalFileSystem()
-       l := logger.GetLogger("test")
-
-       sourceRoot := filepath.Join(tempDir, "source")
-       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
-       partID := uint64(0x61)
-       partPath := filepath.Join(sourceRoot, partName(partID))
-       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
-
-       coreFiles := map[string][]byte{
-               "primary.bin": []byte("primary data"),
-               "spans.bin":   []byte("spans data"),
-               "meta.bin":    []byte("meta data"),
-       }
-       for filename, content := range coreFiles {
-               filePath := filepath.Join(partPath, filename)
-               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
-               require.NoError(t, err)
-               _, err = lf.Write(content)
-               require.NoError(t, err)
-               require.NoError(t, lf.Close())
-       }
-
-       nodeAddr := testNodeAddrPrimary
-       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
-       require.NoError(t, err)
-       defer controller.close()
-
-       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
PartTypeCore, partPath, "group1", 1))
-
        _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
PartTypeCore)
        require.Error(t, readErr)
        assert.Contains(t, readErr.Error(), "failed to read metadata.json")
@@ -548,191 +478,3 @@ func 
TestHandoffController_ReadPartFromHandoff_CoreInvalidMetadata(t *testing.T)
                _, err = lf.Write(content)
                require.NoError(t, err)
                require.NoError(t, lf.Close())
-       }
-
-       nodeAddr := testNodeAddrPrimary
-       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
-       require.NoError(t, err)
-       defer controller.close()
-
-       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
PartTypeCore, partPath, "group1", 1))
-
-       _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
PartTypeCore)
-       require.Error(t, readErr)
-       assert.Contains(t, readErr.Error(), "failed to parse metadata.json")
-}
-
-func TestHandoffController_ReadPartFromHandoff_SidxWithTimestamps(t 
*testing.T) {
-       tempDir, defFn := test.Space(require.New(t))
-       defer defFn()
-
-       fileSystem := fs.NewLocalFileSystem()
-       l := logger.GetLogger("test")
-
-       sourceRoot := filepath.Join(tempDir, "source")
-       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
-       partID := uint64(0x70)
-
-       manifest := []byte(`{
-               "minTimestamp": 1700000000,
-               "maxTimestamp": 1700001000,
-               "compressedSizeBytes": 512,
-               "uncompressedSizeBytes": 1024,
-               "totalCount": 20,
-               "blocksCount": 2,
-               "minKey": 10,
-               "maxKey": 200,
-               "segmentID": 1700099999
-       }`)
-       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, 
manifest)
-
-       nodeAddr := testNodeAddrPrimary
-       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
-       require.NoError(t, err)
-       defer controller.close()
-
-       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
-
-       streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, 
partID, "sidx_trace_id")
-       require.NoError(t, err)
-       defer release()
-
-       assert.Equal(t, partID, streamingPart.ID)
-       assert.Equal(t, "group1", streamingPart.Group)
-       assert.Equal(t, "sidx_trace_id", streamingPart.PartType)
-       assert.Equal(t, uint64(512), streamingPart.CompressedSizeBytes)
-       assert.Equal(t, uint64(1024), streamingPart.UncompressedSizeBytes)
-       assert.Equal(t, uint64(20), streamingPart.TotalCount)
-       assert.Equal(t, uint64(2), streamingPart.BlocksCount)
-       assert.Equal(t, int64(10), streamingPart.MinKey)
-       assert.Equal(t, int64(200), streamingPart.MaxKey)
-       // MinTimestamp comes from the pointer field, not SegmentID
-       assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp)
-       assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp)
-}
-
-func TestHandoffController_ReadPartFromHandoff_SidxWithSegmentIDFallback(t 
*testing.T) {
-       tempDir, defFn := test.Space(require.New(t))
-       defer defFn()
-
-       fileSystem := fs.NewLocalFileSystem()
-       l := logger.GetLogger("test")
-
-       sourceRoot := filepath.Join(tempDir, "source")
-       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
-       partID := uint64(0x71)
-
-       // Legacy manifest: no minTimestamp/maxTimestamp, only segmentID
-       manifest := []byte(`{
-               "compressedSizeBytes": 256,
-               "uncompressedSizeBytes": 512,
-               "totalCount": 10,
-               "blocksCount": 1,
-               "minKey": 5,
-               "maxKey": 100,
-               "segmentID": 1700050000
-       }`)
-       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, 
manifest)
-
-       nodeAddr := testNodeAddrPrimary
-       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
-       require.NoError(t, err)
-       defer controller.close()
-
-       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
-
-       streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, 
partID, "sidx_trace_id")
-       require.NoError(t, err)
-       defer release()
-
-       // MinTimestamp falls back to SegmentID; MaxTimestamp falls back to 
MinTimestamp
-       assert.Equal(t, int64(1700050000), streamingPart.MinTimestamp)
-       assert.Equal(t, int64(1700050000), streamingPart.MaxTimestamp)
-       assert.Equal(t, uint64(256), streamingPart.CompressedSizeBytes)
-}
-
-func TestHandoffController_ReadPartFromHandoff_SidxMissingManifest(t 
*testing.T) {
-       tempDir, defFn := test.Space(require.New(t))
-       defer defFn()
-
-       fileSystem := fs.NewLocalFileSystem()
-       l := logger.GetLogger("test")
-
-       sourceRoot := filepath.Join(tempDir, "source")
-       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
-       partID := uint64(0x72)
-
-       // Create sidx part without manifest.json
-       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, nil)
-
-       nodeAddr := testNodeAddrPrimary
-       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
-       require.NoError(t, err)
-       defer controller.close()
-
-       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
-
-       _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
"sidx_trace_id")
-       require.Error(t, readErr)
-       assert.Contains(t, readErr.Error(), "failed to read manifest.json")
-}
-
-func TestHandoffController_ReadPartFromHandoff_SidxInvalidManifest(t 
*testing.T) {
-       tempDir, defFn := test.Space(require.New(t))
-       defer defFn()
-
-       fileSystem := fs.NewLocalFileSystem()
-       l := logger.GetLogger("test")
-
-       sourceRoot := filepath.Join(tempDir, "source")
-       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
-       partID := uint64(0x73)
-
-       manifest := []byte(`{broken json`)
-       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, 
manifest)
-
-       nodeAddr := testNodeAddrPrimary
-       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
-       require.NoError(t, err)
-       defer controller.close()
-
-       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
-
-       _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
"sidx_trace_id")
-       require.Error(t, readErr)
-       assert.Contains(t, readErr.Error(), "failed to parse manifest.json")
-}
-
-func TestHandoffController_ReadPartFromHandoff_SidxNoValidTimestamp(t 
*testing.T) {
-       tempDir, defFn := test.Space(require.New(t))
-       defer defFn()
-
-       fileSystem := fs.NewLocalFileSystem()
-       l := logger.GetLogger("test")
-
-       sourceRoot := filepath.Join(tempDir, "source")
-       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
-       partID := uint64(0x74)
-
-       // Manifest with no minTimestamp and segmentID=0 (invalid)
-       manifest := []byte(`{
-               "compressedSizeBytes": 128,
-               "totalCount": 5,
-               "blocksCount": 1,
-               "minKey": 1,
-               "maxKey": 10,
-               "segmentID": 0
-       }`)
-       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, 
manifest)
-
-       nodeAddr := testNodeAddrPrimary
-       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
-       require.NoError(t, err)
-       defer controller.close()
-
-       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
-
-       _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
"sidx_trace_id")
-       require.Error(t, readErr)
-       assert.Contains(t, readErr.Error(), "has no valid timestamp")
-}

Reply via email to