This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 0dd5d684c align part sync timestamp to segment grid so part and sidx land in the same segment (#1136)
0dd5d684c is described below
commit 0dd5d684c7d5b82cab535551fa8ea09dbaafb440
Author: mrproliu <[email protected]>
AuthorDate: Mon May 18 16:34:46 2026 +0800
align part sync timestamp to segment grid so part and sidx land in the same segment (#1136)
---
banyand/internal/storage/segment.go | 6 +
banyand/internal/storage/storage.go | 2 +
banyand/internal/storage/tsdb.go | 4 +
banyand/measure/write_data.go | 3 +-
banyand/measure/write_data_segmentref_test.go | 168 +++++++++++++++++++++++++-
banyand/stream/write_data.go | 3 +-
banyand/stream/write_data_segmentref_test.go | 136 ++++++++++++++++++++-
banyand/trace/write_data.go | 3 +-
banyand/trace/write_data_segmentref_test.go | 136 ++++++++++++++++++++-
9 files changed, 455 insertions(+), 6 deletions(-)
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 4350dd9b0..5204842a9 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -402,6 +402,12 @@ func (sc *segmentController[T, O]) getOptions() *TSDBOpts[T, O] {
 	return sc.opts
 }
 
+func (sc *segmentController[T, O]) getSegmentInterval() IntervalRule {
+	sc.optsMutex.RLock()
+	defer sc.optsMutex.RUnlock()
+	return sc.opts.SegmentInterval
+}
+
 func (sc *segmentController[T, O]) updateOptions(resourceOpts *commonv1.ResourceOpts) {
 	sc.optsMutex.Lock()
 	defer sc.optsMutex.Unlock()
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index 5e54012ec..fc33a1ffa 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -112,6 +112,8 @@ type TSDB[T TSTable, O any] interface {
 	io.Closer
 	CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
 	SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error)
+	// SegmentInterval returns the current segment interval rule.
+	SegmentInterval() IntervalRule
 	Tick(ts int64)
 	UpdateOptions(opts *commonv1.ResourceOpts)
 	TakeFileSnapshot(dst string) (bool, error)
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 5264a7b94..d6e0405b4 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -281,6 +281,10 @@ func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) ([]Segmen
 	return d.segmentController.selectSegments(timeRange)
 }
 
+func (d *database[T, O]) SegmentInterval() IntervalRule {
+	return d.segmentController.getSegmentInterval()
+}
+
 func (d *database[T, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts) {
 	if d.closed.Load() {
 		return
diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go
index c37b52e60..c6079f365 100644
--- a/banyand/measure/write_data.go
+++ b/banyand/measure/write_data.go
@@ -117,7 +117,8 @@ func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (que
 		s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group")
 		return nil, err
 	}
-	segmentTime := time.Unix(0, ctx.MinTimestamp)
+	// Align to the segment grid so part-chunk sync and sidx land in the same segment.
+	segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, ctx.MinTimestamp))
 	segment, err := tsdb.CreateSegmentIfNotExist(segmentTime)
 	if err != nil {
 		s.l.Error().Err(err).Str("group", ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment")
diff --git a/banyand/measure/write_data_segmentref_test.go b/banyand/measure/write_data_segmentref_test.go
index 0606cc204..b71f5e742 100644
--- a/banyand/measure/write_data_segmentref_test.go
+++ b/banyand/measure/write_data_segmentref_test.go
@@ -122,7 +122,10 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
 	db := openTestTSDBForRefTest(t, tmpPath, &openShardCount)
 	defer db.Close()
 
-	segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
+	// CreatePartHandler reinterprets ctx.MinTimestamp via time.Unix in Local
+	// tz before alignment; keep the warmup segment on the same tz so both
+	// resolve to the same segment instant.
+	segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local)
 	warmup, err := db.CreateSegmentIfNotExist(segTime)
 	require.NoError(t, err)
 
@@ -248,3 +251,166 @@ type fakeGroup struct {
 
 func (f *fakeGroup) GetSchema() *commonv1.Group { return nil }
 func (f *fakeGroup) SupplyTSDB() io.Closer      { return f.tsdb }
+
+// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers the
+// regression where the part chunk-sync receiver passed the raw MinTimestamp
+// straight into CreateSegmentIfNotExist. Real data points carry a timestamp
+// anywhere inside a segment window, while the matching sidx Insert/Update
+// arrives from liaison with a grid-aligned start. Without alignment here,
+// part and sidx end up in different segments. The fix is to feed every
+// raw MinTimestamp through TSDB.SegmentInterval().Standard before creating
+// the segment so both paths converge on the same aligned start.
+func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t *testing.T) {
+	t.Run("DAY(1)", func(t *testing.T) {
+		runOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 1})
+	})
+	t.Run("DAY(5)", func(t *testing.T) {
+		runOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 5})
+	})
+}
+
+func runOffGridCase(t *testing.T, ir storage.IntervalRule) {
+	t.Helper()
+	tmpPath, cleanup := test.Space(require.New(t))
+	defer cleanup()
+
+	const groupName = "off-grid-group"
+	db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+	defer db.Close()
+
+	// Pick a raw timestamp deliberately off-grid: an arbitrary 06:00 in local
+	// time inside the current grid bucket. Without the alignment fix the
+	// receiver would build a segment whose Start equals this raw ts (modulo
+	// rotation to a 0-second wall instant), which never matches the Standard
+	// grid that liaison-side sidx uses.
+	rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+	wantStart := ir.Standard(rawTS)
+
+	callback := &syncCallback{
+		l:          logger.GetLogger("test-offgrid"),
+		schemaRepo: newTestSchemaRepo(db, groupName),
+	}
+	ctx := &queue.ChunkedSyncPartContext{
+		ID:           1,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: rawTS.UnixNano(),
+		MaxTimestamp: rawTS.UnixNano(),
+	}
+	handler, err := callback.CreatePartHandler(ctx)
+	require.NoError(t, err)
+	defer func() { _ = handler.(*syncPartContext).Close() }()
+	partCtx := handler.(*syncPartContext)
+	require.NotNil(t, partCtx.segment)
+
+	gotStart := partCtx.segment.GetTimeRange().Start
+	require.True(t, gotStart.Equal(wantStart),
+		"REGRESSION: raw MinTimestamp=%s landed in segment start=%s; expected aligned start=%s (ir=%+v)",
+		rawTS, gotStart, wantStart, ir)
+	// Sanity: assert the raw ts itself would NOT have produced this start —
+	// otherwise the test passes vacuously when raw happens to land on grid.
+	require.False(t, rawTS.Equal(wantStart),
+		"test setup is degenerate: rawTS already aligned, cannot detect missing Standard call")
+}
+
+// TestSegmentCreateTS_ConsistencyAcrossPaths verifies that both the part
+// chunk-sync receiver (Path A, raw ts straight from a part's MinTimestamp)
+// and the sidx series-sync receiver (Path B, ts pre-standardized at the
+// liaison) converge on the SAME segment Start. A pre-existing aligned
+// segment in an older grid bucket simulates historical data on disk — it
+// must NOT be touched, and the two paths' new writes must land in the
+// same new bucket as each other.
+//
+// This is the regression demo for the PR #1120 fault where Path A used
+// raw ts while Path B (sidx via liaison) used Standard ts, so the same
+// real datapoint produced part data in one segment and sidx data in
+// another.
+func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) {
+	tmpPath, cleanup := test.Space(require.New(t))
+	defer cleanup()
+
+	const groupName = "consistency-group"
+	ir := storage.IntervalRule{Unit: storage.DAY, Num: 5}
+	db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+	defer db.Close()
+
+	// Historical segment: pre-create an aligned segment in a much older grid
+	// bucket so the test cluster looks like it has prior data on disk.
+	historicalRaw := time.Date(2026, 4, 10, 0, 0, 0, 0, time.Local)
+	historicalStart := ir.Standard(historicalRaw)
+	histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
+	require.NoError(t, err)
+	histSeg.DecRef()
+	histSeg.DecRef() // refCount->0, idle-closed
+
+	// New raw datapoint with an off-grid (06:00 inside a 5-day bucket) ts.
+	// liaison-side sidx pre-standardizes it; data-node-side part chunk-sync
+	// also runs it through Standard now.
+	rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+	wantNewStart := ir.Standard(rawTS)
+	require.False(t, wantNewStart.Equal(historicalStart),
+		"test setup degenerate: new bucket coincides with historical bucket")
+	require.False(t, wantNewStart.Equal(rawTS),
+		"test setup degenerate: rawTS already on grid, cannot detect Standard")
+
+	repo := newTestSchemaRepo(db, groupName)
+
+	// Path A: part chunk-sync receiver gets the raw MinTimestamp from the
+	// transferred part's metadata. Aligned internally by the fix.
+	chunkCallback := &syncCallback{l: logger.GetLogger("test-pathA"), schemaRepo: repo}
+	handlerA, err := chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+		ID:           1,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: rawTS.UnixNano(),
+		MaxTimestamp: rawTS.UnixNano(),
+	})
+	require.NoError(t, err)
+	defer func() { _ = handlerA.(*syncPartContext).Close() }()
+	gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start
+
+	// Path B: sidx series-sync receiver gets a MinTimestamp the liaison has
+	// already aligned via Standard. The handler uses it as-is.
+	seriesCallback := &syncSeriesCallback{l: logger.GetLogger("test-pathB"), schemaRepo: repo}
+	handlerB, err := seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+		ID:           2,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: wantNewStart.UnixNano(),
+		MaxTimestamp: wantNewStart.UnixNano(),
+	})
+	require.NoError(t, err)
+	defer func() { _ = handlerB.(*syncSeriesContext).Close() }()
+	gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start
+
+	require.True(t, gotStartA.Equal(gotStartB),
+		"REGRESSION: Path A (raw ts %s -> segment %s) and Path B (aligned ts %s -> segment %s) diverged",
+		rawTS, gotStartA, wantNewStart, gotStartB)
+	require.True(t, gotStartA.Equal(wantNewStart),
+		"path A segment start %s != expected aligned %s", gotStartA, wantNewStart)
+	require.False(t, gotStartA.Equal(historicalStart),
+		"new write must NOT land in historical segment %s", historicalStart)
+}
+
+func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir storage.IntervalRule) storage.TSDB[*tsTable, option] {
+	t.Helper()
+	opts := storage.TSDBOpts[*tsTable, option]{
+		ShardNum:        1,
+		Location:        filepath.Join(tmpPath, "tab"),
+		TSTableCreator:  newTSTable,
+		SegmentInterval: ir,
+		TTL:             storage.IntervalRule{Unit: ir.Unit, Num: 60},
+		Option:          option{protector: protector.Nop{}, mergePolicy: newDefaultMergePolicyForTesting()},
+	}
+	require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm))
+	ctx := common.SetPosition(
+		context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test-offgrid")),
+		func(p common.Position) common.Position {
+			p.Database = "test-offgrid"
+			return p
+		},
+	)
+	db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName)
+	require.NoError(t, err)
+	return db
+}
diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go
index 76f4e4ced..8506a36e7 100644
--- a/banyand/stream/write_data.go
+++ b/banyand/stream/write_data.go
@@ -116,7 +116,8 @@ func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (que
 		s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group")
 		return nil, err
 	}
-	segmentTime := time.Unix(0, ctx.MinTimestamp)
+	// Align to the segment grid so part-chunk sync and sidx land in the same segment.
+	segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, ctx.MinTimestamp))
 	segment, err := tsdb.CreateSegmentIfNotExist(segmentTime)
 	if err != nil {
 		s.l.Error().Err(err).Str("group", ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment")
diff --git a/banyand/stream/write_data_segmentref_test.go b/banyand/stream/write_data_segmentref_test.go
index 81784ac1c..509d333b9 100644
--- a/banyand/stream/write_data_segmentref_test.go
+++ b/banyand/stream/write_data_segmentref_test.go
@@ -114,6 +114,12 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
 // syncCallback.CreatePartHandler through a minimal schemaRepo shim and
 // asserts that segment ownership transfers to partCtx and that Close
 // actually DecRefs. See trace package for rationale.
+// In each StoresSegment test, segTime must use time.Local because the
+// callback path runs time.Unix(0,ns) -> Standard -> CreateSegmentIfNotExist
+// in Local tz. If the warmup segment were created in UTC, the two calls
+// could represent the same instant but format to different segment-suffix
+// strings, causing a directory-name mismatch instead of reusing the same
+// segment.
 func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
 	tmpPath, cleanup := test.Space(require.New(t))
 	defer cleanup()
@@ -122,7 +128,7 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
 	db := openTestTSDBForRefTest(t, tmpPath, &openShardCount)
 	defer db.Close()
 
-	segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
+	segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local)
 	warmup, err := db.CreateSegmentIfNotExist(segTime)
 	require.NoError(t, err)
 
@@ -246,3 +252,131 @@ type fakeGroup struct {
 
 func (f *fakeGroup) GetSchema() *commonv1.Group { return nil }
 func (f *fakeGroup) SupplyTSDB() io.Closer      { return f.tsdb }
+
+// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers
+// the regression where the stream part chunk-sync receiver passed the raw
+// MinTimestamp straight into CreateSegmentIfNotExist. See the analogous
+// measure test for full rationale.
+func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t *testing.T) {
+	t.Run("DAY(1)", func(t *testing.T) {
+		runStreamOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 1})
+	})
+	t.Run("DAY(5)", func(t *testing.T) {
+		runStreamOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 5})
+	})
+}
+
+func runStreamOffGridCase(t *testing.T, ir storage.IntervalRule) {
+	t.Helper()
+	tmpPath, cleanup := test.Space(require.New(t))
+	defer cleanup()
+
+	const groupName = "off-grid-stream"
+	db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+	defer db.Close()
+
+	rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+	wantStart := ir.Standard(rawTS)
+
+	callback := &syncCallback{
+		l:          logger.GetLogger("test-offgrid-stream"),
+		schemaRepo: newTestSchemaRepo(db, groupName),
+	}
+	handler, err := callback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+		ID:           1,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: rawTS.UnixNano(),
+		MaxTimestamp: rawTS.UnixNano(),
+	})
+	require.NoError(t, err)
+	defer func() { _ = handler.(*syncPartContext).Close() }()
+	gotStart := handler.(*syncPartContext).segment.GetTimeRange().Start
+	require.True(t, gotStart.Equal(wantStart),
+		"REGRESSION: raw MinTimestamp=%s landed in segment start=%s; expected aligned %s (ir=%+v)",
+		rawTS, gotStart, wantStart, ir)
+	require.False(t, rawTS.Equal(wantStart),
+		"test setup degenerate: rawTS already aligned")
+}
+
+// TestSegmentCreateTS_ConsistencyAcrossPaths is the stream variant of the
+// Path-A vs Path-B convergence demo. A pre-existing aligned historical
+// segment must NOT be touched; both new writes must land in the same
+// new bucket.
+func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) {
+	tmpPath, cleanup := test.Space(require.New(t))
+	defer cleanup()
+
+	const groupName = "consistency-stream"
+	ir := storage.IntervalRule{Unit: storage.DAY, Num: 5}
+	db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+	defer db.Close()
+
+	historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0, time.Local))
+	histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
+	require.NoError(t, err)
+	histSeg.DecRef()
+	histSeg.DecRef()
+
+	rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+	wantNewStart := ir.Standard(rawTS)
+	require.False(t, wantNewStart.Equal(historicalStart))
+	require.False(t, wantNewStart.Equal(rawTS))
+
+	repo := newTestSchemaRepo(db, groupName)
+
+	chunkCallback := &syncCallback{l: logger.GetLogger("test-stream-pathA"), schemaRepo: repo}
+	handlerA, err := chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+		ID:           1,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: rawTS.UnixNano(),
+		MaxTimestamp: rawTS.UnixNano(),
+	})
+	require.NoError(t, err)
+	defer func() { _ = handlerA.(*syncPartContext).Close() }()
+	gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start
+
+	seriesCallback := &syncSeriesCallback{l: logger.GetLogger("test-stream-pathB"), schemaRepo: repo}
+	handlerB, err := seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+		ID:           2,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: wantNewStart.UnixNano(),
+		MaxTimestamp: wantNewStart.UnixNano(),
+	})
+	require.NoError(t, err)
+	defer func() { _ = handlerB.(*syncSeriesContext).Close() }()
+	gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start
+
+	require.True(t, gotStartA.Equal(gotStartB),
+		"REGRESSION: stream Path A (raw %s -> %s) and Path B (aligned %s -> %s) diverged",
+		rawTS, gotStartA, wantNewStart, gotStartB)
+	require.True(t, gotStartA.Equal(wantNewStart),
+		"path A start %s != expected aligned %s", gotStartA, wantNewStart)
+	require.False(t, gotStartA.Equal(historicalStart),
+		"new write must NOT land in historical segment %s", historicalStart)
+}
+
+func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir storage.IntervalRule) storage.TSDB[*tsTable, option] {
+	t.Helper()
+	opts := storage.TSDBOpts[*tsTable, option]{
+		ShardNum:        1,
+		Location:        filepath.Join(tmpPath, "tab"),
+		TSTableCreator:  newTSTable,
+		SegmentInterval: ir,
+		TTL:             storage.IntervalRule{Unit: ir.Unit, Num: 60},
+		Option:          option{protector: protector.Nop{}, mergePolicy: newDefaultMergePolicyForTesting()},
+	}
+	require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm))
+	ctx := common.SetPosition(
+		context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test-offgrid-stream")),
+		func(p common.Position) common.Position {
+			p.Database = "test-offgrid-stream"
+			return p
+		},
+	)
+	db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName)
+	require.NoError(t, err)
+	return db
+}
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 16c989b52..7a57ba4ce 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -288,7 +288,8 @@ func (s *syncChunkCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext)
 		s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group")
 		return nil, err
 	}
-	segmentTime := time.Unix(0, ctx.MinTimestamp)
+	// Align to the segment grid so part-chunk sync and sidx land in the same segment.
+	segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, ctx.MinTimestamp))
 	segment, err := tsdb.CreateSegmentIfNotExist(segmentTime)
 	if err != nil {
 		s.l.Error().Err(err).Str("group", ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment")
diff --git a/banyand/trace/write_data_segmentref_test.go b/banyand/trace/write_data_segmentref_test.go
index f332141d8..d170bb3cd 100644
--- a/banyand/trace/write_data_segmentref_test.go
+++ b/banyand/trace/write_data_segmentref_test.go
@@ -213,6 +213,12 @@ func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount *atomic
 //
 // This complements TestSyncReceiver_SegmentRefOwnership, which builds
 // syncPartContext manually. Together they cover every line of the fix.
+//
+// segTime must use time.Local: the callback path runs time.Unix(0,ns)
+// -> Standard -> CreateSegmentIfNotExist in Local tz. If the warmup
+// segment were created in UTC, the two calls could represent the same
+// instant but format to different segment-suffix strings, causing a
+// directory-name mismatch instead of reusing the same segment.
 func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
 	tmpPath, cleanup := test.Space(require.New(t))
 	defer cleanup()
@@ -221,7 +227,7 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
 	db := openTestTSDBForRefTest(t, tmpPath, &openShardCount)
 	defer db.Close()
 
-	segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
+	segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local)
 
 	// Drive the segment into the idle-closed state (refCount=0) so that the
 	// probe at the end of this test reliably exercises the `initialize`
@@ -352,3 +358,131 @@ type fakeGroup struct {
 
 func (f *fakeGroup) GetSchema() *commonv1.Group { return nil }
 func (f *fakeGroup) SupplyTSDB() io.Closer      { return f.tsdb }
+
+// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers
+// the regression where the trace part chunk-sync receiver passed the raw
+// MinTimestamp straight into CreateSegmentIfNotExist. See the analogous
+// measure test for full rationale.
+func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t *testing.T) {
+	t.Run("DAY(1)", func(t *testing.T) {
+		runTraceOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 1})
+	})
+	t.Run("DAY(5)", func(t *testing.T) {
+		runTraceOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 5})
+	})
+}
+
+func runTraceOffGridCase(t *testing.T, ir storage.IntervalRule) {
+	t.Helper()
+	tmpPath, cleanup := test.Space(require.New(t))
+	defer cleanup()
+
+	const groupName = "off-grid-trace"
+	db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+	defer db.Close()
+
+	rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+	wantStart := ir.Standard(rawTS)
+
+	callback := &syncChunkCallback{
+		l:          logger.GetLogger("test-offgrid-trace"),
+		schemaRepo: newTestSchemaRepo(db, groupName),
+	}
+	handler, err := callback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+		ID:           1,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: rawTS.UnixNano(),
+		MaxTimestamp: rawTS.UnixNano(),
+	})
+	require.NoError(t, err)
+	defer func() { _ = handler.(*syncPartContext).Close() }()
+	gotStart := handler.(*syncPartContext).segment.GetTimeRange().Start
+	require.True(t, gotStart.Equal(wantStart),
+		"REGRESSION: raw MinTimestamp=%s landed in segment start=%s; expected aligned %s (ir=%+v)",
+		rawTS, gotStart, wantStart, ir)
+	require.False(t, rawTS.Equal(wantStart),
+		"test setup degenerate: rawTS already aligned")
+}
+
+// TestSegmentCreateTS_ConsistencyAcrossPaths is the trace variant of the
+// Path-A vs Path-B convergence demo. A pre-existing aligned historical
+// segment must NOT be touched; both new writes must land in the same
+// new bucket.
+func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) {
+	tmpPath, cleanup := test.Space(require.New(t))
+	defer cleanup()
+
+	const groupName = "consistency-trace"
+	ir := storage.IntervalRule{Unit: storage.DAY, Num: 5}
+	db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+	defer db.Close()
+
+	historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0, time.Local))
+	histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
+	require.NoError(t, err)
+	histSeg.DecRef()
+	histSeg.DecRef()
+
+	rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+	wantNewStart := ir.Standard(rawTS)
+	require.False(t, wantNewStart.Equal(historicalStart))
+	require.False(t, wantNewStart.Equal(rawTS))
+
+	repo := newTestSchemaRepo(db, groupName)
+
+	chunkCallback := &syncChunkCallback{l: logger.GetLogger("test-trace-pathA"), schemaRepo: repo}
+	handlerA, err := chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+		ID:           1,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: rawTS.UnixNano(),
+		MaxTimestamp: rawTS.UnixNano(),
+	})
+	require.NoError(t, err)
+	defer func() { _ = handlerA.(*syncPartContext).Close() }()
+	gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start
+
+	seriesCallback := &syncSeriesCallback{l: logger.GetLogger("test-trace-pathB"), schemaRepo: repo}
+	handlerB, err := seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+		ID:           2,
+		Group:        groupName,
+		ShardID:      0,
+		MinTimestamp: wantNewStart.UnixNano(),
+		MaxTimestamp: wantNewStart.UnixNano(),
+	})
+	require.NoError(t, err)
+	defer func() { _ = handlerB.(*syncSeriesContext).Close() }()
+	gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start
+
+	require.True(t, gotStartA.Equal(gotStartB),
+		"REGRESSION: trace Path A (raw %s -> %s) and Path B (aligned %s -> %s) diverged",
+		rawTS, gotStartA, wantNewStart, gotStartB)
+	require.True(t, gotStartA.Equal(wantNewStart),
+		"path A start %s != expected aligned %s", gotStartA, wantNewStart)
+	require.False(t, gotStartA.Equal(historicalStart),
+		"new write must NOT land in historical segment %s", historicalStart)
+}
+
+func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir storage.IntervalRule) storage.TSDB[*tsTable, option] {
+	t.Helper()
+	opts := storage.TSDBOpts[*tsTable, option]{
+		ShardNum:        1,
+		Location:        filepath.Join(tmpPath, "tab"),
+		TSTableCreator:  newTSTable,
+		SegmentInterval: ir,
+		TTL:             storage.IntervalRule{Unit: ir.Unit, Num: 60},
+		Option:          option{protector: protector.Nop{}, mergePolicy: newDefaultMergePolicyForTesting()},
+	}
+	require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm))
+	ctx := common.SetPosition(
+		context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test-offgrid-trace")),
+		func(p common.Position) common.Position {
+			p.Database = "test-offgrid-trace"
+			return p
+		},
+	)
+	db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName)
+	require.NoError(t, err)
+	return db
+}