This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug-empty-mints in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c3d20cb13739e77e27258404e22571ff5cd4c4fc Author: Hongtao Gao <[email protected]> AuthorDate: Fri Apr 10 11:04:14 2026 +0000 fix(sidx): use MinTimestamp/MaxTimestamp instead of SegmentID in streaming sync SIDX StreamingParts was incorrectly setting MinTimestamp from partMetadata.SegmentID and omitting MaxTimestamp entirely. This caused the receiving node to reject parts with "invalid MinTimestamp 0" when SegmentID was zero; before the validation added in #1059, it instead created corrupt seg-19700101 directories. --- CHANGES.md | 1 + banyand/internal/sidx/sync.go | 12 ++- banyand/internal/sidx/sync_test.go | 166 +++++++++++++++++++++++++++++++++++++ 3 files changed, 176 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 48c2d04fb..939f46a83 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,6 +29,7 @@ Release Notes. - ui: fix query editor refresh/reset behavior and BydbQL keyword highlighting. - Disable the rotation task on warm and cold nodes to prevent incorrect segment boundaries during lifecycle migration. - Prevent epoch-dated segment directories (seg-19700101) from being created by zero timestamps in distributed sync paths. +- Fix SIDX streaming sync sending SegmentID as MinTimestamp (and omitting MaxTimestamp) instead of the part's actual time range, which caused sync failures on the receiving node. 
### Chores diff --git a/banyand/internal/sidx/sync.go b/banyand/internal/sidx/sync.go index 465df54eb..7a0ae0523 100644 --- a/banyand/internal/sidx/sync.go +++ b/banyand/internal/sidx/sync.go @@ -74,7 +74,7 @@ func (s *sidx) StreamingParts(partIDsToSync map[uint64]struct{}, group string, s part := pw.p files, release := createPartFileReaders(part) releaseFuncs = append(releaseFuncs, release) - streamingParts = append(streamingParts, queue.StreamingPartData{ + spd := queue.StreamingPartData{ ID: part.partMetadata.ID, Group: group, ShardID: shardID, @@ -84,11 +84,17 @@ func (s *sidx) StreamingParts(partIDsToSync map[uint64]struct{}, group string, s UncompressedSizeBytes: part.partMetadata.UncompressedSizeBytes, TotalCount: part.partMetadata.TotalCount, BlocksCount: part.partMetadata.BlocksCount, - MinTimestamp: part.partMetadata.SegmentID, MinKey: part.partMetadata.MinKey, MaxKey: part.partMetadata.MaxKey, PartType: name, - }) + } + if part.partMetadata.MinTimestamp != nil { + spd.MinTimestamp = *part.partMetadata.MinTimestamp + } + if part.partMetadata.MaxTimestamp != nil { + spd.MaxTimestamp = *part.partMetadata.MaxTimestamp + } + streamingParts = append(streamingParts, spd) } } diff --git a/banyand/internal/sidx/sync_test.go b/banyand/internal/sidx/sync_test.go new file mode 100644 index 000000000..0d73d0f64 --- /dev/null +++ b/banyand/internal/sidx/sync_test.go @@ -0,0 +1,166 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/data" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/fs" +) + +// TestStreamingParts_Timestamps verifies that StreamingParts propagates +// MinTimestamp and MaxTimestamp from partMetadata (not SegmentID) to +// StreamingPartData, so the receiving node no longer sees SegmentID as a +// timestamp; parts without recorded timestamps still report zero. 
+func TestStreamingParts_Timestamps(t *testing.T) { + reqs := []WriteRequest{ + createTestWriteRequest(1, 100, "data1"), + createTestWriteRequest(1, 200, "data2"), + } + + t.Run("with_timestamps_set", func(t *testing.T) { // explicit Min/MaxTimestamp in partMetadata must be copied verbatim into StreamingPartData + sidxIface := createTestSIDX(t) + raw := sidxIface.(*sidx) + defer func() { + assert.NoError(t, raw.Close()) + }() + + minTS := int64(1700000000) // NOTE(review): unit not visible here; if timestamps are nanoseconds this is ~1.7s after the epoch. Only propagation is checked, so the exact value does not matter + maxTS := int64(1700001000) + writeTestDataWithTimeRange(t, raw, reqs, 1, 1, &minTS, &maxTS) + + flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}}) + require.NoError(t, err) + raw.IntroduceFlushed(flushIntro) + flushIntro.Release() + + partIDs := map[uint64]struct{}{1: {}} + parts, releaseFuncs := raw.StreamingParts(partIDs, "test-group", 0, "test-sidx") + defer func() { + for _, release := range releaseFuncs { + release() + } + }() + + require.Len(t, parts, 1) + assert.Equal(t, uint64(1), parts[0].ID) + assert.Equal(t, int64(1700000000), parts[0].MinTimestamp, + "MinTimestamp should come from partMetadata.MinTimestamp, not SegmentID") + assert.Equal(t, int64(1700001000), parts[0].MaxTimestamp, + "MaxTimestamp should come from partMetadata.MaxTimestamp") + assert.Equal(t, "test-group", parts[0].Group) + assert.Equal(t, uint32(0), parts[0].ShardID) + assert.Equal(t, data.TopicTracePartSync.String(), parts[0].Topic) + assert.Equal(t, "test-sidx", parts[0].PartType) + }) + + t.Run("nil_timestamps_default_to_zero", func(t *testing.T) { // StreamingParts only copies non-nil partMetadata timestamps, so nil ones leave the fields at their zero value + sidxIface := createTestSIDX(t) + raw := sidxIface.(*sidx) + defer func() { + assert.NoError(t, raw.Close()) + }() + + writeTestDataWithTimeRange(t, raw, reqs, 1, 1, nil, nil) + + flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}}) + require.NoError(t, err) + raw.IntroduceFlushed(flushIntro) + flushIntro.Release() + + partIDs := map[uint64]struct{}{1: {}} + parts, releaseFuncs := raw.StreamingParts(partIDs, "test-group", 0, "test-sidx") + defer func() { + for _, release := range releaseFuncs { + release() + } + }() + + require.Len(t, parts, 1) + assert.Equal(t, 
int64(0), parts[0].MinTimestamp, + "MinTimestamp should default to 0 when partMetadata.MinTimestamp is nil") + assert.Equal(t, int64(0), parts[0].MaxTimestamp, + "MaxTimestamp should default to 0 when partMetadata.MaxTimestamp is nil") + }) + + t.Run("nil_snapshot_returns_nil", func(t *testing.T) { // nothing is written or flushed, so StreamingParts is expected to return nil parts + sidxIface := createTestSIDX(t) + raw := sidxIface.(*sidx) + defer func() { + assert.NoError(t, raw.Close()) + }() + + partIDs := map[uint64]struct{}{1: {}} + parts, releaseFuncs := raw.StreamingParts(partIDs, "test-group", 0, "test-sidx") + defer func() { + for _, release := range releaseFuncs { + release() + } + }() + assert.Nil(t, parts) + }) + + t.Run("multiple_parts_sorted_by_id", func(t *testing.T) { // builds the SIDX directly via NewSIDX rather than createTestSIDX; NOTE(review): reason for the difference is not visible here + dir := t.TempDir() + fileSystem := fs.NewLocalFileSystem() + opts := NewDefaultOptions() + opts.Memory = protector.NewMemory(observability.NewBypassRegistry()) + opts.Path = dir + + sidxIface, err := NewSIDX(fileSystem, opts) + require.NoError(t, err) + raw := sidxIface.(*sidx) + defer func() { + assert.NoError(t, raw.Close()) + }() + + min1, max1 := int64(1700000000), int64(1700001000) + min2, max2 := int64(1800000000), int64(1800001000) + writeTestDataWithTimeRange(t, raw, reqs, 1, 2, &min1, &max1) + writeTestDataWithTimeRange(t, raw, reqs, 2, 3, &min2, &max2) + + flushIntro, flushErr := raw.Flush(map[uint64]struct{}{2: {}, 3: {}}) + require.NoError(t, flushErr) + raw.IntroduceFlushed(flushIntro) + flushIntro.Release() + + partIDs := map[uint64]struct{}{2: {}, 3: {}} + parts, releaseFuncs := raw.StreamingParts(partIDs, "test-group", 0, "test-sidx") + defer func() { + for _, release := range releaseFuncs { + release() + } + }() + + require.Len(t, parts, 2) + // StreamingParts is expected to return parts in ascending part-ID order, so index 0 is part 2 and index 1 is part 3 + assert.Equal(t, uint64(2), parts[0].ID) + assert.Equal(t, int64(1700000000), parts[0].MinTimestamp) + assert.Equal(t, int64(1700001000), parts[0].MaxTimestamp) + assert.Equal(t, uint64(3), parts[1].ID) + assert.Equal(t, int64(1800000000), parts[1].MinTimestamp) + 
assert.Equal(t, int64(1800001000), parts[1].MaxTimestamp) + }) +} +
