This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug-hq-sync in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7c857d46c0e8ecbc3ec7635c3a4bec99681e7f7d Author: Hongtao Gao <[email protected]> AuthorDate: Sun Apr 12 03:32:34 2026 +0000 fix(test): close lock files after writing to prevent FD leaks in tests via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> --- banyand/trace/handoff_replay_test.go | 218 +++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) diff --git a/banyand/trace/handoff_replay_test.go b/banyand/trace/handoff_replay_test.go index 786373526..7605fe1fa 100644 --- a/banyand/trace/handoff_replay_test.go +++ b/banyand/trace/handoff_replay_test.go @@ -428,6 +428,32 @@ func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string, par _, err = lf.Write(content) require.NoError(t, err) require.NoError(t, lf.Close()) +======= + } + + return partPath +} + +func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x60) + partPath := filepath.Join(sourceRoot, partName(partID)) + fileSystem.MkdirIfNotExist(partPath, storage.DirPerm) + + coreFiles := map[string][]byte{ + "primary.bin": []byte("primary data"), + "spans.bin": []byte("spans data"), + "meta.bin": []byte("meta data"), + "metadata.json": []byte(`{"compressedSizeBytes":1024,"uncompressedSpanSizeBytes":2048,` + + `"totalCount":50,"blocksCount":5,"minTimestamp":1700000000,"maxTimestamp":1700001000}`), +>>>>>>> 77e23c9a (fix(test): close lock files after writing to prevent FD leaks in tests) "tag.type": []byte("tag type"), "traceID.filter": []byte("filter"), } @@ -438,6 +464,7 @@ func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string, par _, err = lf.Write(content) require.NoError(t, err) require.NoError(t, lf.Close()) +<<<<<<< HEAD } nodeAddr := testNodeAddrPrimary @@ -478,3 +505,194 @@ func TestHandoffController_ReadPartFromHandoff_CoreInvalidMetadata(t *testing.T) _, err = lf.Write(content) require.NoError(t, err) require.NoError(t, lf.Close()) +<<<<<<< HEAD +======= + } + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, PartTypeCore, partPath, "group1", 1)) + + _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, PartTypeCore) + require.Error(t, readErr) + assert.Contains(t, readErr.Error(), "failed to parse metadata.json") +} + +func TestHandoffController_ReadPartFromHandoff_SidxWithTimestamps(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x70) + + manifest := []byte(`{ + "minTimestamp": 1700000000, + "maxTimestamp": 1700001000, + "compressedSizeBytes": 512, + "uncompressedSizeBytes": 1024, + "totalCount": 20, + "blocksCount": 2, + "minKey": 10, + "maxKey": 200, + "segmentID": 1700099999 + }`) + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.NoError(t, err) + defer release() + + assert.Equal(t, partID, streamingPart.ID) + assert.Equal(t, "group1", streamingPart.Group) + assert.Equal(t, "sidx_trace_id", streamingPart.PartType) + assert.Equal(t, uint64(512), streamingPart.CompressedSizeBytes) + assert.Equal(t, uint64(1024), streamingPart.UncompressedSizeBytes) + assert.Equal(t, uint64(20), streamingPart.TotalCount) + assert.Equal(t, uint64(2), streamingPart.BlocksCount) + assert.Equal(t, int64(10), streamingPart.MinKey) + assert.Equal(t, int64(200), streamingPart.MaxKey) + // MinTimestamp comes from the pointer field, not SegmentID + assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp) + assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp) +} + +func TestHandoffController_ReadPartFromHandoff_SidxWithSegmentIDFallback(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x71) + + // Legacy manifest: no minTimestamp/maxTimestamp, only segmentID + manifest := []byte(`{ + "compressedSizeBytes": 256, + "uncompressedSizeBytes": 512, + "totalCount": 10, + "blocksCount": 1, + "minKey": 5, + "maxKey": 100, + "segmentID": 1700050000 + }`) + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.NoError(t, err) + defer release() + + // MinTimestamp falls back to SegmentID; MaxTimestamp falls back to MinTimestamp + assert.Equal(t, int64(1700050000), streamingPart.MinTimestamp) + assert.Equal(t, int64(1700050000), streamingPart.MaxTimestamp) + assert.Equal(t, uint64(256), streamingPart.CompressedSizeBytes) +} + +func TestHandoffController_ReadPartFromHandoff_SidxMissingManifest(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x72) + + // Create sidx part without manifest.json + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, nil) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.Error(t, readErr) + assert.Contains(t, readErr.Error(), "failed to read manifest.json") +} + +func TestHandoffController_ReadPartFromHandoff_SidxInvalidManifest(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x73) + + manifest := []byte(`{broken json`) + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.Error(t, readErr) + assert.Contains(t, readErr.Error(), "failed to parse manifest.json") +} + +func TestHandoffController_ReadPartFromHandoff_SidxNoValidTimestamp(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x74) + + // Manifest with no minTimestamp and segmentID=0 (invalid) + manifest := []byte(`{ + "compressedSizeBytes": 128, + "totalCount": 5, + "blocksCount": 1, + "minKey": 1, + "maxKey": 10, + "segmentID": 0 + }`) + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.Error(t, readErr) + assert.Contains(t, readErr.Error(), "has no valid timestamp") +} +>>>>>>> 77e23c9a (fix(test): close lock files after writing to prevent FD leaks in tests)
