This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug-hq-sync in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3d40ac1afeb937b28889b6bc3e8fd76cb0a3441d Author: Hongtao Gao <[email protected]> AuthorDate: Sun Apr 12 01:55:04 2026 +0000 fix(handoff): prevent size limit bypass and sidx timestamp corruption in handoff replay - Replace canEnqueue+updateTotalSize with atomic tryReserveSize to close TOCTOU race that allowed concurrent enqueues to exceed the size limit. - Read manifest.json for sidx parts in readPartFromHandoff, populating MinTimestamp/MaxTimestamp with the same fallback logic as streaming sync (pointer field → SegmentID legacy → warn). Previously sidx parts were replayed with MinTimestamp=0, causing rejection or seg-19700101 creation. - Surface metadata read/parse errors instead of silently swallowing them. - Add queryTraceByService to integration test to verify sidx replay via indexed tag lookup, which the existing trace_id-only query couldn't catch. - Clean up orphaned parts when no snapshot references them in tsTable init. --- banyand/trace/handoff_replay_test.go | 258 ----------------------------------- 1 file changed, 258 deletions(-) diff --git a/banyand/trace/handoff_replay_test.go b/banyand/trace/handoff_replay_test.go index 87335f2a1..786373526 100644 --- a/banyand/trace/handoff_replay_test.go +++ b/banyand/trace/handoff_replay_test.go @@ -428,30 +428,6 @@ func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string, par _, err = lf.Write(content) require.NoError(t, err) require.NoError(t, lf.Close()) - } - - return partPath -} - -func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) { - tempDir, defFn := test.Space(require.New(t)) - defer defFn() - - fileSystem := fs.NewLocalFileSystem() - l := logger.GetLogger("test") - - sourceRoot := filepath.Join(tempDir, "source") - fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) - partID := uint64(0x60) - partPath := filepath.Join(sourceRoot, partName(partID)) - fileSystem.MkdirIfNotExist(partPath, storage.DirPerm) - - coreFiles := map[string][]byte{ - "primary.bin": []byte("primary data"), - "spans.bin": []byte("spans data"), - "meta.bin": []byte("meta data"), - "metadata.json": []byte(`{"compressedSizeBytes":1024,"uncompressedSpanSizeBytes":2048,` + - `"totalCount":50,"blocksCount":5,"minTimestamp":1700000000,"maxTimestamp":1700001000}`), "tag.type": []byte("tag type"), "traceID.filter": []byte("filter"), } @@ -471,52 +447,6 @@ func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) { require.NoError(t, controller.enqueueForNode(nodeAddr, partID, PartTypeCore, partPath, "group1", 1)) - streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, PartTypeCore) - require.NoError(t, err) - defer release() - - assert.Equal(t, uint64(1024), streamingPart.CompressedSizeBytes) - assert.Equal(t, uint64(2048), streamingPart.UncompressedSizeBytes) - assert.Equal(t, uint64(50), streamingPart.TotalCount) - assert.Equal(t, uint64(5), streamingPart.BlocksCount) - assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp) - assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp) -} - -func TestHandoffController_ReadPartFromHandoff_CoreMissingMetadata(t *testing.T) { - tempDir, defFn := test.Space(require.New(t)) - defer defFn() - - fileSystem := fs.NewLocalFileSystem() - l := logger.GetLogger("test") - - sourceRoot := filepath.Join(tempDir, "source") - fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) - partID := uint64(0x61) - partPath := filepath.Join(sourceRoot, partName(partID)) - fileSystem.MkdirIfNotExist(partPath, storage.DirPerm) - - coreFiles := map[string][]byte{ - "primary.bin": []byte("primary data"), - "spans.bin": []byte("spans data"), - "meta.bin": []byte("meta data"), - } - for filename, content := range coreFiles { - filePath := filepath.Join(partPath, filename) - lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm) - require.NoError(t, err) - _, err = lf.Write(content) - require.NoError(t, err) - require.NoError(t, lf.Close()) - } - - nodeAddr := testNodeAddrPrimary - controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) - require.NoError(t, err) - defer controller.close() - - require.NoError(t, controller.enqueueForNode(nodeAddr, partID, PartTypeCore, partPath, "group1", 1)) - _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, PartTypeCore) require.Error(t, readErr) assert.Contains(t, readErr.Error(), "failed to read metadata.json") @@ -548,191 +478,3 @@ func TestHandoffController_ReadPartFromHandoff_CoreInvalidMetadata(t *testing.T) _, err = lf.Write(content) require.NoError(t, err) require.NoError(t, lf.Close()) - } - - nodeAddr := testNodeAddrPrimary - controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) - require.NoError(t, err) - defer controller.close() - - require.NoError(t, controller.enqueueForNode(nodeAddr, partID, PartTypeCore, partPath, "group1", 1)) - - _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, PartTypeCore) - require.Error(t, readErr) - assert.Contains(t, readErr.Error(), "failed to parse metadata.json") -} - -func TestHandoffController_ReadPartFromHandoff_SidxWithTimestamps(t *testing.T) { - tempDir, defFn := test.Space(require.New(t)) - defer defFn() - - fileSystem := fs.NewLocalFileSystem() - l := logger.GetLogger("test") - - sourceRoot := filepath.Join(tempDir, "source") - fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) - partID := uint64(0x70) - - manifest := []byte(`{ - "minTimestamp": 1700000000, - "maxTimestamp": 1700001000, - "compressedSizeBytes": 512, - "uncompressedSizeBytes": 1024, - "totalCount": 20, - "blocksCount": 2, - "minKey": 10, - "maxKey": 200, - "segmentID": 1700099999 - }`) - sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) - - nodeAddr := testNodeAddrPrimary - controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) - require.NoError(t, err) - defer controller.close() - - require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) - - streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") - require.NoError(t, err) - defer release() - - assert.Equal(t, partID, streamingPart.ID) - assert.Equal(t, "group1", streamingPart.Group) - assert.Equal(t, "sidx_trace_id", streamingPart.PartType) - assert.Equal(t, uint64(512), streamingPart.CompressedSizeBytes) - assert.Equal(t, uint64(1024), streamingPart.UncompressedSizeBytes) - assert.Equal(t, uint64(20), streamingPart.TotalCount) - assert.Equal(t, uint64(2), streamingPart.BlocksCount) - assert.Equal(t, int64(10), streamingPart.MinKey) - assert.Equal(t, int64(200), streamingPart.MaxKey) - // MinTimestamp comes from the pointer field, not SegmentID - assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp) - assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp) -} - -func TestHandoffController_ReadPartFromHandoff_SidxWithSegmentIDFallback(t *testing.T) { - tempDir, defFn := test.Space(require.New(t)) - defer defFn() - - fileSystem := fs.NewLocalFileSystem() - l := logger.GetLogger("test") - - sourceRoot := filepath.Join(tempDir, "source") - fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) - partID := uint64(0x71) - - // Legacy manifest: no minTimestamp/maxTimestamp, only segmentID - manifest := []byte(`{ - "compressedSizeBytes": 256, - "uncompressedSizeBytes": 512, - "totalCount": 10, - "blocksCount": 1, - "minKey": 5, - "maxKey": 100, - "segmentID": 1700050000 - }`) - sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) - - nodeAddr := testNodeAddrPrimary - controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) - require.NoError(t, err) - defer controller.close() - - require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) - - streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") - require.NoError(t, err) - defer release() - - // MinTimestamp falls back to SegmentID; MaxTimestamp falls back to MinTimestamp - assert.Equal(t, int64(1700050000), streamingPart.MinTimestamp) - assert.Equal(t, int64(1700050000), streamingPart.MaxTimestamp) - assert.Equal(t, uint64(256), streamingPart.CompressedSizeBytes) -} - -func TestHandoffController_ReadPartFromHandoff_SidxMissingManifest(t *testing.T) { - tempDir, defFn := test.Space(require.New(t)) - defer defFn() - - fileSystem := fs.NewLocalFileSystem() - l := logger.GetLogger("test") - - sourceRoot := filepath.Join(tempDir, "source") - fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) - partID := uint64(0x72) - - // Create sidx part without manifest.json - sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, nil) - - nodeAddr := testNodeAddrPrimary - controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) - require.NoError(t, err) - defer controller.close() - - require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) - - _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") - require.Error(t, readErr) - assert.Contains(t, readErr.Error(), "failed to read manifest.json") -} - -func TestHandoffController_ReadPartFromHandoff_SidxInvalidManifest(t *testing.T) { - tempDir, defFn := test.Space(require.New(t)) - defer defFn() - - fileSystem := fs.NewLocalFileSystem() - l := logger.GetLogger("test") - - sourceRoot := filepath.Join(tempDir, "source") - fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) - partID := uint64(0x73) - - manifest := []byte(`{broken json`) - sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) - - nodeAddr := testNodeAddrPrimary - controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) - require.NoError(t, err) - defer controller.close() - - require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) - - _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") - require.Error(t, readErr) - assert.Contains(t, readErr.Error(), "failed to parse manifest.json") -} - -func TestHandoffController_ReadPartFromHandoff_SidxNoValidTimestamp(t *testing.T) { - tempDir, defFn := test.Space(require.New(t)) - defer defFn() - - fileSystem := fs.NewLocalFileSystem() - l := logger.GetLogger("test") - - sourceRoot := filepath.Join(tempDir, "source") - fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) - partID := uint64(0x74) - - // Manifest with no minTimestamp and segmentID=0 (invalid) - manifest := []byte(`{ - "compressedSizeBytes": 128, - "totalCount": 5, - "blocksCount": 1, - "minKey": 1, - "maxKey": 10, - "segmentID": 0 - }`) - sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) - - nodeAddr := testNodeAddrPrimary - controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) - require.NoError(t, err) - defer controller.close() - - require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) - - _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") - require.Error(t, readErr) - assert.Contains(t, readErr.Error(), "has no valid timestamp") -}
