This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test/multi-segments
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a2a8bde5ca4ea9e28f002c5b9de2e75469ea6b1d
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Sep 23 08:40:06 2025 +0800

    Fix sorting issue of timestamps in measure model segments and enhance 
memory management in merging process. Update related tests for improved 
coverage and accuracy.
---
 CHANGES.md                                         |   1 +
 banyand/measure/flusher.go                         |  85 ++++++++++---
 banyand/measure/part.go                            |   1 +
 banyand/measure/query_test.go                      |   4 +-
 banyand/measure/syncer.go                          |  23 ----
 banyand/measure/tstable.go                         |   8 ++
 banyand/measure/write_liaison.go                   |   5 +-
 banyand/measure/write_standalone.go                |   3 -
 banyand/stream/flusher.go                          |  85 ++++++++++---
 banyand/stream/part.go                             |   2 +
 banyand/stream/tstable.go                          |   8 ++
 banyand/stream/write_liaison.go                    |   2 +-
 banyand/trace/flusher.go                           | 138 +++++++++++++++------
 banyand/trace/part.go                              |   2 +
 banyand/trace/tstable.go                           |   8 ++
 banyand/trace/write_liaison.go                     |   2 +-
 test/cases/measure/data/input/linked_or.yaml       |   1 -
 test/cases/measure/measure.go                      |   5 +-
 .../multi_segments/multi_segments_suite_test.go    |   9 +-
 .../multi_segments/multi_segments_suite_test.go    |   2 +-
 20 files changed, 279 insertions(+), 115 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ab84e0d0..79e28c3a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@ Release Notes.
 - Fix returning empty result when using IN operatior on the array type tags.
 - Fix memory leaks and OOM issues in streaming processing by implementing 
deduplication logic in priority queues and improving sliding window memory 
management.
 - Fix etcd prefix matching any key that starts with this prefix.
+- Fix the timestamp sorting issue of the measure model when there is more 
than one segment.
 
 ### Document
 
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
index 7c2b0bba..c0390f50 100644
--- a/banyand/measure/flusher.go
+++ b/banyand/measure/flusher.go
@@ -99,33 +99,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch 
uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan 
*mergerIntroduction) (bool, error) {
+       var merged bool
+       var currentSegmentID int64
        var memParts []*partWrapper
        mergedIDs := make(map[uint64]struct{})
+
+       // Helper function to merge current segment's parts
+       mergeCurrentSegment := func() (bool, error) {
+               if len(memParts) < 2 {
+                       return false, nil
+               }
+
+               // Create a copy of mergedIDs for this merge operation
+               currentMergedIDs := make(map[uint64]struct{}, len(mergedIDs))
+               for id := range mergedIDs {
+                       currentMergedIDs[id] = struct{}{}
+               }
+
+               // merge memory must not be closed by the tsTable.close
+               closeCh := make(chan struct{})
+               newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+                       currentMergedIDs, mergeCh, closeCh, "mem")
+               close(closeCh)
+               if err != nil {
+                       if errors.Is(err, errClosed) {
+                               return true, nil
+                       }
+                       return false, err
+               }
+               return newPart != nil, nil
+       }
+
+       // Process parts grouped by segmentID
        for i := range snp.parts {
-               if snp.parts[i].mp != nil {
-                       memParts = append(memParts, snp.parts[i])
-                       mergedIDs[snp.parts[i].ID()] = struct{}{}
+               if snp.parts[i].mp == nil {
                        continue
                }
-       }
-       if len(memParts) < 2 {
-               return false, nil
-       }
-       // merge memory must not be closed by the tsTable.close
-       closeCh := make(chan struct{})
-       newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-               mergedIDs, mergeCh, closeCh, "mem")
-       close(closeCh)
-       if err != nil {
-               if errors.Is(err, errClosed) {
-                       return true, nil
+
+               segID := snp.parts[i].mp.segmentID
+
+               // If this is a new segment, merge the previous segment first
+               if currentSegmentID != 0 && currentSegmentID != segID {
+                       m, err := mergeCurrentSegment()
+                       if err != nil {
+                               return false, err
+                       }
+                       if m {
+                               merged = true
+                       }
+
+                       // Reset for next segment
+                       memParts = memParts[:0]
+                       for id := range mergedIDs {
+                               delete(mergedIDs, id)
+                       }
                }
-               return false, err
+
+               // Add part to current segment
+               currentSegmentID = segID
+               memParts = append(memParts, snp.parts[i])
+               mergedIDs[snp.parts[i].ID()] = struct{}{}
        }
-       if newPart == nil {
-               return false, nil
+
+       // Merge the last segment if it has parts
+       if len(memParts) >= 2 {
+               m, err := mergeCurrentSegment()
+               if err != nil {
+                       return false, err
+               }
+               if m {
+                       merged = true
+               }
        }
-       return true, nil
+
+       return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) {
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index d7d639ea..e47bfbc6 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -109,6 +109,7 @@ type memPart struct {
        timestamps        bytes.Buffer
        fieldValues       bytes.Buffer
        partMetadata      partMetadata
+       segmentID         int64
 }
 
 func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, 
fs.Writer) {
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 85d1af5f..17ca9c37 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -1699,9 +1699,7 @@ func TestSegResultHeap_Sorting(t *testing.T) {
                        }
 
                        // Add all results to heap
-                       for _, sr := range tt.segResults {
-                               heap.results = append(heap.results, sr)
-                       }
+                       heap.results = append(heap.results, tt.segResults...)
 
                        // Initialize heap
                        require.Equal(t, len(tt.segResults), heap.Len())
diff --git a/banyand/measure/syncer.go b/banyand/measure/syncer.go
index 791a97e0..b3b7f3a4 100644
--- a/banyand/measure/syncer.go
+++ b/banyand/measure/syncer.go
@@ -20,7 +20,6 @@ package measure
 import (
        "context"
        "fmt"
-       "strings"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/data"
@@ -231,28 +230,6 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, 
syncCh chan *syncIntrodu
                        // Create streaming reader for the part.
                        files, release := createPartFileReaders(part)
                        releaseFuncs = append(releaseFuncs, release)
-                       builder := strings.Builder{}
-                       for i := range part.primaryBlockMetadata {
-                               offset := part.primaryBlockMetadata[i].offset
-                               size := part.primaryBlockMetadata[i].size
-                               buf := make([]byte, size)
-                               part.primary.Read(int64(offset), buf)
-                               uncompressedBuf, err := zstd.Decompress(nil, 
buf)
-                               if err != nil {
-                                       return fmt.Errorf("cannot decompress 
block metadata: %w", err)
-                               }
-                               blockMetadata, err := 
unmarshalBlockMetadata(nil, uncompressedBuf)
-                               if err != nil {
-                                       return fmt.Errorf("cannot unmarshal 
block metadata: %w", err)
-                               }
-                               for _, block := range blockMetadata {
-                                       builder.WriteString(fmt.Sprintf("%v", 
block.seriesID))
-                                       builder.WriteString(",")
-                               }
-                       }
-                       timeStart := time.Unix(0, 
part.partMetadata.MinTimestamp)
-                       timeEnd := time.Unix(0, part.partMetadata.MaxTimestamp)
-                       fmt.Printf("snp %v primary block metadata: %v total 
count: %v time range: %v-%v group: %v shard: %v - %v\n", curSnapshot.epoch, 
builder.String(), part.partMetadata.TotalCount, timeStart, timeEnd, tst.group, 
tst.shardID, time.Now().Format(time.StampNano))
                        // Create streaming part sync data.
                        streamingParts = append(streamingParts, 
queue.StreamingPartData{
                                ID:                    part.partMetadata.ID,
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 2f71eddd..5ddab3ea 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -106,6 +106,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, 
p common.Position,
                fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
        }
        if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+               for _, id := range loadedSnapshots {
+                       fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
+               }
                return &tst, uint64(time.Now().UnixNano())
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -279,12 +282,17 @@ func (tst *tsTable) Close() error {
 }
 
 func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
+       tst.mustAddDataPointsWithSegmentID(dps, 0)
+}
+
+func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID 
int64) {
        if len(dps.seriesIDs) == 0 {
                return
        }
 
        mp := generateMemPart()
        mp.mustInitFromDataPoints(dps)
+       mp.segmentID = segmentID
        tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 6bae9dc4..e64cad88 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -108,10 +108,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                for j := range g.tables {
                        es := g.tables[j]
                        if es.tsTable != nil && es.dataPoints != nil {
-                               for i := range es.dataPoints.timestamps {
-                                       fmt.Printf("series id: %v timestamp: %v 
time range: %v\n", es.dataPoints.seriesIDs[i], es.dataPoints.timestamps[i], 
es.timeRange)
-                               }
-                               es.tsTable.mustAddDataPoints(es.dataPoints)
+                               
es.tsTable.mustAddDataPointsWithSegmentID(es.dataPoints, 
es.timeRange.Start.UnixNano())
                                releaseDataPoints(es.dataPoints)
                        }
                        nodes := g.queue.GetNodes(es.shardID)
diff --git a/banyand/measure/write_standalone.go 
b/banyand/measure/write_standalone.go
index 759af0e2..dc7fd8b1 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -80,9 +80,6 @@ func processDataPoint(dpt *dataPointsInTable, req 
*measurev1.WriteRequest, write
        if err := series.Marshal(); err != nil {
                return 0, fmt.Errorf("cannot marshal series: %w", err)
        }
-       if req.Metadata.Name == "service_cpm_minute" {
-               fmt.Printf("entity values: %v time range: %v series id: %v\n", 
writeEvent.EntityValues, dpt.timeRange, series.ID)
-       }
 
        if stm.schema.IndexMode {
                fields := handleIndexMode(stm.schema, req, is.indexRuleLocators)
diff --git a/banyand/stream/flusher.go b/banyand/stream/flusher.go
index d8d4cc2f..04788af0 100644
--- a/banyand/stream/flusher.go
+++ b/banyand/stream/flusher.go
@@ -129,33 +129,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch 
uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan 
*mergerIntroduction) (bool, error) {
+       var merged bool
+       var currentSegmentID int64
        var memParts []*partWrapper
        mergedIDs := make(map[uint64]struct{})
+
+       // Helper function to merge current segment's parts
+       mergeCurrentSegment := func() (bool, error) {
+               if len(memParts) < 2 {
+                       return false, nil
+               }
+
+               // Create a copy of mergedIDs for this merge operation
+               currentMergedIDs := make(map[uint64]struct{}, len(mergedIDs))
+               for id := range mergedIDs {
+                       currentMergedIDs[id] = struct{}{}
+               }
+
+               // merge memory must not be closed by the tsTable.close
+               closeCh := make(chan struct{})
+               newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+                       currentMergedIDs, mergeCh, closeCh, "mem")
+               close(closeCh)
+               if err != nil {
+                       if errors.Is(err, errClosed) {
+                               return true, nil
+                       }
+                       return false, err
+               }
+               return newPart != nil, nil
+       }
+
+       // Process parts grouped by segmentID
        for i := range snp.parts {
-               if snp.parts[i].mp != nil {
-                       memParts = append(memParts, snp.parts[i])
-                       mergedIDs[snp.parts[i].ID()] = struct{}{}
+               if snp.parts[i].mp == nil {
                        continue
                }
-       }
-       if len(memParts) < 2 {
-               return false, nil
-       }
-       // merge memory must not be closed by the tsTable.close
-       closeCh := make(chan struct{})
-       newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-               mergedIDs, mergeCh, closeCh, "mem")
-       close(closeCh)
-       if err != nil {
-               if errors.Is(err, errClosed) {
-                       return true, nil
+
+               segID := snp.parts[i].mp.segmentID
+
+               // If this is a new segment, merge the previous segment first
+               if currentSegmentID != 0 && currentSegmentID != segID {
+                       m, err := mergeCurrentSegment()
+                       if err != nil {
+                               return false, err
+                       }
+                       if m {
+                               merged = true
+                       }
+
+                       // Reset for next segment
+                       memParts = memParts[:0]
+                       for id := range mergedIDs {
+                               delete(mergedIDs, id)
+                       }
                }
-               return false, err
+
+               // Add part to current segment
+               currentSegmentID = segID
+               memParts = append(memParts, snp.parts[i])
+               mergedIDs[snp.parts[i].ID()] = struct{}{}
        }
-       if newPart == nil {
-               return false, nil
+
+       // Merge the last segment if it has parts
+       if len(memParts) >= 2 {
+               m, err := mergeCurrentSegment()
+               if err != nil {
+                       return false, err
+               }
+               if m {
+                       merged = true
+               }
        }
-       return true, nil
+
+       return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) {
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 07e5a22f..39cb2145 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -112,6 +112,7 @@ type memPart struct {
        primary           bytes.Buffer
        timestamps        bytes.Buffer
        partMetadata      partMetadata
+       segmentID         int64
 }
 
 func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, 
fs.Writer, fs.Writer) {
@@ -140,6 +141,7 @@ func (mp *memPart) reset() {
        mp.meta.Reset()
        mp.primary.Reset()
        mp.timestamps.Reset()
+       mp.segmentID = 0
        if mp.tagFamilies != nil {
                for k, tf := range mp.tagFamilies {
                        tf.Reset()
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index bd521ce4..9384c4b4 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -261,6 +261,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, 
p common.Position,
                fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
        }
        if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+               for _, id := range loadedSnapshots {
+                       fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
+               }
                return &tst, uint64(time.Now().UnixNano()), nil
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -336,12 +339,17 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
 }
 
 func (tst *tsTable) mustAddElements(es *elements) {
+       tst.mustAddElementsWithSegmentID(es, 0)
+}
+
+func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID 
int64) {
        if len(es.seriesIDs) == 0 {
                return
        }
 
        mp := generateMemPart()
        mp.mustInitFromElements(es)
+       mp.segmentID = segmentID
        tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index c78d1a37..be566f09 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -181,7 +181,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                g := groups[i]
                for j := range g.tables {
                        es := g.tables[j]
-                       es.tsTable.mustAddElements(es.elements)
+                       es.tsTable.mustAddElementsWithSegmentID(es.elements, 
es.timeRange.Start.UnixNano())
                        releaseElements(es.elements)
                        // Get nodes for this shard
                        nodes := g.queue.GetNodes(es.shardID)
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 56f81c32..727cad6c 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -48,33 +48,20 @@ func (tst *tsTable) flusherLoop(flushCh chan 
*flusherIntroduction, mergeCh chan
                                        
tst.incTotalFlushLatency(time.Since(start).Seconds())
                                }()
                                curSnapshot := tst.currentSnapshot()
-                               if curSnapshot != nil {
-                                       flusherWatchers = 
tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
-                                       curSnapshot.decRef()
-                                       curSnapshot = nil
-                               }
-                               tst.RLock()
-                               if tst.snapshot != nil && tst.snapshot.epoch > 
epoch {
-                                       curSnapshot = tst.snapshot
-                                       curSnapshot.incRef()
-                               }
-                               tst.RUnlock()
+                               var ok bool
+                               var merged bool
+                               curSnapshot, flusherWatchers, ok, merged = 
tst.pauseFlusherToPileupMemPartsWithMerge(curSnapshot, flusherWatcher, 
flusherWatchers, epoch, mergeCh)
                                if curSnapshot != nil {
                                        defer curSnapshot.decRef()
-                                       merged, err := 
tst.mergeMemParts(curSnapshot, mergeCh)
-                                       if err != nil {
-                                               
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", 
curSnapshot.epoch)
-                                               tst.incTotalFlushLoopErr(1)
+                                       if !ok {
                                                return false
                                        }
                                        if !merged {
                                                tst.flush(curSnapshot, flushCh)
                                        }
-                                       epoch = curSnapshot.epoch
-                                       // Notify merger to start a new round 
of merge.
-                                       // This round might have be triggered 
in pauseFlusherToPileupMemParts.
                                        flusherWatchers.Notify(math.MaxUint64)
                                        flusherWatchers = nil
+                                       epoch = curSnapshot.epoch
                                        if tst.currentEpoch() != epoch {
                                                tst.incTotalFlushLoopProgress(1)
                                                return false
@@ -132,6 +119,36 @@ func (tst *tsTable) flusherLoopNoMerger(flushCh chan 
*flusherIntroduction, intro
        }
 }
 
+func (tst *tsTable) pauseFlusherToPileupMemPartsWithMerge(
+       curSnapshot *snapshot, flusherWatcher watcher.Channel, flusherWatchers 
watcher.Epochs,
+       epoch uint64, mergeCh chan *mergerIntroduction,
+) (*snapshot, watcher.Epochs, bool, bool) {
+       if tst.option.flushTimeout < 1 {
+               return curSnapshot, flusherWatchers, true, false
+       }
+       if curSnapshot != nil {
+               flusherWatchers = tst.pauseFlusherToPileupMemParts(epoch, 
flusherWatcher, flusherWatchers)
+               curSnapshot.decRef()
+               curSnapshot = nil
+       }
+       tst.RLock()
+       if tst.snapshot != nil && tst.snapshot.epoch > epoch {
+               curSnapshot = tst.snapshot
+               curSnapshot.incRef()
+       }
+       tst.RUnlock()
+       if curSnapshot == nil {
+               return nil, flusherWatchers, false, false
+       }
+       merged, err := tst.mergeMemParts(curSnapshot, mergeCh)
+       if err != nil {
+               tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", 
curSnapshot.epoch)
+               tst.incTotalFlushLoopErr(1)
+               return curSnapshot, flusherWatchers, false, false
+       }
+       return curSnapshot, flusherWatchers, true, merged
+}
+
 // pauseFlusherToPileupMemParts takes a pause to wait for in-memory parts to 
pile up.
 // If there is no in-memory part, we can skip the pause.
 // When a merging is finished, we can skip the pause.
@@ -155,33 +172,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch 
uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan 
*mergerIntroduction) (bool, error) {
+       var merged bool
+       var currentSegmentID int64
        var memParts []*partWrapper
        mergedIDs := make(map[partHandle]struct{})
+
+       // Helper function to merge current segment's parts
+       mergeCurrentSegment := func() (bool, error) {
+               if len(memParts) < 2 {
+                       return false, nil
+               }
+
+               // Create a copy of mergedIDs for this merge operation
+               currentMergedIDs := make(map[partHandle]struct{}, 
len(mergedIDs))
+               for id := range mergedIDs {
+                       currentMergedIDs[id] = struct{}{}
+               }
+
+               // merge memory must not be closed by the tsTable.close
+               closeCh := make(chan struct{})
+               newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+                       currentMergedIDs, mergeCh, closeCh, "mem")
+               close(closeCh)
+               if err != nil {
+                       if errors.Is(err, errClosed) {
+                               return true, nil
+                       }
+                       return false, err
+               }
+               return newPart != nil, nil
+       }
+
+       // Process parts grouped by segmentID
        for i := range snp.parts {
-               if snp.parts[i].mp != nil {
-                       memParts = append(memParts, snp.parts[i])
-                       mergedIDs[partHandle{partID: snp.parts[i].ID(), 
partType: PartTypeCore}] = struct{}{}
+               if snp.parts[i].mp == nil {
                        continue
                }
-       }
-       if len(memParts) < 2 {
-               return false, nil
-       }
-       // merge memory must not be closed by the tsTable.close
-       closeCh := make(chan struct{})
-       newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-               mergedIDs, mergeCh, closeCh, "mem")
-       close(closeCh)
-       if err != nil {
-               if errors.Is(err, errClosed) {
-                       return true, nil
+
+               segID := snp.parts[i].mp.segmentID
+
+               // If this is a new segment, merge the previous segment first
+               if currentSegmentID != 0 && currentSegmentID != segID {
+                       m, err := mergeCurrentSegment()
+                       if err != nil {
+                               return false, err
+                       }
+                       if m {
+                               merged = true
+                       }
+
+                       // Reset for next segment
+                       memParts = memParts[:0]
+                       for id := range mergedIDs {
+                               delete(mergedIDs, id)
+                       }
                }
-               return false, err
+
+               // Add part to current segment
+               currentSegmentID = segID
+               memParts = append(memParts, snp.parts[i])
+               mergedIDs[partHandle{partID: snp.parts[i].ID(), partType: 
PartTypeCore}] = struct{}{}
        }
-       if newPart == nil {
-               return false, nil
+
+       // Merge the last segment if it has parts
+       if len(memParts) >= 2 {
+               m, err := mergeCurrentSegment()
+               if err != nil {
+                       return false, err
+               }
+               if m {
+                       merged = true
+               }
        }
-       return true, nil
+
+       return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) {
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index d8ba721d..b76caab9 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -113,6 +113,7 @@ type memPart struct {
        meta          bytes.Buffer
        primary       bytes.Buffer
        partMetadata  partMetadata
+       segmentID     int64
 }
 
 func (mp *memPart) mustCreateMemTagWriters(name string) (fs.Writer, fs.Writer) 
{
@@ -143,6 +144,7 @@ func (mp *memPart) reset() {
        mp.meta.Reset()
        mp.primary.Reset()
        mp.spans.Reset()
+       mp.segmentID = 0
        if mp.tags != nil {
                for k, t := range mp.tags {
                        t.Reset()
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 8e3be10f..645dde18 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -248,6 +248,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, 
p common.Position,
                fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
        }
        if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+               for _, id := range loadedSnapshots {
+                       fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
+               }
                return &tst, uint64(time.Now().UnixNano())
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -412,12 +415,17 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
 }
 
 func (tst *tsTable) mustAddTraces(ts *traces) {
+       tst.mustAddTracesWithSegmentID(ts, 0)
+}
+
+func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64) {
        if len(ts.traceIDs) == 0 {
                return
        }
 
        mp := generateMemPart()
        mp.mustInitFromTraces(ts)
+       mp.segmentID = segmentID
        tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 58a37f60..094f1fb7 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -191,7 +191,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                g := groups[i]
                for j := range g.tables {
                        es := g.tables[j]
-                       es.tsTable.mustAddTraces(es.traces)
+                       es.tsTable.mustAddTracesWithSegmentID(es.traces, 
es.timeRange.Start.UnixNano())
                        releaseTraces(es.traces)
 
                        for sidxName, sidxReqs := range es.sidxReqsMap {
diff --git a/test/cases/measure/data/input/linked_or.yaml 
b/test/cases/measure/data/input/linked_or.yaml
index 898a02a2..d2662a84 100644
--- a/test/cases/measure/data/input/linked_or.yaml
+++ b/test/cases/measure/data/input/linked_or.yaml
@@ -16,7 +16,6 @@
 # under the License.
 
 name: "service_cpm_minute"
-trace: true
 groups: ["sw_metric"]
 tagProjection:
   tagFamilies:
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 12a693ca..274dd3cb 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -67,7 +67,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("filter by entity id and service id", helpers.Args{Input: 
"entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
DisOrder: true}),
        g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("invalid logical expression", helpers.Args{Input: 
"err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantErr: true}),
-       g.FEntry("linked or expressions", helpers.Args{Input: "linked_or", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("linked or expressions", helpers.Args{Input: "linked_or", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("In and not In expressions", helpers.Args{Input: "in", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
@@ -77,7 +77,8 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("all of index mode in a larger time range",
                helpers.Args{Input: "index_mode_all", Want: "index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour, DisOrder: true}),
-       g.Entry("all in all segments of index mode", helpers.Args{Input: "index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour, DisOrder: true}),
+       g.Entry("all in all segments of index mode",
+               helpers.Args{Input: "index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour, DisOrder: true}),
        g.Entry("order by desc of index mode", helpers.Args{Input: "index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
diff --git a/test/integration/distributed/multi_segments/multi_segments_suite_test.go b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
index bb1af63d..5a8fa9fe 100644
--- a/test/integration/distributed/multi_segments/multi_segments_suite_test.go
+++ b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
@@ -42,11 +42,13 @@ import (
        test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        test_cases "github.com/apache/skywalking-banyandb/test/cases"
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
 )
 
 func TestDistributedMultiSegments(t *testing.T) {
@@ -95,6 +97,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        ctx := context.Background()
        test_stream.PreloadSchema(ctx, schemaRegistry)
        test_measure.PreloadSchema(ctx, schemaRegistry)
+       test_trace.PreloadSchema(ctx, schemaRegistry)
 
        By("Starting data node 0")
        closeDataNode0 := setup.DataNode(ep)
@@ -106,7 +109,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        By("Initializing test cases")
        ns := timestamp.NowMilli().UnixNano()
        now = time.Unix(0, ns-ns%int64(time.Minute))
-       baseTime = time.Date(now.Year(), now.Month(), now.Day(), 00, 02, 0, 0, now.Location())
+       baseTime = time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 0, now.Location())
        test_cases.Initialize(liaisonAddr, baseTime)
 
        deferFunc = func() {
@@ -134,6 +137,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                Connection: connection,
                BaseTime:   baseTime,
        }
+       casestrace.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
        Expect(err).NotTo(HaveOccurred())
 })
 
diff --git a/test/integration/standalone/multi_segments/multi_segments_suite_test.go b/test/integration/standalone/multi_segments/multi_segments_suite_test.go
index b3ea9011..5c8e1c61 100644
--- a/test/integration/standalone/multi_segments/multi_segments_suite_test.go
+++ b/test/integration/standalone/multi_segments/multi_segments_suite_test.go
@@ -66,7 +66,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        addr, _, deferFunc = setup.Standalone()
        ns := timestamp.NowMilli().UnixNano()
        now = time.Unix(0, ns-ns%int64(time.Minute))
-       baseTime = time.Date(now.Year(), now.Month(), now.Day(), 00, 02, 0, 0, now.Location())
+       baseTime = time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 0, now.Location())
        test_cases.Initialize(addr, baseTime)
        return []byte(addr)
 }, func(address []byte) {

Reply via email to