This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new d2cb7daf6 Fix lifecycle migration segment shorter than the configured
`SegmentInterval` (#1120)
d2cb7daf6 is described below
commit d2cb7daf6adbc57f5daf15499595cbc2b021033a
Author: mrproliu <[email protected]>
AuthorDate: Sat May 9 12:46:34 2026 +0800
Fix lifecycle migration segment shorter than the configured
`SegmentInterval` (#1120)
---
CHANGES.md | 1 +
banyand/backup/lifecycle/segment_boundary_utils.go | 43 +-
.../lifecycle/segment_boundary_utils_test.go | 229 ++++++++++-
banyand/backup/lifecycle/steps.go | 73 +++-
banyand/backup/lifecycle/steps_test.go | 93 +++++
banyand/internal/storage/rotation_test.go | 30 +-
banyand/internal/storage/segment.go | 24 +-
banyand/internal/storage/segment_test.go | 453 +++++++++++++++++++++
banyand/internal/storage/storage.go | 39 ++
banyand/internal/wqueue/wqueue.go | 2 +-
banyand/internal/wqueue/wqueue_test.go | 25 +-
11 files changed, 930 insertions(+), 82 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 2f4b67fa6..546e8e1fb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -87,6 +87,7 @@ Release Notes.
- Fix nil-pointer panic on cold-tier data nodes when FODC `InspectAll` raced
with idle-segment cleanup.
- Add `GroupLifecycleInfo.errors` to surface per-group collection failures
from FODC `InspectAll` instead of silently dropping the affected node entry.
- Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling
`CATALOG_PROPERTY` groups.
+- Fix lifecycle migration where the receiving node could create segments
shorter than the configured `SegmentInterval`.
### Chores
diff --git a/banyand/backup/lifecycle/segment_boundary_utils.go
b/banyand/backup/lifecycle/segment_boundary_utils.go
index c3f0d6a12..1fd32c298 100644
--- a/banyand/backup/lifecycle/segment_boundary_utils.go
+++ b/banyand/backup/lifecycle/segment_boundary_utils.go
@@ -25,34 +25,18 @@ import (
)
func calculateTargetSegments(partMinTS, partMaxTS int64, targetInterval
storage.IntervalRule) []time.Time {
- minTime := time.Unix(0, partMinTS).UTC()
- maxTime := time.Unix(0, partMaxTS).UTC()
+ // Use time.Local so sender's bucket math stays on the same per-timezone
+ // grid as the receiver, which sees ctx.MinTimestamp as time.Local too.
+ minTime := time.Unix(0, partMinTS)
+ maxTime := time.Unix(0, partMaxTS)
var targetSegments []time.Time
-
- var segmentStart time.Time
- switch targetInterval.Unit {
- case storage.DAY:
- daysSinceEpoch := minTime.Sub(time.Date(1970, 1, 1, 0, 0, 0, 0,
time.UTC)).Hours() / 24
- segmentIndex := int(daysSinceEpoch) / targetInterval.Num
- segmentStart = time.Date(1970, 1,
1+segmentIndex*targetInterval.Num, 0, 0, 0, 0, time.UTC)
- case storage.HOUR:
- hoursSinceEpoch := minTime.Sub(time.Date(1970, 1, 1, 0, 0, 0,
0, time.UTC)).Hours()
- segmentIndex := int(hoursSinceEpoch) / targetInterval.Num
- segmentStart = time.Date(1970, 1, 1,
segmentIndex*targetInterval.Num, 0, 0, 0, time.UTC)
- default:
- segmentStart = targetInterval.Unit.Standard(minTime)
- }
-
- current := segmentStart
- for !current.After(maxTime) {
- segmentEnd := targetInterval.NextTime(current)
- if !(segmentEnd.Before(minTime) || current.After(maxTime)) {
- targetSegments = append(targetSegments, current)
- }
- current = segmentEnd
+ // current starts at the bucket containing minTime, so that bucket's end
+ // is always > minTime and the loop guard already excludes anything past
+ // maxTime - every iteration produces a real overlap.
+ for current := targetInterval.Standard(minTime);
!current.After(maxTime); current = targetInterval.NextTime(current) {
+ targetSegments = append(targetSegments, current)
}
-
return targetSegments
}
@@ -61,12 +45,11 @@ func getSegmentTimeRange(segmentStart time.Time, interval
storage.IntervalRule)
return timestamp.NewSectionTimeRange(segmentStart, segmentEnd)
}
+// getTargetStageInterval returns the segment interval of the migration target
+// (the next stage relative to the current node), as populated by parseGroup.
func getTargetStageInterval(group *GroupConfig) storage.IntervalRule {
- if group.ResourceOpts != nil && len(group.ResourceOpts.Stages) > 0 {
- stage := group.ResourceOpts.Stages[0]
- if stage.SegmentInterval != nil {
- return storage.MustToIntervalRule(stage.SegmentInterval)
- }
+ if group.TargetSegmentInterval != nil {
+ return storage.MustToIntervalRule(group.TargetSegmentInterval)
}
if group.ResourceOpts != nil && group.ResourceOpts.SegmentInterval !=
nil {
diff --git a/banyand/backup/lifecycle/segment_boundary_utils_test.go
b/banyand/backup/lifecycle/segment_boundary_utils_test.go
index 003558667..98777f455 100644
--- a/banyand/backup/lifecycle/segment_boundary_utils_test.go
+++ b/banyand/backup/lifecycle/segment_boundary_utils_test.go
@@ -18,11 +18,13 @@
package lifecycle
import (
+ "sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
)
@@ -37,54 +39,54 @@ func TestCalculateTargetSegments(t *testing.T) {
}{
{
name: "2-day to 3-day segments - part spans
multiple segments",
- partMinTS: time.Date(2024, 1, 2, 0, 0, 0, 0,
time.UTC).UnixNano(), // Day 2
- partMaxTS: time.Date(2024, 1, 4, 0, 0, 0, 0,
time.UTC).UnixNano(), // Day 4
+ partMinTS: time.Date(2024, 1, 2, 0, 0, 0, 0,
time.Local).UnixNano(), // Day 2
+ partMaxTS: time.Date(2024, 1, 4, 0, 0, 0, 0,
time.Local).UnixNano(), // Day 4
targetInterval: storage.IntervalRule{
Unit: storage.DAY,
Num: 3,
},
expected: []time.Time{
- time.Date(2024, 1, 0, 0, 0, 0, 0, time.UTC), //
Day 0-3
- time.Date(2024, 1, 3, 0, 0, 0, 0, time.UTC), //
Day 3-6
+ time.Date(2023, 12, 31, 0, 0, 0, 0,
time.Local), // Day 0-3
+ time.Date(2024, 1, 3, 0, 0, 0, 0, time.Local),
// Day 3-6
},
},
{
name: "2-day to 3-day segments - part fits in
single segment",
- partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0,
time.UTC).UnixNano(), // Day 1
- partMaxTS: time.Date(2024, 1, 2, 0, 0, 0, 0,
time.UTC).UnixNano(), // Day 2
+ partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0,
time.Local).UnixNano(), // Day 1
+ partMaxTS: time.Date(2024, 1, 2, 0, 0, 0, 0,
time.Local).UnixNano(), // Day 2
targetInterval: storage.IntervalRule{
Unit: storage.DAY,
Num: 3,
},
expected: []time.Time{
- time.Date(2024, 1, 0, 0, 0, 0, 0, time.UTC), //
Day 0-3
+ time.Date(2023, 12, 31, 0, 0, 0, 0,
time.Local), // Day 0-3
},
},
{
name: "hour to day segments - part spans multiple
segments",
- partMinTS: time.Date(2024, 1, 1, 12, 0, 0, 0,
time.UTC).UnixNano(), // Hour 12
- partMaxTS: time.Date(2024, 1, 1, 18, 0, 0, 0,
time.UTC).UnixNano(), // Hour 18
+ partMinTS: time.Date(2024, 1, 1, 12, 0, 0, 0,
time.Local).UnixNano(), // Hour 12
+ partMaxTS: time.Date(2024, 1, 1, 18, 0, 0, 0,
time.Local).UnixNano(), // Hour 18
targetInterval: storage.IntervalRule{
Unit: storage.DAY,
Num: 1,
},
expected: []time.Time{
- time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), //
Day 1
+ time.Date(2024, 1, 1, 0, 0, 0, 0, time.Local),
// Day 1
},
},
{
name: "day to hour segments - part spans multiple
segments",
- partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0,
time.UTC).UnixNano(), // Day 1
- partMaxTS: time.Date(2024, 1, 1, 23, 59, 59, 0,
time.UTC).UnixNano(), // End of Day 1
+ partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0,
time.Local).UnixNano(), // Day 1
+ partMaxTS: time.Date(2024, 1, 1, 23, 59, 59, 0,
time.Local).UnixNano(), // End of Day 1
targetInterval: storage.IntervalRule{
Unit: storage.HOUR,
Num: 6,
},
expected: []time.Time{
- time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
// Hour 0-6
- time.Date(2024, 1, 1, 6, 0, 0, 0, time.UTC),
// Hour 6-12
- time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
// Hour 12-18
- time.Date(2024, 1, 1, 18, 0, 0, 0, time.UTC),
// Hour 18-24
+ time.Date(2024, 1, 1, 0, 0, 0, 0, time.Local),
// Hour 0-6
+ time.Date(2024, 1, 1, 6, 0, 0, 0, time.Local),
// Hour 6-12
+ time.Date(2024, 1, 1, 12, 0, 0, 0, time.Local),
// Hour 12-18
+ time.Date(2024, 1, 1, 18, 0, 0, 0, time.Local),
// Hour 18-24
},
},
}
@@ -136,3 +138,198 @@ func TestGetSegmentTimeRange(t *testing.T) {
})
}
}
+
+const (
+ stageWarm = "warm"
+ stageCold = "cold"
+ stageFrozen = "frozen"
+ selectorWarm = "type=warm"
+ selectorCold = "type=cold"
+ selectorFrozen = "type=frozen"
+)
+
+func dayInterval(num uint32) *commonv1.IntervalRule {
+ return &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY,
Num: num}
+}
+
+func stage(name, selector string, segNum, ttlNum uint32)
*commonv1.LifecycleStage {
+ return &commonv1.LifecycleStage{
+ Name: name,
+ SegmentInterval: dayInterval(segNum),
+ Ttl: dayInterval(ttlNum),
+ NodeSelector: selector,
+ }
+}
+
+// TestGetTargetStageInterval pins every branch of getTargetStageInterval:
+// the explicit TargetSegmentInterval, the ResourceOpts.SegmentInterval
+// fallback, and the final 1d default.
+func TestGetTargetStageInterval(t *testing.T) {
+ //nolint:govet // fieldalignment: test struct optimization not critical
+ type tc struct {
+ name string
+ group *GroupConfig
+ expected storage.IntervalRule
+ }
+
+ cases := []tc{
+ {
+ name: "3-stage warm->cold returns cold's 15d
(sw_metricsHour)",
+ group: &GroupConfig{
+ Group: &commonv1.Group{
+ ResourceOpts: &commonv1.ResourceOpts{
+ SegmentInterval: dayInterval(5),
+ Stages:
[]*commonv1.LifecycleStage{
+ stage(stageWarm,
selectorWarm, 7, 7),
+ stage(stageCold,
selectorCold, 15, 30),
+ },
+ },
+ },
+ SegmentInterval: dayInterval(7),
+ TargetSegmentInterval: dayInterval(15),
+ },
+ expected: storage.IntervalRule{Unit: storage.DAY, Num:
15},
+ },
+ {
+ name: "3-stage warm->cold returns cold's 5d
(sw_metricsMinute)",
+ group: &GroupConfig{
+ Group: &commonv1.Group{
+ ResourceOpts: &commonv1.ResourceOpts{
+ SegmentInterval: dayInterval(1),
+ Stages:
[]*commonv1.LifecycleStage{
+ stage(stageWarm,
selectorWarm, 3, 7),
+ stage(stageCold,
selectorCold, 5, 30),
+ },
+ },
+ },
+ SegmentInterval: dayInterval(3),
+ TargetSegmentInterval: dayInterval(5),
+ },
+ expected: storage.IntervalRule{Unit: storage.DAY, Num:
5},
+ },
+ {
+ name: "2-stage hot->cold returns cold's interval",
+ group: &GroupConfig{
+ Group: &commonv1.Group{
+ ResourceOpts: &commonv1.ResourceOpts{
+ SegmentInterval: dayInterval(1),
+ Stages:
[]*commonv1.LifecycleStage{
+ stage(stageCold,
selectorCold, 15, 30),
+ },
+ },
+ },
+ SegmentInterval: dayInterval(1),
+ TargetSegmentInterval: dayInterval(15),
+ },
+ expected: storage.IntervalRule{Unit: storage.DAY, Num:
15},
+ },
+ {
+ name: "4-stage chain warm->cold picks cold, not the
last stage",
+ group: &GroupConfig{
+ Group: &commonv1.Group{
+ ResourceOpts: &commonv1.ResourceOpts{
+ SegmentInterval: dayInterval(1),
+ Stages:
[]*commonv1.LifecycleStage{
+ stage(stageWarm,
selectorWarm, 3, 7),
+ stage(stageCold,
selectorCold, 15, 30),
+ stage(stageFrozen,
selectorFrozen, 30, 365),
+ },
+ },
+ },
+ SegmentInterval: dayInterval(3),
+ TargetSegmentInterval: dayInterval(15),
+ },
+ expected: storage.IntervalRule{Unit: storage.DAY, Num:
15},
+ },
+ {
+ name: "fallback to ResourceOpts.SegmentInterval when
TargetSegmentInterval is nil",
+ group: &GroupConfig{
+ Group: &commonv1.Group{
+ ResourceOpts: &commonv1.ResourceOpts{
+ SegmentInterval: dayInterval(7),
+ },
+ },
+ },
+ expected: storage.IntervalRule{Unit: storage.DAY, Num:
7},
+ },
+ {
+ name: "fallback to default 1d when nothing is set",
+ group: &GroupConfig{
+ Group: &commonv1.Group{ResourceOpts:
&commonv1.ResourceOpts{}},
+ },
+ expected: storage.IntervalRule{Unit: storage.DAY, Num:
1},
+ },
+ {
+ name: "fallback handles nil ResourceOpts",
+ group: &GroupConfig{
+ Group: &commonv1.Group{},
+ },
+ expected: storage.IntervalRule{Unit: storage.DAY, Num:
1},
+ },
+ {
+ name: "TargetSegmentInterval wins over
ResourceOpts.SegmentInterval",
+ group: &GroupConfig{
+ Group: &commonv1.Group{
+ ResourceOpts: &commonv1.ResourceOpts{
+ SegmentInterval: dayInterval(1),
+ Stages:
[]*commonv1.LifecycleStage{
+ stage(stageWarm,
selectorWarm, 7, 7),
+ stage(stageCold,
selectorCold, 15, 30),
+ },
+ },
+ },
+ SegmentInterval: dayInterval(7),
+ TargetSegmentInterval: dayInterval(15),
+ },
+ expected: storage.IntervalRule{Unit: storage.DAY, Num:
15},
+ },
+ }
+
+ for _, c := range cases {
+ t.Run(c.name, func(t *testing.T) {
+ got := getTargetStageInterval(c.group)
+ assert.Equal(t, c.expected, got,
+ "target stage interval mismatch: expected
%d(%s), got %d(%s)",
+ c.expected.Num, c.expected.Unit.String(),
+ got.Num, got.Unit.String())
+ })
+ }
+}
+
+// TestGetTargetStageInterval_ConcurrentReaders pins down that the function is
+// safe for concurrent readers - lifecycle migration code calls it from
+// multiple goroutines (per-shard / per-group fan-out).
+func TestGetTargetStageInterval_ConcurrentReaders(t *testing.T) {
+ group := &GroupConfig{
+ Group: &commonv1.Group{
+ ResourceOpts: &commonv1.ResourceOpts{
+ SegmentInterval: dayInterval(5),
+ Stages: []*commonv1.LifecycleStage{
+ stage(stageWarm, selectorWarm, 7, 7),
+ stage(stageCold, selectorCold, 15, 30),
+ },
+ },
+ },
+ SegmentInterval: dayInterval(7),
+ TargetSegmentInterval: dayInterval(15),
+ }
+ expected := storage.IntervalRule{Unit: storage.DAY, Num: 15}
+
+ const goroutines = 64
+ const iterations = 1000
+ var wg sync.WaitGroup
+ wg.Add(goroutines)
+ for g := 0; g < goroutines; g++ {
+ go func() {
+ defer wg.Done()
+ for i := 0; i < iterations; i++ {
+ got := getTargetStageInterval(group)
+ if got != expected {
+ t.Errorf("concurrent read returned %v,
expected %v", got, expected)
+ return
+ }
+ }
+ }()
+ }
+ wg.Wait()
+}
diff --git a/banyand/backup/lifecycle/steps.go
b/banyand/backup/lifecycle/steps.go
index 700897840..080501883 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -86,12 +86,18 @@ func (l *lifecycleService) getSnapshots(groups
[]*commonv1.Group, p *Progress) (
// It contains all necessary information for migration and deletion operations.
type GroupConfig struct {
*commonv1.Group
- NodeSelector node.Selector
- QueueClient queue.Client
- AccumulatedTTL *commonv1.IntervalRule
+ NodeSelector node.Selector
+ QueueClient queue.Client
+ AccumulatedTTL *commonv1.IntervalRule
+ // SegmentInterval is the current stage's segment interval, used to read
+ // source segments on this node.
SegmentInterval *commonv1.IntervalRule
- TargetShardNum uint32
- TargetReplicas uint32
+ // TargetSegmentInterval is the next stage's segment interval. It
differs
+ // from SegmentInterval on 3-stage deployments (e.g. warm->cold), so any
+ // computation against the target tier's segment grid must use this
field.
+ TargetSegmentInterval *commonv1.IntervalRule
+ TargetShardNum uint32
+ TargetReplicas uint32
}
// Close releases resources held by the GroupConfig.
@@ -101,6 +107,17 @@ func (gc *GroupConfig) Close() {
}
}
+// cloneIntervalRule returns a deep copy of ir, or nil if ir is nil.
proto.Clone
+// applied to a typed-nil *commonv1.IntervalRule yields a non-nil zero value
+// (Num=0, Unit=UNIT_UNSPECIFIED) that downstream MustToIntervalRule rejects;
+// short-circuiting on nil keeps the GroupConfig fallback chain intact.
+func cloneIntervalRule(ir *commonv1.IntervalRule) *commonv1.IntervalRule {
+ if ir == nil {
+ return nil
+ }
+ return proto.Clone(ir).(*commonv1.IntervalRule)
+}
+
//nolint:contextcheck // health check goroutine uses context.Background()
func parseGroup(
g *commonv1.Group, nodeLabels map[string]string, nodes
[]*databasev1.Node,
@@ -113,9 +130,25 @@ func parseGroup(
if len(ro.Stages) == 0 {
return nil, fmt.Errorf("no stages in group %s", g.Metadata.Name)
}
+ // Validate IntervalRules up-front so later derefs (incl. Stages[i+1])
are safe.
+ if ro.Ttl == nil {
+ return nil, fmt.Errorf("group %s: missing ttl", g.Metadata.Name)
+ }
+ if ro.SegmentInterval == nil {
+ return nil, fmt.Errorf("group %s: missing segment_interval",
g.Metadata.Name)
+ }
+ for _, st := range ro.Stages {
+ if st.SegmentInterval == nil {
+ return nil, fmt.Errorf("group %s stage %s: missing
segment_interval", g.Metadata.Name, st.Name)
+ }
+ if st.Ttl == nil {
+ return nil, fmt.Errorf("group %s stage %s: missing
ttl", g.Metadata.Name, st.Name)
+ }
+ }
ttlTime := proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
- segmentInterval :=
proto.Clone(ro.SegmentInterval).(*commonv1.IntervalRule)
+ segmentInterval := cloneIntervalRule(ro.SegmentInterval)
var nst *commonv1.LifecycleStage
+ var targetSegmentInterval *commonv1.IntervalRule
for i, st := range ro.Stages {
selector, err := pub.ParseLabelSelector(st.NodeSelector)
if err != nil {
@@ -130,14 +163,21 @@ func parseGroup(
return nil, nil
}
nst = ro.Stages[i+1]
- segmentInterval = st.SegmentInterval
- l.Info().Msgf("migrating group %s at stage %s to stage %s,
segment interval: %d(%s), total ttl needs: %d(%s)",
- g.Metadata.Name, st.Name, nst.Name,
segmentInterval.Num, segmentInterval.Unit.String(), ttlTime.Num,
ttlTime.Unit.String())
+ // Clone before exposing through GroupConfig so callers cannot
mutate
+ // the shared proto Stages[*] sub-objects.
+ segmentInterval = cloneIntervalRule(st.SegmentInterval)
+ targetSegmentInterval = cloneIntervalRule(nst.SegmentInterval)
+ l.Info().Msgf("migrating group %s at stage %s to stage %s,
source segment interval: %d(%s), target segment interval: %d(%s), total ttl
needs: %d(%s)",
+ g.Metadata.Name, st.Name, nst.Name,
+ segmentInterval.Num, segmentInterval.Unit.String(),
+ targetSegmentInterval.Num,
targetSegmentInterval.Unit.String(),
+ ttlTime.Num, ttlTime.Unit.String())
break
}
if nst == nil {
nst = ro.Stages[0]
ttlTime = proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
+ targetSegmentInterval = cloneIntervalRule(nst.SegmentInterval)
l.Info().Msgf("no matching stage for group %s, defaulting to
first stage %s segment interval: %d(%s), total ttl needs: %d(%s)",
g.Metadata.Name, nst.Name, segmentInterval.Num,
segmentInterval.Unit.String(), ttlTime.Num, ttlTime.Unit.String())
}
@@ -179,13 +219,14 @@ func parseGroup(
clusterStateMgr.addRouteTable(t)
}
return &GroupConfig{
- Group: g,
- TargetShardNum: nst.ShardNum,
- TargetReplicas: nst.Replicas,
- AccumulatedTTL: ttlTime,
- SegmentInterval: segmentInterval,
- NodeSelector: nodeSel,
- QueueClient: client,
+ Group: g,
+ TargetShardNum: nst.ShardNum,
+ TargetReplicas: nst.Replicas,
+ AccumulatedTTL: ttlTime,
+ SegmentInterval: segmentInterval,
+ TargetSegmentInterval: targetSegmentInterval,
+ NodeSelector: nodeSel,
+ QueueClient: client,
}, nil
}
diff --git a/banyand/backup/lifecycle/steps_test.go
b/banyand/backup/lifecycle/steps_test.go
new file mode 100644
index 000000000..c1728b5f2
--- /dev/null
+++ b/banyand/backup/lifecycle/steps_test.go
@@ -0,0 +1,93 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+)
+
+// TestParseGroup_RejectsMissingIntervals verifies that parseGroup fails fast
+// with a descriptive error when required IntervalRule fields are nil, rather
+// than panicking later inside the migration log. parseGroup validates these
+// fields before touching metadata.Repo or the cluster state manager, so
+// passing nil for those parameters is safe in this test - if a future change
+// reorders parseGroup so the deps run first, the tests will panic clearly
+// and that contract should be revisited.
+func TestParseGroup_RejectsMissingIntervals(t *testing.T) {
+ makeGroup := func(mutate func(ro *commonv1.ResourceOpts))
*commonv1.Group {
+ ro := &commonv1.ResourceOpts{
+ ShardNum: 1,
+ SegmentInterval: dayInterval(1),
+ Ttl: dayInterval(7),
+ Stages: []*commonv1.LifecycleStage{
+ stage(stageWarm, selectorWarm, 7, 7),
+ stage(stageCold, selectorCold, 15, 30),
+ },
+ }
+ mutate(ro)
+ return &commonv1.Group{
+ Metadata: &commonv1.Metadata{Name: "test-group"},
+ ResourceOpts: ro,
+ }
+ }
+
+ cases := []struct {
+ name string
+ mutate func(*commonv1.ResourceOpts)
+ errFrag string
+ }{
+ {
+ name: "top-level ttl missing",
+ mutate: func(ro *commonv1.ResourceOpts) { ro.Ttl = nil
},
+ errFrag: "group test-group: missing ttl",
+ },
+ {
+ name: "top-level segment_interval missing",
+ mutate: func(ro *commonv1.ResourceOpts) {
ro.SegmentInterval = nil },
+ errFrag: "group test-group: missing segment_interval",
+ },
+ {
+ name: "stage segment_interval missing",
+ mutate: func(ro *commonv1.ResourceOpts) {
ro.Stages[0].SegmentInterval = nil },
+ errFrag: "group test-group stage warm: missing
segment_interval",
+ },
+ {
+ name: "stage ttl missing",
+ mutate: func(ro *commonv1.ResourceOpts) {
ro.Stages[0].Ttl = nil },
+ errFrag: "group test-group stage warm: missing ttl",
+ },
+ {
+ name: "next stage segment_interval missing",
+ mutate: func(ro *commonv1.ResourceOpts) {
ro.Stages[1].SegmentInterval = nil },
+ errFrag: "group test-group stage cold: missing
segment_interval",
+ },
+ }
+ for _, c := range cases {
+ t.Run(c.name, func(t *testing.T) {
+ g := makeGroup(c.mutate)
+ _, err := parseGroup(g, map[string]string{"type":
"warm"}, nil, nil, nil, nil)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), c.errFrag)
+ })
+ }
+}
diff --git a/banyand/internal/storage/rotation_test.go
b/banyand/internal/storage/rotation_test.go
index 9577c6736..698df3392 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -249,7 +249,11 @@ func TestRotationDisabled(t *testing.T) {
}
ctx := context.Background()
mc := timestamp.NewMockClock()
- ts, err := time.ParseInLocation("2006-01-02 15:04:05",
"2024-05-01 00:00:00", time.Local)
+ // 2024-05-02 is day 19845 since the Unix epoch which is
divisible by 3,
+ // so it lies on the 3-day epoch-aligned grid produced by
+ // IntervalRule.Standard. Picking an aligned day keeps the
assertions
+ // simple: 6 consecutive days collapse into exactly two 3-day
segments.
+ ts, err := time.ParseInLocation("2006-01-02 15:04:05",
"2024-05-02 00:00:00", time.UTC)
require.NoError(t, err)
mc.Set(ts)
ctx = timestamp.SetClock(ctx, mc)
@@ -267,11 +271,11 @@ func TestRotationDisabled(t *testing.T) {
db := tsdb.(*database[*MockTSTable, any])
segCtrl := db.segmentController
- // Simulate data arriving day by day for 6 days (mimicking
lifecycle migration)
- // Day 1 (05-01): already in segment [05-01, 05-04)
- // Day 2 (05-02): still in [05-01, 05-04)
- // Day 3 (05-03): still in [05-01, 05-04)
- // Day 4 (05-04): need new segment [05-04, 05-07)
+ // Simulate data arriving day by day for 6 days (mimicking
lifecycle migration).
+ // Day 1 (05-02): already in segment [05-02, 05-05)
+ // Day 2 (05-03): still in [05-02, 05-05)
+ // Day 3 (05-04): still in [05-02, 05-05)
+ // Day 4 (05-05): need new segment [05-05, 05-08)
for day := 0; day < 6; day++ {
dayTime := ts.AddDate(0, 0, day)
mc.Set(dayTime)
@@ -284,7 +288,7 @@ func TestRotationDisabled(t *testing.T) {
tsdb.Tick(maxTS.UnixNano())
}
- // Verify: only 2 segments created, both with correct 3-day
boundaries
+ // Verify: only 2 segments created, both on the 3-day
epoch-aligned grid.
segments, _ := segCtrl.segments(false)
defer func() {
for i := range segments {
@@ -293,13 +297,13 @@ func TestRotationDisabled(t *testing.T) {
}()
require.Equal(t, 2, len(segments), "expected 2 segments for 6
days with 3-day interval")
- // First segment: [05-01, 05-04)
- assert.Equal(t, "2024-05-01 00:00:00",
segments[0].Start.Format("2006-01-02 15:04:05"))
- assert.Equal(t, "2024-05-04 00:00:00",
segments[0].End.Format("2006-01-02 15:04:05"))
+ // First segment: [05-02, 05-05)
+ assert.Equal(t, "2024-05-02 00:00:00",
segments[0].Start.UTC().Format("2006-01-02 15:04:05"))
+ assert.Equal(t, "2024-05-05 00:00:00",
segments[0].End.UTC().Format("2006-01-02 15:04:05"))
- // Second segment: [05-04, 05-07)
- assert.Equal(t, "2024-05-04 00:00:00",
segments[1].Start.Format("2006-01-02 15:04:05"))
- assert.Equal(t, "2024-05-07 00:00:00",
segments[1].End.Format("2006-01-02 15:04:05"))
+ // Second segment: [05-05, 05-08)
+ assert.Equal(t, "2024-05-05 00:00:00",
segments[1].Start.UTC().Format("2006-01-02 15:04:05"))
+ assert.Equal(t, "2024-05-08 00:00:00",
segments[1].End.UTC().Format("2006-01-02 15:04:05"))
})
}
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 4f1490bec..58ac3dddd 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -579,19 +579,37 @@ func (sc *segmentController[T, O]) create(start
time.Time) (*segment[T, O], erro
}
}
options := sc.getOptions()
- start = options.SegmentInterval.Unit.Standard(start)
+ // Anchor stdEnd to the aligned start before any bump so end stays on
the
+ // global grid even when start is bumped past a legacy off-grid
neighbor;
+ // subsequent segments then self-heal back to the grid.
+ alignedStart := options.SegmentInterval.Standard(start)
+ stdEnd := options.SegmentInterval.NextTime(alignedStart)
+ start = alignedStart
+ // sc.lst is sorted ascending by start time with non-overlapping ranges;
+ // a single pass bumps start past every legacy segment that swallows it
+ // (each next segment.Start >= previous.End).
var next *segment[T, O]
for _, s := range sc.lst {
if s.Contains(start.UnixNano()) {
- return s, nil
+ start = s.End
+ continue
}
if next == nil && s.Start.After(start) {
next = s
}
}
- stdEnd := options.SegmentInterval.NextTime(start)
var end time.Time
if next != nil && next.Start.Before(stdEnd) {
+ // `next` starts inside the current grid bucket - a legacy
off-grid
+ // segment whose TTL hasn't elapsed. Cap end at next.Start to
avoid
+ // overlap; surfacing this at Info level lets operators see the
+ // abnormal span until the legacy neighbor ages out.
+ sc.l.Info().
+ Stringer("alignedStart", alignedStart).
+ Stringer("bumpedStart", start).
+ Stringer("nextStart", next.Start).
+ Stringer("stdEnd", stdEnd).
+ Msg("new segment span is shorter than configured
SegmentInterval due to an unaligned legacy neighbor")
end = next.Start
} else {
end = stdEnd
diff --git a/banyand/internal/storage/segment_test.go
b/banyand/internal/storage/segment_test.go
index f50e07603..2430de773 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -23,6 +23,8 @@ import (
"fmt"
"os"
"path/filepath"
+ "slices"
+ "sort"
"strconv"
"sync"
"sync/atomic"
@@ -1178,3 +1180,454 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t
*testing.T) {
require.Nil(t, s.index, "segment %s should be cleaned up",
s.suffix)
}
}
+
+// newAlignmentTestController is a compact helper for tests covering the
+// epoch-aligned segmentController.create() behavior introduced by the lifecycle fix.
+// It spins up a real segmentController backed by a temp dir, with the given
+// SegmentInterval, and returns the controller plus a cleanup function.
+func newAlignmentTestController(
+ t *testing.T, interval IntervalRule,
+) (*segmentController[mockTSTable, mockTSTableOpener], string, func()) {
+ t.Helper()
+ tempDir, cleanup := setupTestEnvironment(t)
+
+ ctx := context.Background()
+ l := logger.GetLogger("test-alignment")
+ ctx = context.WithValue(ctx, logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{
+ Database: "test-db",
+ Stage: "alignment",
+ }
+ })
+
+ opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+ TSTableCreator: func(_ fs.FileSystem, _ string, _
common.Position, _ *logger.Logger,
+ _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+ ) (mockTSTable, error) {
+ return mockTSTable{ID: common.ShardID(0)}, nil
+ },
+ ShardNum: 1,
+ SegmentInterval: interval,
+ TTL: IntervalRule{Unit: DAY, Num:
60},
+ SeriesIndexFlushTimeoutSeconds: 10,
+ SeriesIndexCacheMaxBytes: 1024 * 1024,
+ }
+
+ serviceCache := NewServiceCache().(*serviceCache)
+ sc := newSegmentController[mockTSTable, mockTSTableOpener](
+ ctx,
+ tempDir,
+ l,
+ opts,
+ nil,
+ nil,
+ 5*time.Minute,
+
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"),
opts.MemoryLimit),
+ serviceCache,
+ group,
+ )
+ return sc, tempDir, cleanup
+}
+
+// TestIntervalRule_Standard_AlignsToEpochGrid pins the alignment math itself.
+// Two timestamps in the same N*Unit bucket measured from epoch must round
+// down to the exact same instant; two timestamps in adjacent buckets must
+// round to instants exactly N*Unit apart.
+func TestIntervalRule_Standard_AlignsToEpochGrid(t *testing.T) {
+ //nolint:govet // fieldalignment: test struct optimization not critical
+ cases := []struct {
+ name string
+ ir IntervalRule
+ probes []time.Time
+ expected []time.Time
+ }{
+ {
+ name: "DAY Num=15 matches calculateTargetSegments grid",
+ ir: IntervalRule{Unit: DAY, Num: 15},
+ probes: []time.Time{
+ time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 16, 6, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 19, 23, 59, 59, 0, time.UTC),
+ time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 5, 7, 12, 30, 0, 0, time.UTC),
+ },
+ expected: []time.Time{
+ time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 5, 7, 0, 0, 0, 0, time.UTC),
+ },
+ },
+ {
+ name: "DAY Num=5",
+ ir: IntervalRule{Unit: DAY, Num: 5},
+ // 1970-01-01 + N*5 day boundaries near today:
2026-04-02 (day 20545),
+ // 2026-04-07 (day 20550), 2026-04-12 (day 20555).
+ probes: []time.Time{
+ time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 3, 6, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 6, 23, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+ },
+ expected: []time.Time{
+ time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+ },
+ },
+ {
+ name: "HOUR Num=6",
+ ir: IntervalRule{Unit: HOUR, Num: 6},
+ probes: []time.Time{
+ time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 19, 5, 59, 59, 0, time.UTC),
+ time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC),
+ time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC),
+ },
+ expected: []time.Time{
+ time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC),
+ time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC),
+ },
+ },
+ {
+ name: "DAY Num=1 matches old Unit.Standard
behavior",
+ ir: IntervalRule{Unit: DAY, Num: 1},
+ probes: []time.Time{time.Date(2026, 4, 19, 6, 30, 0,
0, time.UTC)},
+ expected: []time.Time{time.Date(2026, 4, 19, 0, 0, 0,
0, time.UTC)},
+ },
+ {
+ // Pre-epoch inputs: floor division puts them in the
bucket below
+ // rather than collapsing to epoch as truncated
division would.
+ name: "DAY Num=15 pre-epoch uses floor division",
+ ir: IntervalRule{Unit: DAY, Num: 15},
+ probes: []time.Time{
+ time.Date(1969, 12, 25, 0, 0, 0, 0, time.UTC),
+ time.Date(1969, 12, 31, 23, 59, 59, 0,
time.UTC),
+ },
+ expected: []time.Time{
+ time.Date(1969, 12, 17, 0, 0, 0, 0, time.UTC),
+ time.Date(1969, 12, 17, 0, 0, 0, 0, time.UTC),
+ },
+ },
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ require.Equal(t, len(tc.probes), len(tc.expected))
+ for i, p := range tc.probes {
+ got := tc.ir.Standard(p)
+ assert.Equal(t, tc.expected[i], got,
+ "Standard(%s) under %d %s expected %s
got %s",
+ p.Format(time.RFC3339), tc.ir.Num,
tc.ir.Unit.String(),
+ tc.expected[i].Format(time.RFC3339),
got.Format(time.RFC3339))
+ }
+ })
+ }
+}
+
+// TestIntervalRule_Standard_PreservesLocation verifies that the returned
+// time keeps the input's Location and lands on a local-day midnight (DAY)
+// or local-hour boundary (HOUR) anchored in the caller's timezone. Under the
+// per-timezone grid, the absolute instant naturally differs across timezones.
+func TestIntervalRule_Standard_PreservesLocation(t *testing.T) {
+ shanghai := time.FixedZone("CST", 8*3600)
+ losAngeles := time.FixedZone("PST", -8*3600)
+
+ cases := []struct {
+ probe time.Time
+ ir IntervalRule
+ }{
+ {time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC),
IntervalRule{Unit: DAY, Num: 15}},
+ {time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC),
IntervalRule{Unit: HOUR, Num: 6}},
+ }
+ for _, c := range cases {
+ for _, loc := range []*time.Location{time.UTC, shanghai,
losAngeles} {
+ got := c.ir.Standard(c.probe.In(loc))
+ assert.Equal(t, loc, got.Location(),
+ "Standard must return time in input.Location();
ir=%+v loc=%s", c.ir, loc)
+ switch c.ir.Unit {
+ case DAY:
+ assert.Equal(t, 0, got.Hour(), "DAY bucket must
align to local-day midnight")
+ assert.Equal(t, 0, got.Minute())
+ assert.Equal(t, 0, got.Second())
+ case HOUR:
+ assert.Equal(t, 0, got.Minute(), "HOUR bucket
must align to local-hour boundary")
+ assert.Equal(t, 0, got.Second())
+ }
+ }
+ }
+}
+
+// TestCreateSegment_OutOfOrderArrival_SameBucket reproduces the production
+// truncation observed on demo-banyandb-data-cold-0 on 2026-04-30 where a part
+// with MinTimestamp ~2026-04-19 created seg-20260419 first and a later-
+// processed part with MinTimestamp ~2026-04-16 produced a 3-day truncated
+// seg-20260416. With Standard() honoring Num, both timestamps resolve to the
+// same epoch-aligned bucket [04/07, 04/22), so the second create returns the
+// same segment instead of creating a truncated neighbor.
+func TestCreateSegment_OutOfOrderArrival_SameBucket(t *testing.T) {
+ sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY,
Num: 15})
+ defer cleanup()
+
+ laterFirst := time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC)
+ earlierAfter := time.Date(2026, 4, 16, 6, 0, 0, 0, time.UTC)
+ expectedBucketStart := time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC)
+ expectedBucketEnd := time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC)
+
+ seg1, err := sc.create(laterFirst)
+ require.NoError(t, err)
+ require.NotNil(t, seg1)
+ defer seg1.DecRef()
+
+ seg2, err := sc.create(earlierAfter)
+ require.NoError(t, err)
+ require.NotNil(t, seg2)
+
+ assert.Same(t, seg1, seg2,
+ "both timestamps fall in the same 15d bucket; create() must
reuse the existing segment")
+ assert.Equal(t, expectedBucketStart, seg1.Start,
+ "segment must start at the epoch-aligned bucket boundary, not
at the first arrival's day")
+ assert.Equal(t, expectedBucketEnd, seg1.End)
+ assert.Equal(t, 15*24*time.Hour, seg1.End.Sub(seg1.Start))
+}
+
+// TestCreateSegment_OutOfOrderArrival_AdjacentBuckets covers the case where
+// the late-arrival timestamp falls in a strictly different epoch bucket from
+// the first-arrival one. The new segment must occupy a clean N*Unit span
+// without being truncated by the existing later segment - the truncation
+// branch in create() must collapse to "stdEnd == next.Start" exactly, leaving
+// span = N*Unit.
+func TestCreateSegment_OutOfOrderArrival_AdjacentBuckets(t *testing.T) {
+ sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY,
Num: 15})
+ defer cleanup()
+
+ laterFirst := time.Date(2026, 5, 5, 6, 0, 0, 0, time.UTC) // bucket
[04/22, 05/07)
+ earlierAfter := time.Date(2026, 4, 15, 6, 0, 0, 0, time.UTC) // bucket
[04/07, 04/22)
+
+ seg1, err := sc.create(laterFirst)
+ require.NoError(t, err)
+ require.NotNil(t, seg1)
+ defer seg1.DecRef()
+ assert.Equal(t, time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC),
seg1.Start)
+ assert.Equal(t, time.Date(2026, 5, 7, 0, 0, 0, 0, time.UTC), seg1.End)
+ assert.Equal(t, 15*24*time.Hour, seg1.End.Sub(seg1.Start))
+
+ seg2, err := sc.create(earlierAfter)
+ require.NoError(t, err)
+ require.NotNil(t, seg2)
+ defer seg2.DecRef()
+ assert.NotSame(t, seg1, seg2,
+ "timestamps in different buckets must not collapse to one
segment")
+ assert.Equal(t, time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC), seg2.Start)
+ assert.Equal(t, time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC), seg2.End,
+ "new earlier segment must abut next.Start with
stdEnd==next.Start (no internal truncation)")
+ assert.Equal(t, 15*24*time.Hour, seg2.End.Sub(seg2.Start))
+}
+
+// TestCreateSegment_HourInterval_OutOfOrder verifies the same alignment
+// guarantee for HOUR-based segment intervals (Num=6).
+func TestCreateSegment_HourInterval_OutOfOrder(t *testing.T) {
+ sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit:
HOUR, Num: 6})
+ defer cleanup()
+
+ t1 := time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC) // bucket [06:00,
12:00)
+ t2 := time.Date(2026, 4, 19, 8, 15, 0, 0, time.UTC) // same bucket
+ t3 := time.Date(2026, 4, 19, 13, 0, 0, 0, time.UTC) // bucket [12:00,
18:00)
+
+ seg1, err := sc.create(t1)
+ require.NoError(t, err)
+ defer seg1.DecRef()
+ assert.Equal(t, time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC),
seg1.Start)
+ assert.Equal(t, 6*time.Hour, seg1.End.Sub(seg1.Start))
+
+ seg2, err := sc.create(t2)
+ require.NoError(t, err)
+ assert.Same(t, seg1, seg2, "08:15 must reuse the [06:00, 12:00)
segment")
+
+ seg3, err := sc.create(t3)
+ require.NoError(t, err)
+ defer seg3.DecRef()
+ assert.NotSame(t, seg1, seg3)
+ assert.Equal(t, time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC),
seg3.Start)
+ assert.Equal(t, 6*time.Hour, seg3.End.Sub(seg3.Start))
+}
+
+// TestCreateSegment_ConcurrentCreates_DeterministicAlignment exercises the
+// receiver under contention: many goroutines call create() with random
+// timestamps spread across a window that covers multiple epoch-aligned
+// buckets. Whatever interleaving the scheduler picks, the resulting segment
+// set must (a) cover every probe timestamp, (b) start only on epoch-aligned
+// boundaries, (c) have every span exactly the configured N*Unit, and (d) be
+// non-overlapping. This pins down the property that motivated the fix:
+// alignment is determined by the global grid, not by arrival order.
+func TestCreateSegment_ConcurrentCreates_DeterministicAlignment(t *testing.T) {
+ sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY,
Num: 15})
+ defer cleanup()
+
+ // Probe range covers 4 epoch-aligned 15d buckets:
+ // [03/08, 03/23), [03/23, 04/07), [04/07, 04/22), [04/22, 05/07).
+ rangeStart := time.Date(2026, 3, 10, 0, 0, 0, 0, time.UTC)
+ rangeEndExclusive := time.Date(2026, 5, 5, 0, 0, 0, 0, time.UTC)
+ rangeNanos := rangeEndExclusive.UnixNano() - rangeStart.UnixNano()
+
+ const goroutines = 32
+ const probesPerGoroutine = 64
+ const knuthMul = uint64(0x9E3779B97F4A7C15)
+ probes := make([]time.Time, 0, goroutines*probesPerGoroutine)
+ for i := 0; i < goroutines*probesPerGoroutine; i++ {
+ offset := int64((uint64(i+1)*knuthMul)>>1) % rangeNanos
+ probes = append(probes, time.Unix(0,
rangeStart.UnixNano()+offset).UTC())
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(goroutines)
+ for g := 0; g < goroutines; g++ {
+ go func(idx int) {
+ defer wg.Done()
+ start := idx * probesPerGoroutine
+ end := start + probesPerGoroutine
+ for _, ts := range probes[start:end] {
+ seg, err := sc.create(ts)
+ if err != nil {
+ t.Errorf("create(%s) returned error:
%v", ts.Format(time.RFC3339), err)
+ return
+ }
+ if seg == nil {
+ t.Errorf("create(%s) returned nil
segment", ts.Format(time.RFC3339))
+ return
+ }
+ if !seg.Contains(ts.UnixNano()) {
+ t.Errorf("probe %s landed in segment
[%s, %s) which does not contain it",
+ ts.Format(time.RFC3339),
seg.Start.Format(time.RFC3339), seg.End.Format(time.RFC3339))
+ }
+ seg.DecRef()
+ }
+ }(g)
+ }
+ wg.Wait()
+
+ sc.RLock()
+ segs := slices.Clone(sc.lst)
+ sc.RUnlock()
+
+ require.NotEmpty(t, segs, "concurrent creates must produce at least one
segment")
+ sort.Slice(segs, func(i, j int) bool { return
segs[i].Start.Before(segs[j].Start) })
+
+ epoch := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
+ fifteenDays := 15 * 24 * time.Hour
+ for i, seg := range segs {
+ assert.Equal(t, fifteenDays, seg.End.Sub(seg.Start),
+ "segment[%d] %s span must be exactly 15d, got %v",
+ i, seg.suffix, seg.End.Sub(seg.Start))
+ offset := seg.Start.Sub(epoch)
+ assert.Equal(t, time.Duration(0), offset%fifteenDays,
+ "segment[%d] start %s must be on the epoch-aligned 15d
grid",
+ i, seg.Start.Format(time.RFC3339))
+ if i > 0 {
+ assert.False(t, segs[i-1].End.After(seg.Start),
+ "segments must not overlap: prev.End=%s
next.Start=%s",
+ segs[i-1].End.Format(time.RFC3339),
seg.Start.Format(time.RFC3339))
+ }
+ }
+}
+
+// TestCreateSegment_LegacyOffGridNeighbour_TransitionThenGrid: a pre-fix
+// off-grid segment on disk must produce a transition segment that ends on
+// the global grid, so subsequent segments self-heal back to it.
+func TestCreateSegment_LegacyOffGridNeighbour_TransitionThenGrid(t *testing.T)
{
+ sc, tempDir, cleanup := newAlignmentTestController(t,
IntervalRule{Unit: DAY, Num: 15})
+ defer cleanup()
+
+ ctx := context.Background()
+ l := logger.GetLogger("test-legacy-transition")
+ ctx = context.WithValue(ctx, logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{Database: "test-db", Stage: "cold"}
+ })
+
+ // Inject a legacy off-grid segment [2026-05-01, 2026-05-16). The new
15d
+ // epoch grid puts buckets at 04/22, 05/07, 05/22, 06/06, ...; legacy
+ // straddles 05/07 and ends inside [05/07, 05/22).
+ legacyStart := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
+ legacyEnd := time.Date(2026, 5, 16, 0, 0, 0, 0, time.UTC)
+ suffix := legacyStart.Format(dayFormat)
+ segPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix))
+ require.NoError(t, os.MkdirAll(segPath, DirPerm))
+ require.NoError(t, os.WriteFile(filepath.Join(segPath,
metadataFilename), []byte(currentVersion), FilePerm))
+ legacy, err := sc.openSegment(ctx, legacyStart, legacyEnd, segPath,
suffix, sc.groupCache)
+ require.NoError(t, err)
+ sc.lst = append(sc.lst, legacy)
+ sc.sortLst()
+
+ // First write past legacy.End: aligned start (05/07) is inside legacy,
+ // bump pushes start to 05/16 (legacy.End), but stdEnd was locked to the
+ // original grid bucket end (05/22). So the transition segment is
shorter
+ // than 15d but still ends on the grid.
+ probe := time.Date(2026, 5, 17, 6, 0, 0, 0, time.UTC)
+ transition, err := sc.create(probe)
+ require.NoError(t, err)
+ require.NotNil(t, transition)
+ defer transition.DecRef()
+
+ gridBoundary := time.Date(2026, 5, 22, 0, 0, 0, 0, time.UTC)
+ assert.True(t, transition.Contains(probe.UnixNano()),
+ "transition segment must cover the probe; got [%s, %s)",
+ transition.Start.Format(time.RFC3339),
transition.End.Format(time.RFC3339))
+ assert.Equal(t, legacyEnd, transition.Start,
+ "start must be bumped to legacy.End (no overlap, no misroute)")
+ assert.Equal(t, gridBoundary, transition.End,
+ "end must land on the next 15d-from-epoch boundary, not
legacy.End+15d")
+ assert.Less(t, transition.End.Sub(transition.Start), 15*24*time.Hour,
+ "transition segment span is expected to be shorter than the
configured SegmentInterval")
+
+ // Second write past the transition segment: aligned start lands cleanly
+ // on the next grid bucket [05/22, 06/06) and produces a full 15d
segment.
+ postProbe := time.Date(2026, 5, 25, 6, 0, 0, 0, time.UTC)
+ gridSeg, err := sc.create(postProbe)
+ require.NoError(t, err)
+ require.NotNil(t, gridSeg)
+ defer gridSeg.DecRef()
+
+ assert.Equal(t, gridBoundary, gridSeg.Start,
+ "first post-transition segment must start on the global grid")
+ assert.Equal(t, time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC),
gridSeg.End,
+ "first post-transition segment must span a full 15d on the
grid")
+ assert.Equal(t, 15*24*time.Hour, gridSeg.End.Sub(gridSeg.Start),
+ "transition self-heals: subsequent segments are full Num*Unit")
+}
+
+// TestCreateSegment_PersistedMetadataReflectsAlignedRange writes a segment
+// via create() with an unaligned timestamp, then verifies that the on-disk
+// metadata records the epoch-aligned start/end - the alignment must survive
+// a process restart.
+func TestCreateSegment_PersistedMetadataReflectsAlignedRange(t *testing.T) {
+ sc, tempDir, cleanup := newAlignmentTestController(t,
IntervalRule{Unit: DAY, Num: 15})
+ defer cleanup()
+
+ probe := time.Date(2026, 4, 16, 6, 30, 0, 0, time.UTC)
+ expectedStart := time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC)
+ expectedEnd := time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC)
+
+ seg, err := sc.create(probe)
+ require.NoError(t, err)
+ require.NotNil(t, seg)
+ assert.Equal(t, expectedStart, seg.Start)
+ assert.Equal(t, expectedEnd, seg.End)
+ seg.DecRef()
+
+ suffix := expectedStart.Format(dayFormat)
+ metadataPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix),
metadataFilename)
+ rawMeta, readErr := os.ReadFile(metadataPath)
+ require.NoError(t, readErr)
+ meta, parseErr := readSegmentMeta(rawMeta)
+ require.NoError(t, parseErr)
+ assert.Equal(t, currentVersion, meta.Version)
+ assert.Equal(t, expectedEnd.Format(time.RFC3339Nano), meta.EndTime,
+ "persisted endTime must reflect the epoch-aligned bucket end,
not start+15d from the probe day")
+}
diff --git a/banyand/internal/storage/storage.go
b/banyand/internal/storage/storage.go
index 799f42b82..5e54012ec 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -202,6 +202,45 @@ func (ir IntervalRule) NextTime(current time.Time)
time.Time {
panic("invalid interval unit")
}
+// Standard aligns t down to the nearest Num*Unit boundary on a grid anchored
+// at 1970-01-01 in t.Location(). Boundaries always fall on local-day midnight
+// (DAY) or local-hour boundaries (HOUR), so the result formats/parses cleanly
+// in t.Location() and Num=1 reduces to IntervalUnit.Standard's local
alignment.
+// The grid is per-timezone, so two nodes in different geographic timezones may
+// produce different bucket starts for the same instant.
+func (ir IntervalRule) Standard(t time.Time) time.Time {
+ if ir.Num <= 0 {
+ logger.Panicf("interval rule Num must be positive: %+v", ir)
+ }
+ if ir.Num == 1 {
+ return ir.Unit.Standard(t)
+ }
+ epochLocal := time.Date(1970, 1, 1, 0, 0, 0, 0, t.Location())
+ switch ir.Unit {
+ case DAY:
+ todayMidnight := time.Date(t.Year(), t.Month(), t.Day(), 0, 0,
0, 0, t.Location())
+ // +12 absorbs DST-affected days that are 23 or 25 absolute
hours;
+ // floorDiv handles pre-epoch (negative) inputs correctly.
+ days :=
floorDiv(int64(todayMidnight.Sub(epochLocal).Hours()+12), 24)
+ bucketIdx := floorDiv(days, int64(ir.Num))
+ return time.Date(1970, 1, 1+int(bucketIdx)*ir.Num, 0, 0, 0, 0,
t.Location())
+ case HOUR:
+ todayHour := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(),
0, 0, 0, t.Location())
+ hours := floorDiv(int64(todayHour.Sub(epochLocal)),
int64(time.Hour))
+ bucketIdx := floorDiv(hours, int64(ir.Num))
+ return time.Date(1970, 1, 1, int(bucketIdx)*ir.Num, 0, 0, 0,
t.Location())
+ }
+ panic("invalid interval unit")
+}
+
+func floorDiv(a, b int64) int64 {
+ q := a / b
+ if a%b != 0 && (a < 0) != (b < 0) {
+ q--
+ }
+ return q
+}
+
func (ir IntervalRule) estimatedDuration() time.Duration {
switch ir.Unit {
case HOUR:
diff --git a/banyand/internal/wqueue/wqueue.go
b/banyand/internal/wqueue/wqueue.go
index ae2ab8338..2ea61ffa9 100644
--- a/banyand/internal/wqueue/wqueue.go
+++ b/banyand/internal/wqueue/wqueue.go
@@ -237,7 +237,7 @@ func (q *Queue[S, O]) getShard(shardID common.ShardID)
*Shard[S] {
// It uses the Queue's SegmentInterval to generate the time range.
func (q *Queue[S, O]) GetTimeRange(ts time.Time) timestamp.TimeRange {
opts := q.getOpts()
- start := opts.SegmentInterval.Unit.Standard(ts)
+ start := opts.SegmentInterval.Standard(ts)
end := opts.SegmentInterval.NextTime(start)
return timestamp.NewSectionTimeRange(start, end)
}
diff --git a/banyand/internal/wqueue/wqueue_test.go
b/banyand/internal/wqueue/wqueue_test.go
index 91c14ff4b..2dc6b310f 100644
--- a/banyand/internal/wqueue/wqueue_test.go
+++ b/banyand/internal/wqueue/wqueue_test.go
@@ -68,6 +68,9 @@ func TestQueue_GetTimeRange(t *testing.T) {
expectedEnd: time.Date(2023, 12, 16, 0, 0, 0, 0,
time.UTC),
},
{
+ // Hour-since-epoch for 2023-12-15 14:00 UTC is 472958,
which is even,
+ // so under Num=2 the bucket boundary already coincides
with 14:00 -
+ // epoch alignment matches plain hour alignment in this
case.
name: "multiple hours interval",
segmentInterval: storage.IntervalRule{
Unit: storage.HOUR,
@@ -78,14 +81,30 @@ func TestQueue_GetTimeRange(t *testing.T) {
expectedEnd: time.Date(2023, 12, 15, 16, 0, 0, 0,
time.UTC),
},
{
- name: "multiple days interval",
+ // Hour-since-epoch for 2023-12-15 15:00 UTC is 472959
(odd), so
+ // under Num=2 the bucket aligns down to 14:00.
+ name: "multiple hours interval (odd hour drops to even
boundary)",
+ segmentInterval: storage.IntervalRule{
+ Unit: storage.HOUR,
+ Num: 2,
+ },
+ inputTime: time.Date(2023, 12, 15, 15, 30, 45,
123456789, time.UTC),
+ expectedStart: time.Date(2023, 12, 15, 14, 0, 0, 0,
time.UTC),
+ expectedEnd: time.Date(2023, 12, 15, 16, 0, 0, 0,
time.UTC),
+ },
+ {
+ // Day-since-epoch for 2023-12-15 is 19706. Under Num=3
the bucket
+ // aligns down to 19704 -> 2023-12-13 .. 2023-12-16.
The previous
+ // expectation of 12/15..12/18 was written against the
historical
+ // bug that aligned only to the day, not to N*day from
epoch.
+ name: "multiple days interval (aligned to N*day from
epoch)",
segmentInterval: storage.IntervalRule{
Unit: storage.DAY,
Num: 3,
},
inputTime: time.Date(2023, 12, 15, 14, 30, 45,
123456789, time.UTC),
- expectedStart: time.Date(2023, 12, 15, 0, 0, 0, 0,
time.UTC),
- expectedEnd: time.Date(2023, 12, 18, 0, 0, 0, 0,
time.UTC),
+ expectedStart: time.Date(2023, 12, 13, 0, 0, 0, 0,
time.UTC),
+ expectedEnd: time.Date(2023, 12, 16, 0, 0, 0, 0,
time.UTC),
},
}