This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 628f5def18ee59948edb02370b272faabfd85edd
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Apr 28 19:31:28 2026 +0800

    fix(trace): resolve flaky schema change merge test (#1099)
    
    * docs: add snapshot retention flags and relative path version note
    
    Document the min-file-snapshot-age flags for measure, stream, trace,
    and property services. Add version annotation for relative path support.
---
 api/proto/banyandb/database/v1/rpc.proto |  2 +
 banyand/measure/metadata.go              |  3 +-
 banyand/stream/metadata.go               |  4 +-
 banyand/trace/metadata.go                |  4 +-
 banyand/trace/metadata_test.go           | 81 ++++++++++++++++++++++++++++++++
 docs/api-reference.md                    |  1 +
 pkg/timestamp/scheduler.go               |  8 +++-
 7 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/api/proto/banyandb/database/v1/rpc.proto b/api/proto/banyandb/database/v1/rpc.proto
index d9a1cda92..f877722fa 100644
--- a/api/proto/banyandb/database/v1/rpc.proto
+++ b/api/proto/banyandb/database/v1/rpc.proto
@@ -515,6 +515,8 @@ message ShardInfo {
   InvertedIndexInfo inverted_index_info = 5;
   // sidx_info contains information about sidx.
   SIDXInfo sidx_info = 6;
+  // file_part_count is the number of file parts (excluding in-memory parts) in this shard.
+  int64 file_part_count = 7;
 }
 
 // SeriesIndexInfo contains information about the series index.
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index d846f5f0d..d206cd342 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -455,12 +455,13 @@ func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.Sh
                }
        }
        defer snapshot.decRef()
-       var totalCount, compressedSize, partCount uint64
+       var totalCount, compressedSize, partCount, filePartCount uint64
        for _, pw := range snapshot.parts {
                if pw.p != nil {
                        totalCount += pw.p.partMetadata.TotalCount
                        compressedSize += pw.p.partMetadata.CompressedSizeBytes
                        partCount++
+                       filePartCount++
                } else if pw.mp != nil {
                        totalCount += pw.mp.partMetadata.TotalCount
                        compressedSize += pw.mp.partMetadata.CompressedSizeBytes
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 96c810213..dd91f1481 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -344,13 +344,14 @@ func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.Sh
                }
        }
        defer snapshot.decRef()
-       var totalCount, compressedSize, uncompressedSize, partCount uint64
+       var totalCount, compressedSize, uncompressedSize, partCount, filePartCount uint64
        for _, pw := range snapshot.parts {
                if pw.p != nil {
                        totalCount += pw.p.partMetadata.TotalCount
                        compressedSize += pw.p.partMetadata.CompressedSizeBytes
                        uncompressedSize += pw.p.partMetadata.UncompressedSizeBytes
                        partCount++
+                       filePartCount++
                } else if pw.mp != nil {
                        totalCount += pw.mp.partMetadata.TotalCount
                        compressedSize += pw.mp.partMetadata.CompressedSizeBytes
@@ -366,6 +367,7 @@ func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.Sh
                PartCount:         int64(partCount),
                InvertedIndexInfo: invertedIndexInfo,
                SidxInfo:          &databasev1.SIDXInfo{},
+               FilePartCount:     int64(filePartCount),
        }
 }
 
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 5bbdfe174..3b02231b5 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -353,13 +353,14 @@ func (sr *schemaRepo) collectShardInfo(ctx context.Context, table any, shardID u
                }
        }
        defer snapshot.decRef()
-       var totalCount, compressedSize, uncompressedSize, partCount uint64
+       var totalCount, compressedSize, uncompressedSize, partCount, filePartCount uint64
        for _, pw := range snapshot.parts {
                if pw.p != nil {
                        totalCount += pw.p.partMetadata.TotalCount
                        compressedSize += pw.p.partMetadata.CompressedSizeBytes
                        uncompressedSize += pw.p.partMetadata.UncompressedSpanSizeBytes
                        partCount++
+                       filePartCount++
                } else if pw.mp != nil {
                        totalCount += pw.mp.partMetadata.TotalCount
                        compressedSize += pw.mp.partMetadata.CompressedSizeBytes
@@ -375,6 +376,7 @@ func (sr *schemaRepo) collectShardInfo(ctx context.Context, table any, shardID u
                PartCount:         int64(partCount),
                InvertedIndexInfo: &databasev1.InvertedIndexInfo{},
                SidxInfo:          sidxInfo,
+               FilePartCount:     int64(filePartCount),
        }
 }
 
diff --git a/banyand/trace/metadata_test.go b/banyand/trace/metadata_test.go
index 8cad466f2..13c74a30e 100644
--- a/banyand/trace/metadata_test.go
+++ b/banyand/trace/metadata_test.go
@@ -358,6 +358,58 @@ var _ = Describe("Metadata", func() {
                        })
                })
 
+               Context("Trace schema with changed tag type after merge", func() {
+                       It("querying data should return correct values after parts with different types are merged", func() {
+                               traceName := "schema_change_tag_type_merge"
+                               now := timestamp.NowMilli()
+
+                               env := setupSchemaChangeTrace(svcs, traceName, groupName, traceSetupOptions{withExtraTag: true})
+                               writeSchemaChangeTraceData(svcs, traceName, groupName, now.Add(-2*time.Hour), 5,
+                                       writeTraceDataOptions{extraTag: extraTagInt})
+                               // Wait for the first batch to flush to disk before writing the second batch.
+                               // This ensures each shard has at least one file part from the first batch,
+                               // so the second batch creates additional parts that can be merged.
+                               Eventually(func() int64 {
+                                       return getFilePartCount(svcs, groupName)
+                               }, flags.EventuallyTimeout).Should(BeNumerically(">=", 1))
+                               changeTraceExtraTagType(svcs, traceName, groupName)
+                               writeSchemaChangeTraceData(svcs, traceName, groupName, now.Add(-1*time.Hour), 3,
+                                       writeTraceDataOptions{extraTag: extraTagString, traceIDPrefix: "trace_new_"})
+                               partCountBeforeMerge := getTotalPartCount(svcs, groupName)
+                               Eventually(func() int64 {
+                                       return getTotalPartCount(svcs, groupName)
+                               }, flags.EventuallyTimeout).Should(BeNumerically("<", partCountBeforeMerge))
+
+                               Eventually(func(innerGm Gomega) {
+                                       spans := querySchemaChangeTraceData(svcs, traceName, groupName,
+                                               now.Add(-3*time.Hour), now,
+                                               []string{"trace_id", "service_id", "duration", "extra_tag"}, nil)
+                                       innerGm.Expect(spans).To(HaveLen(8))
+
+                                       nullCount := 0
+                                       stringCount := 0
+                                       for _, span := range spans {
+                                               for _, tag := range span.Tags {
+                                                       if tag.Key == "extra_tag" {
+                                                               switch tag.Value.GetValue().(type) {
+                                                               case *modelv1.TagValue_Null:
+                                                                       nullCount++
+                                                               case *modelv1.TagValue_Str:
+                                                                       stringCount++
+                                                               }
+                                                       }
+                                               }
+                                       }
+                                       innerGm.Expect(nullCount).To(Equal(5),
+                                               "old data with int type should return null after schema changed to STRING and parts merged")
+                                       innerGm.Expect(stringCount).To(Equal(3),
+                                               "new data should have string extra_tag values after merge")
+                               }, flags.EventuallyTimeout).Should(Succeed())
+
+                               env.cleanup()
+                       })
+               })
+
                Context("Trace schema with deleted tag in query", func() {
                        It("querying data should fail if the condition includes a deleted tag", func() {
                                traceName := "schema_change_filter_deleted"
@@ -694,3 +746,32 @@ func querySchemaChangeTraceData(svcs *services, name, group string, begin, end t
        }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
        return spans
 }
+
+func getTotalPartCount(svcs *services, group string) int64 {
+       dataInfo, err := svcs.trace.CollectDataInfo(context.TODO(), group)
+       if err != nil || dataInfo == nil {
+               return 0
+       }
+       var total int64
+       for _, seg := range dataInfo.SegmentInfo {
+               for _, shard := range seg.ShardInfo {
+                       total += shard.PartCount
+               }
+       }
+       return total
+}
+
+func getFilePartCount(svcs *services, group string) int64 {
+       dataInfo, err := svcs.trace.CollectDataInfo(context.TODO(), group)
+       if err != nil || dataInfo == nil {
+               return 0
+       }
+       var total int64
+       for _, seg := range dataInfo.SegmentInfo {
+               for _, shard := range seg.ShardInfo {
+                       total += shard.FilePartCount
+               }
+       }
+       return total
+}
+
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 44696cff7..4a5bc34e9 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -3958,6 +3958,7 @@ ShardInfo contains information about a specific shard.
 | part_count | [int64](#int64) |  | part_count is the number of parts in this shard. |
 | inverted_index_info | [InvertedIndexInfo](#banyandb-database-v1-InvertedIndexInfo) |  | inverted_index_info contains information about the inverted index. |
 | sidx_info | [SIDXInfo](#banyandb-database-v1-SIDXInfo) |  | sidx_info contains information about sidx. |
+| file_part_count | [int64](#int64) |  | file_part_count is the number of file parts (excluding in-memory parts) in this shard. |
 
 
 
diff --git a/pkg/timestamp/scheduler.go b/pkg/timestamp/scheduler.go
index b98df0b01..7ee533e2e 100644
--- a/pkg/timestamp/scheduler.go
+++ b/pkg/timestamp/scheduler.go
@@ -150,12 +150,16 @@ func (s *Scheduler) Closed() bool {
 // Close the Scheduler and shut down all registered tasks.
 func (s *Scheduler) Close() {
        s.Lock()
-       defer s.Unlock()
        s.closed = true
+       tasks := make([]*task, 0, len(s.tasks))
        for k, t := range s.tasks {
-               t.close()
+               tasks = append(tasks, t)
                delete(s.tasks, k)
        }
+       s.Unlock()
+       for _, t := range tasks {
+               t.close()
+       }
 }
 
 // Metrics returns the metrics of all registered tasks.

Reply via email to