This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test/multi-segments
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 75eecf971b8e3883b37e0b33575c608a9dc99997
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Sep 12 13:34:43 2025 +0800

    Refactor newSupplier function to remove unused nodeID parameter. Enhance
    searchSeriesList function to collect and sort segment results based on
    query options. Introduce segResultHeap for efficient sorting of segment
    results. Add tests for segResultHeap sorting and NPE prevention. Improve
    logging in syncer and write_liaison components for better traceability.
---
 banyand/measure/metadata.go                        |   6 +-
 banyand/measure/query.go                           | 217 ++++++++++---
 banyand/measure/query_test.go                      | 342 +++++++++++++++++++++
 banyand/measure/syncer.go                          |  24 +-
 banyand/measure/write_liaison.go                   |   3 +
 banyand/measure/write_standalone.go                |   3 +
 test/cases/measure/data/data.go                    |  21 ++
 test/cases/measure/data/input/linked_or.yaml       |   1 +
 test/cases/measure/data/want/entity_in.yaml        |   8 +-
 test/cases/measure/data/want/tag_filter_int.yaml   |   4 +-
 test/cases/measure/measure.go                      |  20 +-
 .../multi_segments/multi_segments_suite_test.go    | 154 ++++++++++
 .../multi_segments/multi_segments_suite_test.go    | 109 +++++++
 13 files changed, 849 insertions(+), 63 deletions(-)
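
A note on the approach for reviewers: the query path now performs a k-way merge. Each segment returns its series already sorted, and a container/heap over per-segment cursors yields a single, globally ordered stream. Below is a minimal, self-contained sketch of that pattern; the types and names (item, mergeHeap) are illustrative only, not the repository's API.

    package main

    import (
            "bytes"
            "container/heap"
            "fmt"
    )

    // item is one segment's cursor: its sorted values plus the current index.
    type item struct {
            values [][]byte
            i      int
    }

    type mergeHeap struct {
            items    []*item
            sortDesc bool
    }

    func (h *mergeHeap) Len() int { return len(h.items) }
    func (h *mergeHeap) Less(i, j int) bool {
            c := bytes.Compare(h.items[i].values[h.items[i].i], h.items[j].values[h.items[j].i])
            if h.sortDesc {
                    return c > 0
            }
            return c < 0
    }
    func (h *mergeHeap) Swap(i, j int)      { h.items[i], h.items[j] = h.items[j], h.items[i] }
    func (h *mergeHeap) Push(x interface{}) { h.items = append(h.items, x.(*item)) }
    func (h *mergeHeap) Pop() interface{} {
            old := h.items
            x := old[len(old)-1]
            h.items = old[:len(old)-1]
            return x
    }

    func main() {
            h := &mergeHeap{items: []*item{
                    {values: [][]byte{[]byte("a"), []byte("d")}},
                    {values: [][]byte{[]byte("b"), []byte("c")}},
            }}
            heap.Init(h)
            for h.Len() > 0 {
                    top := heap.Pop(h).(*item) // cursor with the globally smallest head
                    fmt.Printf("%s ", top.values[top.i])
                    if top.i++; top.i < len(top.values) {
                            heap.Push(h, top) // re-insert the advanced cursor
                    }
            }
            // Output: a b c d
    }

Pop always surfaces the cursor with the smallest head (largest when sortDesc is set), and re-pushing the advanced cursor keeps the merge at O(n log k) for k segments.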

diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index f9c64eb2..82017b7f 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -96,7 +96,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
                svc.l,
-               newSupplier(path, svc, sr, nodeLabels, nodeID),
+               newSupplier(path, svc, sr, nodeLabels),
                resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
        )
        sr.start()
@@ -392,12 +392,11 @@ type supplier struct {
        l          *logger.Logger
        schemaRepo *schemaRepo
        nodeLabels map[string]string
-       nodeID     string
        path       string
        option     option
 }
 
-func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[string]string, nodeID string) *supplier {
+func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[string]string) *supplier {
        if svc.pm == nil {
                svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier")
        }
@@ -418,7 +417,6 @@ func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[st
                pm:         svc.pm,
                schemaRepo: sr,
                nodeLabels: nodeLabels,
-               nodeID:     nodeID,
        }
 }
 
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 066f732a..8e0fceeb 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -226,9 +226,14 @@ func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
                        newTagProjection = append(newTagProjection, tagProjection)
                }
        }
+
+       // Collect all search results first
+       var segResults []*segResult
+       var needsSorting bool
        seriesFilter := roaring.NewPostingList()
+
        for i := range segments {
-               sd, _, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{
+               sd, sortedValues, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{
                        Query:       mqo.Query,
                        Order:       mqo.Order,
                        PreloadSize: preloadSize,
@@ -241,44 +246,135 @@ func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
                        tt, cc := segments[i].Tables()
                        tables = append(tables, tt...)
                        caches = append(caches, cc...)
+
+                       // Create segResult for this segment
+                       sr := &segResult{
+                               SeriesData:   sd,
+                               sortedValues: sortedValues,
+                               i:            0,
+                       }
+                       segResults = append(segResults, sr)
+
+                       // Check if we need sorting
+                       if mqo.Order != nil && sortedValues != nil {
+                               needsSorting = true
+                       }
                }
-               for j := range sd.SeriesList {
-                       if seriesFilter.Contains(uint64(sd.SeriesList[j].ID)) {
+       }
+
+       // Sort if needed, otherwise use original order
+       if needsSorting && len(segResults) > 0 {
+               // Use segResultHeap to sort
+               segHeap := &segResultHeap{
+                       results:  segResults,
+                       sortDesc: mqo.Order.Sort == modelv1.Sort_SORT_DESC,
+               }
+               heap.Init(segHeap)
+
+               // Extract sorted series IDs
+               for segHeap.Len() > 0 {
+                       top := heap.Pop(segHeap).(*segResult)
+                       series := top.SeriesList[top.i]
+
+                       if seriesFilter.Contains(uint64(series.ID)) {
+                               // Move to next in this segment
+                               top.i++
+                               if top.i < len(top.SeriesList) {
+                                       heap.Push(segHeap, top)
+                               }
                                continue
                        }
-                       seriesFilter.Insert(uint64(sd.SeriesList[j].ID))
-                       sl = append(sl, sd.SeriesList[j].ID)
-                       if projectedEntityOffsets == nil && sd.Fields == nil {
-                               continue
+
+                       seriesFilter.Insert(uint64(series.ID))
+                       sl = append(sl, series.ID)
+
+                       // Build storedIndexValue for this series
+                       var fieldResult map[string][]byte
+                       if top.Fields != nil && top.i < len(top.Fields) {
+                               fieldResult = top.Fields[top.i]
                        }
-                       if storedIndexValue == nil {
-                               storedIndexValue = make(map[common.SeriesID]map[string]*modelv1.TagValue)
+                       storedIndexValue = m.buildStoredIndexValue(
+                               series.ID,
+                               series.EntityValues,
+                               fieldResult,
+                               projectedEntityOffsets,
+                               fieldToValueType,
+                               storedIndexValue,
+                       )
+
+                       // Move to next in this segment
+                       top.i++
+                       if top.i < len(top.SeriesList) {
+                               heap.Push(segHeap, top)
                        }
-                       tagValues := make(map[string]*modelv1.TagValue)
-                       storedIndexValue[sd.SeriesList[j].ID] = tagValues
-                       for name, offset := range projectedEntityOffsets {
-                               if offset < 0 || offset >= len(sd.SeriesList[j].EntityValues) {
-                                       logger.Warningf("offset %d for tag %s is out of range for series ID %v", offset, name, sd.SeriesList[j].ID)
-                                       tagValues[name] = pbv1.NullTagValue
+               }
+       } else {
+               // Original logic when no sorting is needed
+               for _, sr := range segResults {
+                       for j := range sr.SeriesList {
+                               if seriesFilter.Contains(uint64(sr.SeriesList[j].ID)) {
                                        continue
                                }
-                               tagValues[name] = sd.SeriesList[j].EntityValues[offset]
-                       }
-                       if sd.Fields == nil {
-                               continue
-                       }
-                       for f, v := range sd.Fields[j] {
-                               if tnt, ok := fieldToValueType[f]; ok {
-                                       tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v)
-                               } else {
-                                       logger.Panicf("unknown field %s not found in fieldToValueType", f)
+                               seriesFilter.Insert(uint64(sr.SeriesList[j].ID))
+                               sl = append(sl, sr.SeriesList[j].ID)
+
+                               var fieldResult map[string][]byte
+                               if sr.Fields != nil && j < len(sr.Fields) {
+                                       fieldResult = sr.Fields[j]
                                }
+                               storedIndexValue = m.buildStoredIndexValue(
+                                       sr.SeriesList[j].ID,
+                                       sr.SeriesList[j].EntityValues,
+                                       fieldResult,
+                                       projectedEntityOffsets,
+                                       fieldToValueType,
+                                       storedIndexValue,
+                               )
                        }
                }
        }
+
        return sl, tables, caches, storedIndexValue, newTagProjection, nil
 }
 
+func (m *measure) buildStoredIndexValue(
+       seriesID common.SeriesID,
+       entityValues []*modelv1.TagValue,
+       fieldResult map[string][]byte,
+       projectedEntityOffsets map[string]int,
+       fieldToValueType map[string]tagNameWithType,
+       storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+) map[common.SeriesID]map[string]*modelv1.TagValue {
+       if projectedEntityOffsets == nil && fieldResult == nil {
+               return storedIndexValue
+       }
+
+       if storedIndexValue == nil {
+               storedIndexValue = make(map[common.SeriesID]map[string]*modelv1.TagValue)
+       }
+       tagValues := make(map[string]*modelv1.TagValue)
+       storedIndexValue[seriesID] = tagValues
+
+       for name, offset := range projectedEntityOffsets {
+               if offset < 0 || offset >= len(entityValues) {
+                       logger.Warningf("offset %d for tag %s is out of range for series ID %v", offset, name, seriesID)
+                       tagValues[name] = pbv1.NullTagValue
+                       continue
+               }
+               tagValues[name] = entityValues[offset]
+       }
+
+       for f, v := range fieldResult {
+               if tnt, ok := fieldToValueType[f]; ok {
+                       tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v)
+               } else {
+                       logger.Panicf("unknown field %s not found in fieldToValueType", f)
+               }
+       }
+
+       return storedIndexValue
+}
+
 func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQueryOptions,
        segments []storage.Segment[*tsTable, option],
 ) (model.MeasureQueryResult, error) {
@@ -351,11 +447,17 @@ func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQu
                if len(sr.SeriesList) < 1 {
                        continue
                }
-               r.segResults = append(r.segResults, sr)
+               r.segResults.results = append(r.segResults.results, sr)
        }
-       if len(r.segResults) < 1 {
+       if len(r.segResults.results) < 1 {
                return nilResult, nil
        }
+
+       // Set sort order based on mqo.Order.Sort
+       if mqo.Order != nil && mqo.Order.Sort == modelv1.Sort_SORT_DESC {
+               r.segResults.sortDesc = true
+       }
+
        heap.Init(&r.segResults)
        return r, nil
 }
@@ -770,18 +872,18 @@ type indexSortResult struct {
 
 // Pull implements model.MeasureQueryResult.
 func (iqr *indexSortResult) Pull() *model.MeasureResult {
-       if len(iqr.segResults) < 1 {
+       if len(iqr.segResults.results) < 1 {
                return nil
        }
-       if len(iqr.segResults) == 1 {
-               if iqr.segResults[0].i >= len(iqr.segResults[0].SeriesList) {
+       if len(iqr.segResults.results) == 1 {
+               if iqr.segResults.results[0].i >= len(iqr.segResults.results[0].SeriesList) {
                        return nil
                }
-               sr := iqr.segResults[0]
+               sr := iqr.segResults.results[0]
                r := iqr.copyTo(sr)
                sr.i++
                if sr.i >= len(sr.SeriesList) {
-                       iqr.segResults = iqr.segResults[:0]
+                       iqr.segResults.results = iqr.segResults.results[:0]
                }
                return r
        }
@@ -861,25 +963,56 @@ func (sr *segResult) remove(i int) {
        }
 }
 
-type segResultHeap []*segResult
+type segResultHeap struct {
+       results  []*segResult
+       sortDesc bool
+}
+
+func (h *segResultHeap) Len() int { return len(h.results) }
+func (h *segResultHeap) Less(i, j int) bool {
+       // Handle NPE - check for nil results or invalid indices
+       if i >= len(h.results) || j >= len(h.results) {
+               return false
+       }
+       if h.results[i] == nil || h.results[j] == nil {
+               return false
+       }
+       if h.results[i].i >= len(h.results[i].SeriesList) || h.results[j].i >= len(h.results[j].SeriesList) {
+               return false
+       }
+
+       // If no sortedValues, compare by SeriesID
+       if h.results[i].sortedValues == nil || h.results[j].sortedValues == nil {
+               if h.sortDesc {
+                       return h.results[i].SeriesList[h.results[i].i].ID > h.results[j].SeriesList[h.results[j].i].ID
+               }
+               return h.results[i].SeriesList[h.results[i].i].ID < h.results[j].SeriesList[h.results[j].i].ID
+       }
+
+       // Handle potential index out of bounds for sortedValues
+       if h.results[i].i >= len(h.results[i].sortedValues) || h.results[j].i >= len(h.results[j].sortedValues) {
+               if h.sortDesc {
+                       return h.results[i].SeriesList[h.results[i].i].ID > h.results[j].SeriesList[h.results[j].i].ID
+               }
+               return h.results[i].SeriesList[h.results[i].i].ID < h.results[j].SeriesList[h.results[j].i].ID
+       }
 
-func (h segResultHeap) Len() int { return len(h) }
-func (h segResultHeap) Less(i, j int) bool {
-       if h[i].sortedValues == nil {
-               return h[i].SeriesList[h[i].i].ID < h[j].SeriesList[h[j].i].ID
+       cmp := bytes.Compare(h.results[i].sortedValues[h.results[i].i], h.results[j].sortedValues[h.results[j].i])
+       if h.sortDesc {
+               return cmp > 0
        }
-       return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0
+       return cmp < 0
 }
-func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *segResultHeap) Swap(i, j int) { h.results[i], h.results[j] = h.results[j], h.results[i] }
 
 func (h *segResultHeap) Push(x interface{}) {
-       *h = append(*h, x.(*segResult))
+       h.results = append(h.results, x.(*segResult))
 }
 
 func (h *segResultHeap) Pop() interface{} {
-       old := *h
+       old := h.results
        n := len(old)
        x := old[n-1]
-       *h = old[0 : n-1]
+       h.results = old[0 : n-1]
        return x
 }
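
For context on the hunks above: searchSeriesList walks this heap while also deduplicating series that can appear in more than one segment — the first segment to surface an ID wins, and every popped cursor is advanced and re-pushed. A condensed, runnable sketch of that loop under simplified types (a plain map stands in for the roaring posting list, and cursor for segResult):

    package main

    import (
            "container/heap"
            "fmt"
    )

    // cursor mirrors segResult: a per-segment sorted ID list plus an index.
    type cursor struct {
            ids []uint64
            i   int
    }

    type cursorHeap []*cursor

    func (h cursorHeap) Len() int            { return len(h) }
    func (h cursorHeap) Less(i, j int) bool  { return h[i].ids[h[i].i] < h[j].ids[h[j].i] }
    func (h cursorHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
    func (h *cursorHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
    func (h *cursorHeap) Pop() interface{} {
            old := *h
            x := old[len(old)-1]
            *h = old[:len(old)-1]
            return x
    }

    func main() {
            // Two segments can index the same series; only the first hit is kept.
            h := &cursorHeap{
                    {ids: []uint64{10, 30}},
                    {ids: []uint64{10, 20}},
            }
            heap.Init(h)
            seen := map[uint64]bool{}
            for h.Len() > 0 {
                    top := heap.Pop(h).(*cursor)
                    if id := top.ids[top.i]; !seen[id] {
                            seen[id] = true
                            fmt.Println(id) // prints 10, 20, 30: deduplicated, globally ordered
                    }
                    if top.i++; top.i < len(top.ids) {
                            heap.Push(h, top)
                    }
            }
    }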
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 1f2d24a8..85d1af5f 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -34,6 +34,7 @@ import (
        itest "github.com/apache/skywalking-banyandb/banyand/internal/test"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -1515,3 +1516,344 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
                })
        }
 }
+
+func TestSegResultHeap_Sorting(t *testing.T) {
+       tests := []struct {
+               name        string
+               segResults  []*segResult
+               expectOrder []int
+               sortDesc    bool
+       }{
+               {
+                       name:     "Sort ascending by SeriesID (no sortedValues)",
+                       sortDesc: false,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(30)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(10)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(20)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // SeriesID order: 10, 30, 20
+               },
+               {
+                       name:     "Sort descending by SeriesID (no sortedValues)",
+                       sortDesc: true,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(10)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(30)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(20)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // SeriesID order: 30, 10, 20
+               },
+               {
+                       name:     "Sort ascending by sortedValues",
+                       sortDesc: false,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(1)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("charlie")},
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(2)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("alpha")},
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(3)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("beta")},
+                                       i:            0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // alpha, charlie, beta
+               },
+               {
+                       name:     "Sort descending by sortedValues",
+                       sortDesc: true,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(1)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("alpha")},
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(2)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("charlie")},
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(3)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("beta")},
+                                       i:            0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // charlie, alpha, beta
+               },
+               {
+                       name:     "Mixed sortedValues and nil sortedValues ascending",
+                       sortDesc: false,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(30)},
+                                               },
+                                       },
+                                       sortedValues: nil, // Will use SeriesID for sorting
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(10)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("zzz")}, // Should come after nil sortedValues when sorted by SeriesID
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: common.SeriesID(20)},
+                                               },
+                                       },
+                                       sortedValues: nil, // Will use SeriesID for sorting
+                                       i:            0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // SeriesID 10, 30, 20 (nil sortedValues use SeriesID)
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Create and initialize heap
+                       heap := segResultHeap{
+                               results:  make([]*segResult, 0),
+                               sortDesc: tt.sortDesc,
+                       }
+
+                       // Add all results to heap
+                       for _, sr := range tt.segResults {
+                               heap.results = append(heap.results, sr)
+                       }
+
+                       // Sanity check: the heap sees every segment result
+                       require.Equal(t, len(tt.segResults), heap.Len())
+
+                       // Order a copy of the results with the heap's Less.
+                       // Note Less indexes heap.results by position, so this
+                       // exercises the comparator itself rather than a full
+                       // heap sort (the local variable heap shadows the
+                       // container/heap package here).
+                       heapImpl := &heap
+                       heap2 := make([]*segResult, len(tt.segResults))
+                       copy(heap2, heap.results)
+                       sort.Slice(heap2, func(i, j int) bool {
+                               return heapImpl.Less(i, j)
+                       })
+
+                       // Verify the order matches expectation
+                       for i, expectedIdx := range tt.expectOrder {
+                               actual := heap2[i]
+                               expected := tt.segResults[expectedIdx]
+                               require.Equal(t, expected.SeriesList[expected.i].ID, actual.SeriesList[actual.i].ID,
+                                       "Position %d: expected SeriesID %d, got %d", i, expected.SeriesList[expected.i].ID, actual.SeriesList[actual.i].ID)
+                       }
+               })
+       }
+}
+
+func TestSegResultHeap_NPE_Prevention(t *testing.T) {
+       tests := []struct {
+               name       string
+               segResults []*segResult
+               i, j       int
+               expectLess bool
+       }{
+               {
+                       name:       "Out of bounds indices",
+                       segResults: []*segResult{{SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 0}},
+                       i:          0,
+                       j:          5, // Out of bounds
+                       expectLess: false,
+               },
+               {
+                       name:       "Nil segResult",
+                       segResults: []*segResult{nil, {SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 0}},
+                       i:          0,
+                       j:          1,
+                       expectLess: false,
+               },
+               {
+                       name: "Index out of bounds for SeriesList",
+                       segResults: []*segResult{
+                               {SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 5}, // i is out of bounds
+                               {SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 2}}}, i: 0},
+                       },
+                       i:          0,
+                       j:          1,
+                       expectLess: false,
+               },
+               {
+                       name: "Index out of bounds for sortedValues",
+                       segResults: []*segResult{
+                               {
+                                       SeriesData:   storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}},
+                                       sortedValues: [][]byte{[]byte("test")},
+                                       i:            5, // i is out of bounds for sortedValues
+                               },
+                               {
+                                       SeriesData:   storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 2}}},
+                                       sortedValues: [][]byte{[]byte("test2")},
+                                       i:            0,
+                               },
+                       },
+                       i:          0,
+                       j:          1,
+                       expectLess: false, // i exceeds SeriesList bounds, so Less returns false
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       heap := segResultHeap{
+                               results:  tt.segResults,
+                               sortDesc: false,
+                       }
+
+                       // This should not panic due to NPE prevention
+                       result := heap.Less(tt.i, tt.j)
+                       require.Equal(t, tt.expectLess, result)
+               })
+       }
+}
+
+func TestIndexSortResult_OrderBySortDesc(t *testing.T) {
+       tests := []struct {
+               name       string
+               sortOrder  modelv1.Sort
+               expectDesc bool
+       }{
+               {
+                       name:       "SORT_ASC should be ascending",
+                       sortOrder:  modelv1.Sort_SORT_ASC,
+                       expectDesc: false,
+               },
+               {
+                       name:       "SORT_UNSPECIFIED should be ascending",
+                       sortOrder:  modelv1.Sort_SORT_UNSPECIFIED,
+                       expectDesc: false,
+               },
+               {
+                       name:       "SORT_DESC should be descending",
+                       sortOrder:  modelv1.Sort_SORT_DESC,
+                       expectDesc: true,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Create mock segments and measure data
+                       mqo := model.MeasureQueryOptions{
+                               Order: &index.OrderBy{
+                                       Sort: tt.sortOrder,
+                               },
+                       }
+
+                       // Create a simple segResult
+                       sr := &segResult{
+                               SeriesData: storage.SeriesData{
+                                       SeriesList: pbv1.SeriesList{
+                                               &pbv1.Series{ID: common.SeriesID(1)},
+                                               &pbv1.Series{ID: common.SeriesID(2)},
+                                       },
+                                       Timestamps: []int64{1000, 2000},
+                                       Versions:   []int64{1, 2},
+                               },
+                               sortedValues: [][]byte{[]byte("test1"), []byte("test2")},
+                               i:            0,
+                       }
+
+                       // Create index sort result
+                       r := &indexSortResult{
+                               tfl: []tagFamilyLocation{},
+                               segResults: segResultHeap{
+                                       results:  []*segResult{sr},
+                                       sortDesc: false, // This should be set by buildIndexQueryResult
+                               },
+                       }
+
+                       // Simulate the logic from buildIndexQueryResult
+                       if mqo.Order != nil && mqo.Order.Sort == modelv1.Sort_SORT_DESC {
+                               r.segResults.sortDesc = true
+                       }
+
+                       // Verify the sort order was set correctly
+                       require.Equal(t, tt.expectDesc, r.segResults.sortDesc)
+               })
+       }
+}
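
A caveat for anyone extending these tests: container/heap invokes Less during Init, Push, and Pop, so the defensive bounds and nil checks in segResultHeap.Less are what keep a transiently exhausted cursor from panicking. The tests above call Less directly or via sort.Slice; a hypothetical snippet exercising the comparator through the real heap API, in the same style as the tests above (assumes a container/heap import in the test file):

    h := &segResultHeap{results: []*segResult{
            {SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 30}}}, i: 0},
            {SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 10}}}, i: 0},
    }}
    heap.Init(h)
    first := heap.Pop(h).(*segResult) // ascending: the smallest head, SeriesID 10
    require.Equal(t, common.SeriesID(10), first.SeriesList[first.i].ID)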
diff --git a/banyand/measure/syncer.go b/banyand/measure/syncer.go
index f91841b3..791a97e0 100644
--- a/banyand/measure/syncer.go
+++ b/banyand/measure/syncer.go
@@ -20,6 +20,7 @@ package measure
 import (
        "context"
        "fmt"
+       "strings"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/data"
@@ -230,7 +231,28 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
                        // Create streaming reader for the part.
                        files, release := createPartFileReaders(part)
                        releaseFuncs = append(releaseFuncs, release)
-
+                       builder := strings.Builder{}
+                       for i := range part.primaryBlockMetadata {
+                               offset := part.primaryBlockMetadata[i].offset
+                               size := part.primaryBlockMetadata[i].size
+                               buf := make([]byte, size)
+                               part.primary.Read(int64(offset), buf)
+                               uncompressedBuf, err := zstd.Decompress(nil, buf)
+                               if err != nil {
+                                       return fmt.Errorf("cannot decompress block metadata: %w", err)
+                               }
+                               blockMetadata, err := unmarshalBlockMetadata(nil, uncompressedBuf)
+                               if err != nil {
+                                       return fmt.Errorf("cannot unmarshal block metadata: %w", err)
+                               }
+                               for _, block := range blockMetadata {
+                                       builder.WriteString(fmt.Sprintf("%v", block.seriesID))
+                                       builder.WriteString(",")
+                               }
+                       }
+                       timeStart := time.Unix(0, part.partMetadata.MinTimestamp)
+                       timeEnd := time.Unix(0, part.partMetadata.MaxTimestamp)
+                       fmt.Printf("snp %v primary block metadata: %v total count: %v time range: %v-%v group: %v shard: %v - %v\n", curSnapshot.epoch, builder.String(), part.partMetadata.TotalCount, timeStart, timeEnd, tst.group, tst.shardID, time.Now().Format(time.StampNano))
                        // Create streaming part sync data.
                        streamingParts = append(streamingParts, queue.StreamingPartData{
                                ID:                    part.partMetadata.ID,
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 7b287ff5..6bae9dc4 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -108,6 +108,9 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                for j := range g.tables {
                        es := g.tables[j]
                        if es.tsTable != nil && es.dataPoints != nil {
+                               for i := range es.dataPoints.timestamps {
+                                       fmt.Printf("series id: %v timestamp: %v time range: %v\n", es.dataPoints.seriesIDs[i], es.dataPoints.timestamps[i], es.timeRange)
+                               }
                                es.tsTable.mustAddDataPoints(es.dataPoints)
                                releaseDataPoints(es.dataPoints)
                        }
diff --git a/banyand/measure/write_standalone.go b/banyand/measure/write_standalone.go
index dc7fd8b1..759af0e2 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -80,6 +80,9 @@ func processDataPoint(dpt *dataPointsInTable, req *measurev1.WriteRequest, write
        if err := series.Marshal(); err != nil {
                return 0, fmt.Errorf("cannot marshal series: %w", err)
        }
+       if req.Metadata.Name == "service_cpm_minute" {
+               fmt.Printf("entity values: %v time range: %v series id: %v\n", writeEvent.EntityValues, dpt.timeRange, series.ID)
+       }
 
        if stm.schema.IndexMode {
                fields := handleIndexMode(stm.schema, req, is.indexRuleLocators)
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 77274d08..1e8bb2b2 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -23,6 +23,7 @@ import (
        "embed"
        "encoding/json"
        "io"
+       "slices"
        "time"
 
        "github.com/google/go-cmp/cmp"
@@ -75,6 +76,26 @@ func verifyWithContext(ctx context.Context, innerGm gm.Gomega, sharedContext hel
        innerGm.Expect(err).NotTo(gm.HaveOccurred())
        want := &measurev1.QueryResponse{}
        helpers.UnmarshalYAML(ww, want)
+       if args.DisOrder {
+               slices.SortFunc(want.DataPoints, func(a, b *measurev1.DataPoint) int {
+                       if a.Sid != b.Sid {
+                               if a.Sid < b.Sid {
+                                       return -1
+                               }
+                               return 1
+                       }
+                       return a.Timestamp.AsTime().Compare(b.Timestamp.AsTime())
+               })
+               slices.SortFunc(resp.DataPoints, func(a, b *measurev1.DataPoint) int {
+                       if a.Sid != b.Sid {
+                               if a.Sid < b.Sid {
+                                       return -1
+                               }
+                               return 1
+                       }
+                       return a.Timestamp.AsTime().Compare(b.Timestamp.AsTime())
+               })
+       }
        for i := range resp.DataPoints {
                if resp.DataPoints[i].Timestamp != nil {
                        innerGm.Expect(resp.DataPoints[i].Version).Should(gm.BeNumerically(">", 0))
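
The DisOrder path above normalizes both the expected and the actual data points with a two-key comparator: Sid first, then timestamp. The same contract can be written more compactly with cmp.Compare (Go 1.21+); a hypothetical, self-contained sketch:

    package main

    import (
            "cmp"
            "fmt"
            "slices"
    )

    type point struct {
            sid uint64
            ts  int64
    }

    func main() {
            pts := []point{{sid: 2, ts: 5}, {sid: 1, ts: 9}, {sid: 1, ts: 3}}
            slices.SortFunc(pts, func(a, b point) int {
                    if c := cmp.Compare(a.sid, b.sid); c != 0 {
                            return c // primary key: series ID
                    }
                    return cmp.Compare(a.ts, b.ts) // tie-break on timestamp
            })
            fmt.Println(pts) // [{1 3} {1 9} {2 5}]
    }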
diff --git a/test/cases/measure/data/input/linked_or.yaml b/test/cases/measure/data/input/linked_or.yaml
index d2662a84..898a02a2 100644
--- a/test/cases/measure/data/input/linked_or.yaml
+++ b/test/cases/measure/data/input/linked_or.yaml
@@ -16,6 +16,7 @@
 # under the License.
 
 name: "service_cpm_minute"
+trace: true
 groups: ["sw_metric"]
 tagProjection:
   tagFamilies:
diff --git a/test/cases/measure/data/want/entity_in.yaml b/test/cases/measure/data/want/entity_in.yaml
index 73ba584e..125eb6aa 100644
--- a/test/cases/measure/data/want/entity_in.yaml
+++ b/test/cases/measure/data/want/entity_in.yaml
@@ -22,19 +22,19 @@ dataPoints:
     - key: name
       value:
         str:
-          value: service_name_1
+          value: service_name_2
     - key: short_name
       value:
         str:
-          value: service_short_name_1
+          value: service_short_name_2
 - tagFamilies:
   - name: default
     tags:
     - key: name
       value:
         str:
-          value: service_name_2
+          value: service_name_1
     - key: short_name
       value:
         str:
-          value: service_short_name_2
+          value: service_short_name_1
diff --git a/test/cases/measure/data/want/tag_filter_int.yaml b/test/cases/measure/data/want/tag_filter_int.yaml
index 5a726b9d..3a6ffc22 100644
--- a/test/cases/measure/data/want/tag_filter_int.yaml
+++ b/test/cases/measure/data/want/tag_filter_int.yaml
@@ -22,7 +22,7 @@ dataPoints:
           - key: name
             value:
               str:
-                value: service_name_1
+                value: service_name_3
           - key: layer
             value:
               int:
@@ -33,7 +33,7 @@ dataPoints:
           - key: name
             value:
               str:
-                value: service_name_3
+                value: service_name_1
           - key: layer
             value:
               int:
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index aeacde4f..12a693ca 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -44,7 +44,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("all only fields", helpers.Args{Input: "all_only_fields", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("the max limit", helpers.Args{Input: "all_max_limit", Want: 
"all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("filter by a integer tag", helpers.Args{Input: 
"tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("filter by a integer tag", helpers.Args{Input: 
"tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
DisOrder: true}),
        g.Entry("filter by an unknown tag", helpers.Args{Input: 
"tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantEmpty: true}),
        g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
@@ -62,24 +62,24 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 
25 * time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("filter by several entity ids", helpers.Args{Input: 
"entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("filter by entity id and service id", helpers.Args{Input: 
"entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 
25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
+       g.Entry("filter by several entity ids", helpers.Args{Input: 
"entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: 
true}),
+       g.Entry("filter by entity id and service id", helpers.Args{Input: 
"entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
DisOrder: true}),
+       g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("invalid logical expression", helpers.Args{Input: 
"err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantErr: true}),
-       g.Entry("linked or expressions", helpers.Args{Input: "linked_or", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.FEntry("linked or expressions", helpers.Args{Input: "linked_or", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("In and not In expressions", helpers.Args{Input: "in", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("match a tag belongs to the entity", helpers.Args{Input: 
"entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("all of index mode in a larger time range",
-               helpers.Args{Input: "index_mode_all", Want: 
"index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour}),
-       g.Entry("all in all segments of index mode", helpers.Args{Input: 
"index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, 
Offset: -72 * time.Hour}),
+               helpers.Args{Input: "index_mode_all", Want: 
"index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour, 
DisOrder: true}),
+       g.Entry("all in all segments of index mode", helpers.Args{Input: 
"index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, 
Offset: -72 * time.Hour, DisOrder: true}),
        g.Entry("order by desc of index mode", helpers.Args{Input: 
"index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * 
time.Minute}),
-       g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", 
WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("query by id in index mode", helpers.Args{Input: 
"index_mode_by_id", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("multi groups: unchanged", helpers.Args{Input: 
"multi_group_unchanged", Duration: 35 * time.Minute, Offset: -20 * 
time.Minute}),
diff --git a/test/integration/distributed/multi_segments/multi_segments_suite_test.go b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
new file mode 100644
index 00000000..bb1af63d
--- /dev/null
+++ b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
@@ -0,0 +1,154 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_multi_segments_test
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
+       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+       casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+)
+
+func TestDistributedMultiSegments(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Distributed Multi Segments Suite")
+}
+
+var (
+       connection *grpc.ClientConn
+       now        time.Time
+       baseTime   time.Time
+       deferFunc  func()
+       goods      []gleak.Goroutine
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       goods = gleak.Goroutines()
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+
+       By("Loading schema")
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{ep}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer schemaRegistry.Close()
+       ctx := context.Background()
+       test_stream.PreloadSchema(ctx, schemaRegistry)
+       test_measure.PreloadSchema(ctx, schemaRegistry)
+
+       By("Starting data node 0")
+       closeDataNode0 := setup.DataNode(ep)
+       By("Starting data node 1")
+       closeDataNode1 := setup.DataNode(ep)
+       By("Starting liaison node")
+       liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+
+       By("Initializing test cases")
+       ns := timestamp.NowMilli().UnixNano()
+       now = time.Unix(0, ns-ns%int64(time.Minute))
+       baseTime = time.Date(now.Year(), now.Month(), now.Day(), 00, 02, 0, 0, now.Location())
+       test_cases.Initialize(liaisonAddr, baseTime)
+
+       deferFunc = func() {
+               closerLiaisonNode()
+               closeDataNode0()
+               closeDataNode1()
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+       return []byte(liaisonAddr)
+}, func(address []byte) {
+       var err error
+       connection, err = grpchelper.Conn(string(address), 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       casesstream.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casesmeasure.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casestopn.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {})
+
+var _ = ReportAfterSuite("Distributed Multi Segments Suite", func(report Report) {
+       if report.SuiteSucceeded {
+               if deferFunc != nil {
+                       deferFunc()
+               }
+               Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+       }
+})
diff --git a/test/integration/standalone/multi_segments/multi_segments_suite_test.go b/test/integration/standalone/multi_segments/multi_segments_suite_test.go
new file mode 100644
index 00000000..b3ea9011
--- /dev/null
+++ b/test/integration/standalone/multi_segments/multi_segments_suite_test.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_query_test
+
+import (
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
+       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+       casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
+       integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
+func TestIntegrationMultiSegments(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Integration Multi Segments Suite", Label(integration_standalone.Labels...))
+}
+
+var (
+       connection *grpc.ClientConn
+       now        time.Time
+       baseTime   time.Time
+       deferFunc  func()
+       goods      []gleak.Goroutine
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       goods = gleak.Goroutines()
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       var addr string
+       addr, _, deferFunc = setup.Standalone()
+       ns := timestamp.NowMilli().UnixNano()
+       now = time.Unix(0, ns-ns%int64(time.Minute))
+       baseTime = time.Date(now.Year(), now.Month(), now.Day(), 00, 02, 0, 0, now.Location())
+       test_cases.Initialize(addr, baseTime)
+       return []byte(addr)
+}, func(address []byte) {
+       var err error
+       connection, err = grpchelper.Conn(string(address), 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       casesstream.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casesmeasure.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casestopn.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casestrace.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {})
+
+var _ = ReportAfterSuite("Integration Query Suite", func(report Report) {
+       if report.SuiteSucceeded {
+               if deferFunc != nil {
+                       deferFunc()
+               }
+               Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+       }
+})

