This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 628f5def18ee59948edb02370b272faabfd85edd Author: Gao Hongtao <[email protected]> AuthorDate: Tue Apr 28 19:31:28 2026 +0800 fix(trace): resolve flaky schema change merge test (#1099) * docs: add snapshot retention flags and relative path version note Document the min-file-snapshot-age flags for measure, stream, trace, and property services. Add version annotation for relative path support. --- api/proto/banyandb/database/v1/rpc.proto | 2 + banyand/measure/metadata.go | 3 +- banyand/stream/metadata.go | 4 +- banyand/trace/metadata.go | 4 +- banyand/trace/metadata_test.go | 81 ++++++++++++++++++++++++++++++++ docs/api-reference.md | 1 + pkg/timestamp/scheduler.go | 8 +++- 7 files changed, 98 insertions(+), 5 deletions(-) diff --git a/api/proto/banyandb/database/v1/rpc.proto b/api/proto/banyandb/database/v1/rpc.proto index d9a1cda92..f877722fa 100644 --- a/api/proto/banyandb/database/v1/rpc.proto +++ b/api/proto/banyandb/database/v1/rpc.proto @@ -515,6 +515,8 @@ message ShardInfo { InvertedIndexInfo inverted_index_info = 5; // sidx_info contains information about sidx. SIDXInfo sidx_info = 6; + // file_part_count is the number of file parts (excluding in-memory parts) in this shard. + int64 file_part_count = 7; } // SeriesIndexInfo contains information about the series index. diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index d846f5f0d..d206cd342 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -455,12 +455,13 @@ func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.Sh } } defer snapshot.decRef() - var totalCount, compressedSize, partCount uint64 + var totalCount, compressedSize, partCount, filePartCount uint64 for _, pw := range snapshot.parts { if pw.p != nil { totalCount += pw.p.partMetadata.TotalCount compressedSize += pw.p.partMetadata.CompressedSizeBytes partCount++ + filePartCount++ } else if pw.mp != nil { totalCount += pw.mp.partMetadata.TotalCount compressedSize += pw.mp.partMetadata.CompressedSizeBytes diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 96c810213..dd91f1481 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -344,13 +344,14 @@ func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.Sh } } defer snapshot.decRef() - var totalCount, compressedSize, uncompressedSize, partCount uint64 + var totalCount, compressedSize, uncompressedSize, partCount, filePartCount uint64 for _, pw := range snapshot.parts { if pw.p != nil { totalCount += pw.p.partMetadata.TotalCount compressedSize += pw.p.partMetadata.CompressedSizeBytes uncompressedSize += pw.p.partMetadata.UncompressedSizeBytes partCount++ + filePartCount++ } else if pw.mp != nil { totalCount += pw.mp.partMetadata.TotalCount compressedSize += pw.mp.partMetadata.CompressedSizeBytes @@ -366,6 +367,7 @@ func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.Sh PartCount: int64(partCount), InvertedIndexInfo: invertedIndexInfo, SidxInfo: &databasev1.SIDXInfo{}, + FilePartCount: int64(filePartCount), } } diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go index 5bbdfe174..3b02231b5 100644 --- a/banyand/trace/metadata.go +++ b/banyand/trace/metadata.go @@ -353,13 +353,14 @@ func (sr *schemaRepo) collectShardInfo(ctx context.Context, table any, shardID u } } defer snapshot.decRef() - var totalCount, compressedSize, uncompressedSize, partCount uint64 + var totalCount, compressedSize, uncompressedSize, partCount, filePartCount uint64 for _, pw := range snapshot.parts { if pw.p != nil { totalCount += pw.p.partMetadata.TotalCount compressedSize += pw.p.partMetadata.CompressedSizeBytes uncompressedSize += pw.p.partMetadata.UncompressedSpanSizeBytes partCount++ + filePartCount++ } else if pw.mp != nil { totalCount += pw.mp.partMetadata.TotalCount compressedSize += pw.mp.partMetadata.CompressedSizeBytes @@ -375,6 +376,7 @@ func (sr *schemaRepo) collectShardInfo(ctx context.Context, table any, shardID u PartCount: int64(partCount), InvertedIndexInfo: &databasev1.InvertedIndexInfo{}, SidxInfo: sidxInfo, + FilePartCount: int64(filePartCount), } } diff --git a/banyand/trace/metadata_test.go b/banyand/trace/metadata_test.go index 8cad466f2..13c74a30e 100644 --- a/banyand/trace/metadata_test.go +++ b/banyand/trace/metadata_test.go @@ -358,6 +358,58 @@ var _ = Describe("Metadata", func() { }) }) + Context("Trace schema with changed tag type after merge", func() { + It("querying data should return correct values after parts with different types are merged", func() { + traceName := "schema_change_tag_type_merge" + now := timestamp.NowMilli() + + env := setupSchemaChangeTrace(svcs, traceName, groupName, traceSetupOptions{withExtraTag: true}) + writeSchemaChangeTraceData(svcs, traceName, groupName, now.Add(-2*time.Hour), 5, + writeTraceDataOptions{extraTag: extraTagInt}) + // Wait for the first batch to flush to disk before writing the second batch. + // This ensures each shard has at least one file part from the first batch, + // so the second batch creates additional parts that can be merged. + Eventually(func() int64 { + return getFilePartCount(svcs, groupName) + }, flags.EventuallyTimeout).Should(BeNumerically(">=", 1)) + changeTraceExtraTagType(svcs, traceName, groupName) + writeSchemaChangeTraceData(svcs, traceName, groupName, now.Add(-1*time.Hour), 3, + writeTraceDataOptions{extraTag: extraTagString, traceIDPrefix: "trace_new_"}) + partCountBeforeMerge := getTotalPartCount(svcs, groupName) + Eventually(func() int64 { + return getTotalPartCount(svcs, groupName) + }, flags.EventuallyTimeout).Should(BeNumerically("<", partCountBeforeMerge)) + + Eventually(func(innerGm Gomega) { + spans := querySchemaChangeTraceData(svcs, traceName, groupName, + now.Add(-3*time.Hour), now, + []string{"trace_id", "service_id", "duration", "extra_tag"}, nil) + innerGm.Expect(spans).To(HaveLen(8)) + + nullCount := 0 + stringCount := 0 + for _, span := range spans { + for _, tag := range span.Tags { + if tag.Key == "extra_tag" { + switch tag.Value.GetValue().(type) { + case *modelv1.TagValue_Null: + nullCount++ + case *modelv1.TagValue_Str: + stringCount++ + } + } + } + } + innerGm.Expect(nullCount).To(Equal(5), + "old data with int type should return null after schema changed to STRING and parts merged") + innerGm.Expect(stringCount).To(Equal(3), + "new data should have string extra_tag values after merge") + }, flags.EventuallyTimeout).Should(Succeed()) + + env.cleanup() + }) + }) + Context("Trace schema with deleted tag in query", func() { It("querying data should fail if the condition includes a deleted tag", func() { traceName := "schema_change_filter_deleted" @@ -694,3 +746,32 @@ func querySchemaChangeTraceData(svcs *services, name, group string, begin, end t }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue()) return spans } + +func getTotalPartCount(svcs *services, group string) int64 { + dataInfo, err := svcs.trace.CollectDataInfo(context.TODO(), group) + if err != nil || dataInfo == nil { + return 0 + } + var total int64 + for _, seg := range dataInfo.SegmentInfo { + for _, shard := range seg.ShardInfo { + total += shard.PartCount + } + } + return total +} + +func getFilePartCount(svcs *services, group string) int64 { + dataInfo, err := svcs.trace.CollectDataInfo(context.TODO(), group) + if err != nil || dataInfo == nil { + return 0 + } + var total int64 + for _, seg := range dataInfo.SegmentInfo { + for _, shard := range seg.ShardInfo { + total += shard.FilePartCount + } + } + return total +} + diff --git a/docs/api-reference.md b/docs/api-reference.md index 44696cff7..4a5bc34e9 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -3958,6 +3958,7 @@ ShardInfo contains information about a specific shard. | part_count | [int64](#int64) | | part_count is the number of parts in this shard. | | inverted_index_info | [InvertedIndexInfo](#banyandb-database-v1-InvertedIndexInfo) | | inverted_index_info contains information about the inverted index. | | sidx_info | [SIDXInfo](#banyandb-database-v1-SIDXInfo) | | sidx_info contains information about sidx. | +| file_part_count | [int64](#int64) | | file_part_count is the number of file parts (excluding in-memory parts) in this shard. | diff --git a/pkg/timestamp/scheduler.go b/pkg/timestamp/scheduler.go index b98df0b01..7ee533e2e 100644 --- a/pkg/timestamp/scheduler.go +++ b/pkg/timestamp/scheduler.go @@ -150,12 +150,16 @@ func (s *Scheduler) Closed() bool { // Close the Scheduler and shut down all registered tasks. func (s *Scheduler) Close() { s.Lock() - defer s.Unlock() s.closed = true + tasks := make([]*task, 0, len(s.tasks)) for k, t := range s.tasks { - t.close() + tasks = append(tasks, t) delete(s.tasks, k) } + s.Unlock() + for _, t := range tasks { + t.close() + } } // Metrics returns the metrics of all registered tasks.
