This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7f7a5683ba6964857df3c646ee42715921e2897b Author: Gao Hongtao <[email protected]> AuthorDate: Wed Apr 8 18:06:01 2026 +0800 Stable Segment Endtime (#1051) --- .gitignore | 3 + CHANGES.md | 13 ++ banyand/internal/storage/segment.go | 38 ++++-- banyand/internal/storage/segment_test.go | 219 +++++++++++++++++++++++++++++++ banyand/internal/storage/version.go | 23 +++- banyand/internal/storage/version_test.go | 63 +++++++++ banyand/internal/storage/versions.yml | 1 + 7 files changed, 348 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 9af8ace51..3235cb1ac 100644 --- a/.gitignore +++ b/.gitignore @@ -81,6 +81,9 @@ gomock_reflect* # eBPF generated files and binaries fodc/agent/internal/ktm/iomonitor/ebpf/generated/vmlinux.h +<<<<<<< HEAD # OpenSSL serial files *.srl +======= +>>>>>>> 29e87c9d (Stable Segment Endtime (#1051)) diff --git a/CHANGES.md b/CHANGES.md index 80c4f6c93..ace8b3034 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,7 +2,20 @@ Release Notes. +<<<<<<< HEAD ## 0.10.2 +======= +## 0.11.0 + +### Features + +- Organize access logs under a dedicated "accesslog" subdirectory to improve log organization and separation from other application data. +- Collect BanyanDB data on e2e test failure for CI debugging. +- Add log query e2e test. +- Sync lifecycle e2e test from SkyWalking stages test. +- Add periodic health check for property schema connection. +- Persist segment end time in per-segment metadata so boundaries don't shift across restarts or config changes. +>>>>>>> 29e87c9d (Stable Segment Endtime (#1051)) ### Bug Fixes diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index e4239e0bc..381a11478 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -19,6 +19,7 @@ package storage import ( "context" + "encoding/json" "fmt" "io/fs" "path" @@ -33,7 +34,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - "github.com/apache/skywalking-banyandb/pkg/convert" banyanfs "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -491,23 +491,32 @@ func (sc *segmentController[T, O]) open() error { return nil } metadataPath := path.Join(segmentPath, metadataFilename) - version, err := sc.lfs.Read(metadataPath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { + rawMeta, readErr := sc.lfs.Read(metadataPath) + if readErr != nil { + if errors.Is(readErr, fs.ErrNotExist) { emptySegments = append(emptySegments, segmentPath) return nil } - return err + return readErr } - if len(version) == 0 { + if len(rawMeta) == 0 { emptySegments = append(emptySegments, segmentPath) return nil } - if err = checkVersion(convert.BytesToString(version)); err != nil { - return err + meta, parseErr := readSegmentMeta(rawMeta) + if parseErr != nil { + return parseErr } - _, err = sc.load(start, end, sc.location) - return err + segmentEnd := end + if meta.EndTime != "" { + parsedEnd, timeErr := time.Parse(time.RFC3339Nano, meta.EndTime) + if timeErr != nil { + return timeErr + } + segmentEnd = parsedEnd + } + _, loadErr := sc.load(start, segmentEnd, sc.location) + return loadErr }) if len(emptySegments) > 0 { sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments found, removing them.") @@ -551,7 +560,14 @@ func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], erro } segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, sc.format(start))) sc.lfs.MkdirPanicIfExist(segPath, DirPerm) - data := []byte(currentVersion) + meta := segmentMeta{ + Version: currentVersion, + EndTime: end.Format(time.RFC3339Nano), + } + data, marshalErr := json.Marshal(meta) + if marshalErr != nil { + logger.Panicf("cannot marshal segment metadata: %s", marshalErr) + } metadataPath := filepath.Join(segPath, metadataFilename) lf, err := sc.lfs.CreateLockFile(metadataPath, FilePerm) if err != nil { diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go index 7bcc1063b..9a55dd358 100644 --- a/banyand/internal/storage/segment_test.go +++ b/banyand/internal/storage/segment_test.go @@ -19,6 +19,7 @@ package storage import ( "context" + "encoding/json" "fmt" "os" "path/filepath" @@ -641,3 +642,221 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t *testing.T) { "Remaining segment %d should be from the expected date", i) } } + +func TestCreateSegmentWritesJSONMetadata(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-segment-metadata") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 1, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 7}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, + nil, + 5*time.Minute, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, + group, + ) + + now := time.Now().UTC() + startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + + seg, createErr := sc.create(startTime) + require.NoError(t, createErr) + require.NotNil(t, seg) + + // Read metadata from disk and verify it's JSON with endTime + suffix := startTime.Format(dayFormat) + metadataPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix), metadataFilename) + rawMeta, readErr := os.ReadFile(metadataPath) + require.NoError(t, readErr) + + meta, parseErr := readSegmentMeta(rawMeta) + require.NoError(t, parseErr) + assert.Equal(t, currentVersion, meta.Version) + assert.NotEmpty(t, meta.EndTime, "endTime should be persisted in metadata") + + // Verify the endTime matches the segment's End + expectedEnd := startTime.Add(24 * time.Hour) + assert.Equal(t, expectedEnd.Format(time.RFC3339Nano), meta.EndTime) + + seg.DecRef() +} + +func TestOpenReadsPersistedEndTime(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-open-endtime") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + group := "test-group" + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 1, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 30}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, + nil, + 5*time.Minute, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, + group, + ) + + now := time.Now().UTC() + day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + day2 := day1.Add(24 * time.Hour) + + // Manually create segment directories with JSON metadata. + // day1 with a specific endTime that differs from the default NextTime. + customEnd := day1.Add(12 * time.Hour) + segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath1, DirPerm)) + meta1 := segmentMeta{Version: currentVersion, EndTime: customEnd.Format(time.RFC3339Nano)} + meta1Data, marshalErr := json.Marshal(meta1) + require.NoError(t, marshalErr) + require.NoError(t, os.WriteFile(filepath.Join(segPath1, metadataFilename), meta1Data, FilePerm)) + + // day2 with standard endTime. + segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath2, DirPerm)) + meta2 := segmentMeta{Version: currentVersion, EndTime: day2.Add(24 * time.Hour).Format(time.RFC3339Nano)} + meta2Data, marshalErr := json.Marshal(meta2) + require.NoError(t, marshalErr) + require.NoError(t, os.WriteFile(filepath.Join(segPath2, metadataFilename), meta2Data, FilePerm)) + + // Open the controller (loads segments from disk). + openErr := sc.open() + require.NoError(t, openErr) + + // Verify segments loaded correctly. + require.Len(t, sc.lst, 2) + + // First segment should have the custom endTime from metadata. + assert.Equal(t, customEnd, sc.lst[0].End, "segment 0 should use persisted endTime") + // Second segment should have its endTime from metadata. + assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "segment 1 should use persisted endTime") + + for _, seg := range sc.lst { + seg.DecRef() + } +} + +func TestOpenFallbackOldFormatMetadata(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-open-fallback") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + group := "test-group" + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 1, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 30}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, + nil, + 5*time.Minute, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, + group, + ) + + now := time.Now() + day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local) + day2 := day1.Add(24 * time.Hour) + + // Create segments with OLD format metadata (plain version string). + segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath1, DirPerm)) + require.NoError(t, os.WriteFile(filepath.Join(segPath1, metadataFilename), []byte("1.4.0"), FilePerm)) + + segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath2, DirPerm)) + require.NoError(t, os.WriteFile(filepath.Join(segPath2, metadataFilename), []byte("1.4.0"), FilePerm)) + + // Open should succeed with fallback end time computation. + openErr := sc.open() + require.NoError(t, openErr) + + require.Len(t, sc.lst, 2) + + // First segment's end should be day2's start (fallback: end = next segment's start). + assert.Equal(t, day2, sc.lst[0].End, "old format should use fallback end time") + // Second segment's end should be day2 + 24h (fallback: end = NextTime(start)). + assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "last segment should use NextTime fallback") + + for _, seg := range sc.lst { + seg.DecRef() + } +} diff --git a/banyand/internal/storage/version.go b/banyand/internal/storage/version.go index 55a92f04b..ddea79a33 100644 --- a/banyand/internal/storage/version.go +++ b/banyand/internal/storage/version.go @@ -28,7 +28,7 @@ import ( const ( metadataFilename = "metadata" - currentVersion = "1.4.0" + currentVersion = "1.5.0" compatibleVersionsKey = "versions" compatibleVersionsFilename = "versions.yml" ) @@ -49,6 +49,27 @@ func checkVersion(version string) error { return errors.WithMessagef(errVersionIncompatible, "incompatible version %s, supported versions: %s", version, strings.Join(compatibleVersions, ", ")) } +type segmentMeta struct { + Version string `json:"version"` + EndTime string `json:"endTime,omitempty"` +} + +func readSegmentMeta(data []byte) (segmentMeta, error) { + var meta segmentMeta + trimmed := strings.TrimSpace(string(data)) + if len(trimmed) > 0 && trimmed[0] == '{' { + if unmarshalErr := json.Unmarshal(data, &meta); unmarshalErr != nil { + return segmentMeta{}, unmarshalErr + } + } else { + meta.Version = trimmed + } + if checkErr := checkVersion(meta.Version); checkErr != nil { + return segmentMeta{}, checkErr + } + return meta, nil +} + func readCompatibleVersions() []string { i, err := versionFS.ReadFile(compatibleVersionsFilename) if err != nil { diff --git a/banyand/internal/storage/version_test.go b/banyand/internal/storage/version_test.go new file mode 100644 index 000000000..726e6d8a2 --- /dev/null +++ b/banyand/internal/storage/version_test.go @@ -0,0 +1,63 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestReadSegmentMeta_NewFormat(t *testing.T) { + data := []byte(`{"version":"1.4.0","endTime":"2026-04-07T00:00:00+08:00"}`) + meta, err := readSegmentMeta(data) + require.NoError(t, err) + assert.Equal(t, "1.4.0", meta.Version) + assert.Equal(t, "2026-04-07T00:00:00+08:00", meta.EndTime) +} + +func TestReadSegmentMeta_OldFormat(t *testing.T) { + data := []byte("1.4.0") + meta, err := readSegmentMeta(data) + require.NoError(t, err) + assert.Equal(t, "1.4.0", meta.Version) + assert.Equal(t, "", meta.EndTime) +} + +func TestReadSegmentMeta_OldFormatWithNewline(t *testing.T) { + data := []byte("1.4.0\n") + meta, err := readSegmentMeta(data) + require.NoError(t, err) + assert.Equal(t, "1.4.0", meta.Version) + assert.Equal(t, "", meta.EndTime) +} + +func TestReadSegmentMeta_IncompatibleVersion(t *testing.T) { + data := []byte(`{"version":"0.1.0","endTime":"2026-04-07T00:00:00+08:00"}`) + _, err := readSegmentMeta(data) + assert.Error(t, err) +} + +func TestReadSegmentMeta_NewFormatNoEndTime(t *testing.T) { + data := []byte(`{"version":"1.4.0"}`) + meta, err := readSegmentMeta(data) + require.NoError(t, err) + assert.Equal(t, "1.4.0", meta.Version) + assert.Equal(t, "", meta.EndTime) +} diff --git a/banyand/internal/storage/versions.yml b/banyand/internal/storage/versions.yml index e5cb937c5..ddac13c49 100644 --- a/banyand/internal/storage/versions.yml +++ b/banyand/internal/storage/versions.yml @@ -15,3 +15,4 @@ versions: - 1.4.0 +- 1.5.0
