This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 2de59d80 Add query test cases of lifecycle (#660)
2de59d80 is described below

commit 2de59d80f7edbc57ada66212bcfdc79f5b3b28a9
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Apr 29 09:46:03 2025 +0800

    Add query test cases of lifecycle (#660)
---
 api/proto/banyandb/database/v1/database.proto      |   4 -
 banyand/backup/lifecycle/steps.go                  |   1 +
 banyand/dquery/dquery.go                           |  11 +-
 banyand/dquery/measure.go                          |   4 +
 banyand/dquery/stream.go                           |   4 +
 banyand/dquery/topn.go                             |   4 +
 banyand/internal/storage/rotation_test.go          |  89 +--------
 banyand/internal/storage/segment.go                |  76 +++-----
 banyand/internal/storage/segment_test.go           |   4 -
 banyand/internal/storage/tsdb.go                   |   7 +-
 banyand/measure/metadata.go                        |   1 -
 banyand/metadata/client.go                         |  49 -----
 banyand/metadata/metadata.go                       |   2 -
 banyand/metadata/schema/register_test.go           |  25 ---
 banyand/queue/pub/client_test.go                   |  66 -------
 banyand/queue/pub/pub.go                           |   9 +-
 banyand/queue/pub/pub_suite_test.go                |   3 +-
 banyand/queue/pub/pub_test.go                      | 189 +------------------
 banyand/stream/metadata.go                         |   1 -
 docs/api-reference.md                              | 206 ++++++++++-----------
 pkg/test/helpers/context.go                        |  32 ++--
 pkg/test/measure/etcd.go                           |  72 ++++---
 .../measure/testdata/group_stages/exception.json   |  18 ++
 .../measure/testdata/group_stages/index_mode.json  |  18 ++
 .../{groups => group_stages}/sw_metric.json        |   0
 pkg/test/measure/testdata/groups/sw_metric.json    |  17 +-
 pkg/test/stream/etcd.go                            |  65 +++++--
 pkg/test/stream/testdata/group.json                |  17 +-
 .../{group.json => group_with_stages.json}         |   0
 test/cases/lifecycle/lifecycle.go                  |  87 ++++++++-
 test/cases/measure/data/data.go                    |   1 +
 test/cases/stream/data/data.go                     |  18 +-
 test/cases/topn/data/data.go                       |   1 +
 test/e2e-v2/script/env                             |   2 +-
 .../distributed/lifecycle/lifecycle_suite_test.go  |  37 ++--
 35 files changed, 422 insertions(+), 718 deletions(-)

diff --git a/api/proto/banyandb/database/v1/database.proto 
b/api/proto/banyandb/database/v1/database.proto
index a5caddba..2b4de339 100644
--- a/api/proto/banyandb/database/v1/database.proto
+++ b/api/proto/banyandb/database/v1/database.proto
@@ -20,7 +20,6 @@ syntax = "proto3";
 package banyandb.database.v1;
 
 import "banyandb/common/v1/common.proto";
-import "banyandb/model/v1/query.proto";
 import "google/protobuf/timestamp.proto";
 
 option go_package = 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
@@ -41,9 +40,6 @@ message Node {
   google.protobuf.Timestamp created_at = 5;
   // labels is a set of key-value pairs to describe the node.
   map<string, string> labels = 6;
-  // data_segments_boundary is the time range of the data segments that the 
node is responsible for.
-  // [start, end) is the time range.
-  map<string, model.v1.TimeRange> data_segments_boundary = 7;
 }
 
 message Shard {
diff --git a/banyand/backup/lifecycle/steps.go 
b/banyand/backup/lifecycle/steps.go
index 7816b881..620be9e9 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -249,6 +249,7 @@ func migrateMeasure(ctx context.Context, m 
*databasev1.Measure, result model.Mea
                                Metadata: m.Metadata,
                                DataPoint: &measurev1.DataPointValue{
                                        Timestamp: timestamppb.New(time.Unix(0, 
mr.Timestamps[i])),
+                                       Version:   mr.Versions[i],
                                },
                                MessageId: uint64(time.Now().UnixNano()),
                        }
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index 653aa811..74cedafe 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -151,8 +151,8 @@ func (q *queryService) parseNodeSelector(stages []string, 
resource *commonv1.Res
        }
 
        var nodeSelectors []string
-       for _, stage := range resource.Stages {
-               for _, sn := range stages {
+       for _, sn := range stages {
+               for _, stage := range resource.Stages {
                        if strings.EqualFold(sn, stage.Name) {
                                ns := stage.NodeSelector
                                ns = strings.TrimSpace(ns)
@@ -162,10 +162,9 @@ func (q *queryService) parseNodeSelector(stages []string, 
resource *commonv1.Res
                                nodeSelectors = append(nodeSelectors, ns)
                                break
                        }
-                       if strings.EqualFold(sn, hotStageName) && 
q.hotStageNodeSelector != "" {
-                               nodeSelectors = append(nodeSelectors, 
q.hotStageNodeSelector)
-                               break
-                       }
+               }
+               if strings.EqualFold(sn, hotStageName) && 
q.hotStageNodeSelector != "" {
+                       nodeSelectors = append(nodeSelectors, 
q.hotStageNodeSelector)
                }
        }
        if len(nodeSelectors) == 0 {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index 56548d7a..1b1a62e0 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -88,6 +88,10 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (r
                if gs, ok := p.measureService.LoadGroup(g); ok {
                        if ns, exist := 
p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
                                nodeSelectors[g] = ns
+                       } else if len(gs.GetSchema().ResourceOpts.Stages) > 0 {
+                               ml.Error().Strs("req_stages", 
queryCriteria.Stages).Strs("default_stages", 
gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found")
+                               resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("no stage found in request or default stages in resource opts"))
+                               return
                        }
                } else {
                        ml.Error().RawJSON("req", 
logger.Proto(queryCriteria)).Msg("group not found")
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 6cdd2ae7..184e3438 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -84,6 +84,10 @@ func (p *streamQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (re
                if gs, ok := p.streamService.LoadGroup(g); ok {
                        if ns, exist := 
p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
                                nodeSelectors[g] = ns
+                       } else if len(gs.GetSchema().ResourceOpts.Stages) > 0 {
+                               p.log.Error().Strs("req_stages", 
queryCriteria.Stages).Strs("default_stages", 
gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found")
+                               resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("no stage found in request or default stages in resource opts"))
+                               return
                        }
                } else {
                        p.log.Error().RawJSON("req", 
logger.Proto(queryCriteria)).Msg("group not found")
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index b7adf5c1..1bb3ad99 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -71,6 +71,10 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                if gs, ok := t.measureService.LoadGroup(g); ok {
                        if ns, exist := t.parseNodeSelector(request.Stages, 
gs.GetSchema().ResourceOpts); exist {
                                nodeSelectors[g] = ns
+                       } else if len(gs.GetSchema().ResourceOpts.Stages) > 0 {
+                               t.log.Error().Strs("req_stages", 
request.Stages).Strs("default_stages", 
gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found")
+                               resp = bus.NewMessage(now, common.NewError("no 
stage found in request or default stages in resource opts"))
+                               return
                        }
                } else {
                        t.log.Error().Str("group", g).Msg("failed to load 
group")
diff --git a/banyand/internal/storage/rotation_test.go 
b/banyand/internal/storage/rotation_test.go
index 43f292ef..ccc9e271 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -19,7 +19,6 @@ package storage
 
 import (
        "context"
-       "sync"
        "testing"
        "time"
 
@@ -27,7 +26,6 @@ import (
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -36,77 +34,9 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-type boundaryUpdateTracker struct {
-       updates []*modelv1.TimeRange
-       called  int
-       mu      sync.Mutex
-}
-
-func (t *boundaryUpdateTracker) recordUpdate(_, _ string, boundary 
*modelv1.TimeRange) {
-       t.mu.Lock()
-       defer t.mu.Unlock()
-       t.updates = append(t.updates, boundary)
-       t.called++
-}
-
-func TestSegmentBoundaryUpdateFn(t *testing.T) {
-       t.Run("called when a segment is created", func(t *testing.T) {
-               tsdb, c, _, tracker, dfFn := setUpDB(t)
-               defer dfFn()
-
-               // Initial segment creation should have triggered it once
-               require.Equal(t, 1, tracker.called)
-
-               // Create new segment
-               ts := c.Now().Add(23*time.Hour + time.Second)
-               tsdb.Tick(ts.UnixNano())
-
-               // Verify the function was called again
-               assert.Eventually(t, func() bool {
-                       tracker.mu.Lock()
-                       defer tracker.mu.Unlock()
-                       return tracker.called >= 2
-               }, flags.EventuallyTimeout, time.Millisecond, "boundary update 
function should be called")
-       })
-
-       t.Run("called when a segment is deleted", func(t *testing.T) {
-               tsdb, c, segCtrl, tracker, dfFn := setUpDB(t)
-               defer dfFn()
-
-               ts := c.Now()
-               for i := 0; i < 4; i++ {
-                       ts = ts.Add(23 * time.Hour)
-                       c.Set(ts)
-                       tsdb.Tick(ts.UnixNano())
-                       t.Logf("current time: %s", ts.Format(time.RFC3339))
-                       expected := i + 2
-                       require.EventuallyWithTf(t, func(ct *assert.CollectT) {
-                               segments, _ := segCtrl.segments(false)
-                               if len(segments) != expected {
-                                       ct.Errorf("expect %d segments, got %d", 
expected, len(segments))
-                               }
-                       }, flags.EventuallyTimeout, time.Millisecond, "wait for 
%d segment to be created", expected)
-                       ts = ts.Add(time.Hour)
-               }
-
-               tracker.mu.Lock()
-               called := tracker.called
-               tracker.mu.Unlock()
-
-               c.Set(ts)
-               tsdb.Tick(ts.UnixNano())
-
-               assert.Eventually(t, func() bool {
-                       tracker.mu.Lock()
-                       defer tracker.mu.Unlock()
-                       return tracker.called > called
-               }, flags.EventuallyTimeout, time.Millisecond, "boundary update 
function should be called after deletion")
-       })
-}
-
 func TestForwardRotation(t *testing.T) {
        t.Run("create a new segment when the time is up", func(t *testing.T) {
-               tsdb, c, segCtrl, _, dfFn := setUpDB(t)
+               tsdb, c, segCtrl, dfFn := setUpDB(t)
                defer dfFn()
                ts := c.Now().Add(23*time.Hour + time.Second)
                t.Logf("current time: %s", ts.Format(time.RFC3339))
@@ -118,7 +48,7 @@ func TestForwardRotation(t *testing.T) {
        })
 
        t.Run("no new segment created when the time is not up", func(t 
*testing.T) {
-               tsdb, c, segCtrl, _, dfFn := setUpDB(t)
+               tsdb, c, segCtrl, dfFn := setUpDB(t)
                defer dfFn()
                ts := c.Now().Add(22*time.Hour + 59*time.Minute + 
59*time.Second)
                t.Logf("current time: %s", ts.Format(time.RFC3339))
@@ -132,7 +62,7 @@ func TestForwardRotation(t *testing.T) {
 
 func TestRetention(t *testing.T) {
        t.Run("delete the segment and index when the TTL is up", func(t 
*testing.T) {
-               tsdb, c, segCtrl, _, dfFn := setUpDB(t)
+               tsdb, c, segCtrl, dfFn := setUpDB(t)
                defer dfFn()
                ts := c.Now()
                for i := 0; i < 4; i++ {
@@ -161,7 +91,7 @@ func TestRetention(t *testing.T) {
        })
 
        t.Run("keep the segment volume stable", func(t *testing.T) {
-               tsdb, c, segCtrl, _, dfFn := setUpDB(t)
+               tsdb, c, segCtrl, dfFn := setUpDB(t)
                defer dfFn()
                ts := c.Now()
                for i := 0; i < 10; i++ {
@@ -207,22 +137,15 @@ func TestRetention(t *testing.T) {
        })
 }
 
-func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, 
*segmentController[*MockTSTable, any], *boundaryUpdateTracker, func()) {
+func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, 
*segmentController[*MockTSTable, any], func()) {
        dir, defFn := test.Space(require.New(t))
 
-       // Create tracker for boundary updates
-       tracker := &boundaryUpdateTracker{
-               updates: make([]*modelv1.TimeRange, 0),
-       }
-
        TSDBOpts := TSDBOpts[*MockTSTable, any]{
                Location:        dir,
                SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
                TTL:             IntervalRule{Unit: DAY, Num: 3},
                ShardNum:        1,
                TSTableCreator:  MockTSTableCreator,
-               // Add the boundary update function
-               SegmentBoundaryUpdateFn: tracker.recordUpdate,
        }
        ctx := context.Background()
        mc := timestamp.NewMockClock()
@@ -240,7 +163,7 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any], 
timestamp.MockClock, *
        db := tsdb.(*database[*MockTSTable, any])
        segments, _ := db.segmentController.segments(false)
        require.Equal(t, len(segments), 1)
-       return db, mc, db.segmentController, tracker, func() {
+       return db, mc, db.segmentController, func() {
                tsdb.Close()
                defFn()
        }
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 27bb2059..3193697a 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -30,11 +30,9 @@ import (
        "time"
 
        "github.com/pkg/errors"
-       "google.golang.org/protobuf/types/known/timestamppb"
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -278,41 +276,39 @@ func (s *segment[T, O]) String() string {
 }
 
 type segmentController[T TSTable, O any] struct {
-       clock                   timestamp.Clock
-       metrics                 Metrics
-       opts                    *TSDBOpts[T, O]
-       l                       *logger.Logger
-       indexMetrics            *inverted.Metrics
-       segmentBoundaryUpdateFn SegmentBoundaryUpdateFn
-       position                common.Position
-       db                      string
-       stage                   string
-       location                string
-       lst                     []*segment[T, O]
-       deadline                atomic.Int64
-       idleTimeout             time.Duration
-       optsMutex               sync.RWMutex
+       clock        timestamp.Clock
+       metrics      Metrics
+       opts         *TSDBOpts[T, O]
+       l            *logger.Logger
+       indexMetrics *inverted.Metrics
+       position     common.Position
+       db           string
+       stage        string
+       location     string
+       lst          []*segment[T, O]
+       deadline     atomic.Int64
+       idleTimeout  time.Duration
+       optsMutex    sync.RWMutex
        sync.RWMutex
 }
 
 func newSegmentController[T TSTable, O any](ctx context.Context, location 
string,
        l *logger.Logger, opts TSDBOpts[T, O], indexMetrics *inverted.Metrics, 
metrics Metrics,
-       segmentsBoundaryUpdateFn SegmentBoundaryUpdateFn, idleTimeout 
time.Duration,
+       idleTimeout time.Duration,
 ) *segmentController[T, O] {
        clock, _ := timestamp.GetClock(ctx)
        p := common.GetPosition(ctx)
        return &segmentController[T, O]{
-               location:                location,
-               opts:                    &opts,
-               l:                       l,
-               clock:                   clock,
-               position:                common.GetPosition(ctx),
-               metrics:                 metrics,
-               indexMetrics:            indexMetrics,
-               segmentBoundaryUpdateFn: segmentsBoundaryUpdateFn,
-               stage:                   p.Stage,
-               db:                      p.Database,
-               idleTimeout:             idleTimeout,
+               location:     location,
+               opts:         &opts,
+               l:            l,
+               clock:        clock,
+               position:     common.GetPosition(ctx),
+               metrics:      metrics,
+               indexMetrics: indexMetrics,
+               stage:        p.Stage,
+               db:           p.Database,
+               idleTimeout:  idleTimeout,
        }
 }
 
@@ -364,31 +360,9 @@ func (sc *segmentController[T, O]) createSegment(ts 
time.Time) (*segment[T, O],
        if err != nil {
                return nil, err
        }
-       sc.notifySegmentBoundaryUpdate()
        return s, s.incRef(context.WithValue(context.Background(), 
logger.ContextKey, sc.l))
 }
 
-func (sc *segmentController[T, O]) notifySegmentBoundaryUpdate() {
-       if sc.segmentBoundaryUpdateFn == nil {
-               return
-       }
-       // No error if we do not open closed segments.
-       segs, _ := sc.segments(false)
-       defer func() {
-               for i := range segs {
-                       segs[i].DecRef()
-               }
-       }()
-       var tr *modelv1.TimeRange
-       if len(segs) > 0 {
-               tr = &modelv1.TimeRange{
-                       Begin: timestamppb.New(segs[0].Start),
-                       End:   timestamppb.New(segs[len(segs)-1].End),
-               }
-       }
-       sc.segmentBoundaryUpdateFn(sc.stage, sc.db, tr)
-}
-
 func (sc *segmentController[T, O]) segments(reopenClosed bool) (ss 
[]*segment[T, O], err error) {
        sc.RLock()
        defer sc.RUnlock()
@@ -570,7 +544,6 @@ func (sc *segmentController[T, O]) remove(deadline 
time.Time) (hasSegment bool,
                }
                s.DecRef()
        }
-       sc.notifySegmentBoundaryUpdate()
        return hasSegment, err
 }
 
@@ -607,7 +580,6 @@ func (sc *segmentController[T, O]) 
deleteExpiredSegments(timeRange timestamp.Tim
                }
                s.DecRef()
        }
-       sc.notifySegmentBoundaryUpdate()
        return count
 }
 
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index 094bf475..87f62dbe 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -108,7 +108,6 @@ func TestSegmentOpenAndReopen(t *testing.T) {
                opts,
                nil,           // indexMetrics
                nil,           // metrics
-               nil,           // segmentBoundaryUpdateFn
                5*time.Minute, // idleTimeout
        )
 
@@ -199,7 +198,6 @@ func TestSegmentCloseIfIdle(t *testing.T) {
                opts,
                nil,         // indexMetrics
                nil,         // metrics
-               nil,         // segmentBoundaryUpdateFn
                time.Second, // Set short idle timeout for testing
        )
 
@@ -282,7 +280,6 @@ func TestCloseIdleAndSelectSegments(t *testing.T) {
                opts,
                nil,         // indexMetrics
                nil,         // metrics
-               nil,         // segmentBoundaryUpdateFn
                idleTimeout, // short idle timeout
        )
 
@@ -414,7 +411,6 @@ func TestOpenExistingSegmentWithShards(t *testing.T) {
                opts,
                nil,           // indexMetrics
                nil,           // metrics
-               nil,           // segmentBoundaryUpdateFn
                5*time.Minute, // idleTimeout
        )
 
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index eca40aa7..a0ff3eb7 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -29,7 +29,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
@@ -50,16 +49,12 @@ const (
        lockFilename = "lock"
 )
 
-// SegmentBoundaryUpdateFn is a callback function to update the segment 
boundary.
-type SegmentBoundaryUpdateFn func(stage, group string, boundary 
*modelv1.TimeRange)
-
 // TSDBOpts wraps options to create a tsdb.
 type TSDBOpts[T TSTable, O any] struct {
        Option                         O
        TableMetrics                   Metrics
        TSTableCreator                 TSTableCreator[T, O]
        StorageMetricsFactory          *observability.Factory
-       SegmentBoundaryUpdateFn        SegmentBoundaryUpdateFn
        Location                       string
        SegmentInterval                IntervalRule
        TTL                            IntervalRule
@@ -138,7 +133,7 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O]) (TSDB[
                tsEventCh: make(chan int64),
                p:         p,
                segmentController: newSegmentController(ctx, location,
-                       l, opts, indexMetrics, opts.TableMetrics, 
opts.SegmentBoundaryUpdateFn, opts.SegmentIdleTimeout),
+                       l, opts, indexMetrics, opts.TableMetrics, 
opts.SegmentIdleTimeout),
                metrics:          newMetrics(opts.StorageMetricsFactory),
                disableRetention: opts.DisableRetention,
        }
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 85560df8..dca0b6d2 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -419,7 +419,6 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) 
(resourceSchema.DB, error
                SeriesIndexFlushTimeoutSeconds: 
s.option.flushTimeout.Nanoseconds() / int64(time.Second),
                SeriesIndexCacheMaxBytes:       
int(s.option.seriesCacheMaxSize),
                StorageMetricsFactory:          factory,
-               SegmentBoundaryUpdateFn:        
s.metadata.UpdateSegmentsBoundary,
                SegmentIdleTimeout:             segmentIdleTimeout,
        }
        return storage.OpenTSDB(
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 7e410af7..f935f432 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -21,7 +21,6 @@ import (
        "context"
        "os"
        "os/signal"
-       "strings"
        "sync"
        "syscall"
        "time"
@@ -33,7 +32,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -361,50 +359,3 @@ func contains(s []string, e string) bool {
        }
        return false
 }
-
-func (s *clientService) UpdateSegmentsBoundary(stage, group string, boundary 
*modelv1.TimeRange) {
-       s.nodeInfoMux.Lock()
-       defer s.nodeInfoMux.Unlock()
-       nodeInfo := s.nodeInfo
-       b := nodeInfo.DataSegmentsBoundary
-       if b == nil {
-               b = make(map[string]*modelv1.TimeRange)
-       }
-       key := getNodeBoundaryKey(group, stage)
-       if boundary == nil {
-               delete(b, key)
-       } else {
-               b[key] = boundary
-       }
-       nodeInfo.DataSegmentsBoundary = b
-       s.nodeInfo = nodeInfo
-       if err := s.schemaRegistry.UpdateNode(context.Background(), nodeInfo); 
err != nil {
-               logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to 
update node")
-       }
-}
-
-func getNodeBoundaryKey(items ...string) string {
-       return strings.Join(items, "|")
-}
-
-// FindSegmentsBoundary finds the segments boundary for the given group.
-func FindSegmentsBoundary(nodeInfo *databasev1.Node, group string) 
*modelv1.TimeRange {
-       b := nodeInfo.DataSegmentsBoundary
-       if len(b) == 0 {
-               return nil
-       }
-       result := &modelv1.TimeRange{}
-       for g, tr := range b {
-               segments := strings.Split(g, "|")
-               if segments[0] != group {
-                       continue
-               }
-               if result.Begin == nil || 
result.Begin.AsTime().After(tr.Begin.AsTime()) {
-                       result.Begin = tr.Begin
-               }
-               if result.End == nil || 
result.End.AsTime().Before(tr.End.AsTime()) {
-                       result.End = tr.End
-               }
-       }
-       return result
-}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index e7e981e7..14467c50 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -24,7 +24,6 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -49,7 +48,6 @@ type Repo interface {
        GroupRegistry() schema.Group
        TopNAggregationRegistry() schema.TopNAggregation
        RegisterHandler(string, schema.Kind, schema.EventHandler)
-       UpdateSegmentsBoundary(stage, group string, boundary *modelv1.TimeRange)
        NodeRegistry() schema.Node
        PropertyRegistry() schema.Property
 }
diff --git a/banyand/metadata/schema/register_test.go 
b/banyand/metadata/schema/register_test.go
index 8280e150..1e9ceef8 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -26,11 +26,9 @@ import (
        "github.com/onsi/ginkgo/v2"
        "github.com/onsi/gomega"
        "github.com/onsi/gomega/gleak"
-       "google.golang.org/protobuf/types/known/timestamppb"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -120,27 +118,4 @@ var _ = ginkgo.Describe("etcd_register", func() {
                        return err
                }, flags.EventuallyTimeout).ShouldNot(gomega.HaveOccurred())
        })
-
-       ginkgo.It("should update node's data segments boundary", func() {
-               gomega.Expect(r.Register(context.Background(), md, 
false)).ShouldNot(gomega.HaveOccurred())
-
-               n, err := r.GetNode(context.Background(), node)
-               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-
-               newBoundary := &modelv1.TimeRange{
-                       Begin: timestamppb.New(time.Unix(1000, 0)),
-                       End:   timestamppb.New(time.Unix(2000, 0)),
-               }
-               n.DataSegmentsBoundary = 
map[string]*modelv1.TimeRange{"default": newBoundary}
-
-               gomega.Expect(r.UpdateNode(context.Background(), 
n)).ShouldNot(gomega.HaveOccurred())
-
-               updatedNode, err := r.GetNode(context.Background(), node)
-               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-               
gomega.Expect(updatedNode.DataSegmentsBoundary).ShouldNot(gomega.BeNil())
-               
gomega.Expect(updatedNode.DataSegmentsBoundary).Should(gomega.HaveLen(1))
-               
gomega.Expect(updatedNode.DataSegmentsBoundary["default"]).ShouldNot(gomega.BeNil())
-               
gomega.Expect(updatedNode.DataSegmentsBoundary["default"].Begin.Seconds).Should(gomega.Equal(int64(1000)))
-               
gomega.Expect(updatedNode.DataSegmentsBoundary["default"].End.Seconds).Should(gomega.Equal(int64(2000)))
-       })
 })
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
index a96d7da8..43a43736 100644
--- a/banyand/queue/pub/client_test.go
+++ b/banyand/queue/pub/client_test.go
@@ -26,7 +26,6 @@ import (
        "github.com/onsi/gomega/gleak"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/health/grpc_health_v1"
-       "google.golang.org/protobuf/types/known/timestamppb"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
@@ -147,71 +146,6 @@ var _ = ginkgo.Describe("publish clients 
register/unregister", func() {
                        verifyClientsWithGomega(g, p, data.TopicStreamWrite, 1, 
0, 2, 1)
                }, flags.EventuallyTimeout).Should(gomega.Succeed())
        })
-
-       ginkgo.It("should update node when labels or data boundaries change", 
func() {
-               addr1 := getAddress()
-               closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
-               defer closeFn()
-               p := newPub()
-               defer p.GracefulStop()
-
-               // Replace hard-coded "service" with the Service constant
-               group1 := svc
-               now := time.Now()
-               timeRange1 := &modelv1.TimeRange{
-                       Begin: timestamppb.New(now.Add(-3 * time.Hour)),
-                       End:   timestamppb.New(now.Add(-2 * time.Hour)),
-               }
-
-               initialLabels := map[string]string{
-                       "role": "ingest",
-                       "zone": "east",
-               }
-               initialBoundaries := map[string]*modelv1.TimeRange{
-                       group1: timeRange1,
-               }
-
-               node1 := getDataNodeWithLabels("node1", addr1, initialLabels, 
initialBoundaries)
-               p.OnAddOrUpdate(node1)
-               verifyClients(p, 1, 0, 1, 0)
-
-               p.mu.RLock()
-               registeredNode := p.registered["node1"]
-               
gomega.Expect(registeredNode.Labels).Should(gomega.Equal(initialLabels))
-               
gomega.Expect(registeredNode.DataSegmentsBoundary).Should(gomega.Equal(initialBoundaries))
-               p.mu.RUnlock()
-
-               updatedLabels := map[string]string{
-                       "role": "query",
-                       "zone": "east",
-                       "env":  "prod",
-               }
-               updatedNode1 := getDataNodeWithLabels("node1", addr1, 
updatedLabels, initialBoundaries)
-               p.OnAddOrUpdate(updatedNode1)
-
-               p.mu.RLock()
-               registeredNode = p.registered["node1"]
-               
gomega.Expect(registeredNode.Labels).Should(gomega.Equal(updatedLabels))
-               gomega.Expect(len(p.active)).Should(gomega.Equal(1))
-               p.mu.RUnlock()
-
-               timeRange2 := &modelv1.TimeRange{
-                       Begin: timestamppb.New(now.Add(-1 * time.Hour)),
-                       End:   timestamppb.New(now),
-               }
-               updatedBoundaries := map[string]*modelv1.TimeRange{
-                       group1:      timeRange1,
-                       "inventory": timeRange2,
-               }
-               updatedNode2 := getDataNodeWithLabels("node1", addr1, 
updatedLabels, updatedBoundaries)
-               p.OnAddOrUpdate(updatedNode2)
-
-               p.mu.RLock()
-               registeredNode = p.registered["node1"]
-               
gomega.Expect(registeredNode.DataSegmentsBoundary).Should(gomega.Equal(updatedBoundaries))
-               gomega.Expect(len(p.active)).Should(gomega.Equal(1))
-               p.mu.RUnlock()
-       })
 })
 
 func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index da5efca0..ae63065d 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -45,7 +45,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var (
@@ -129,7 +128,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic 
bus.Topic, messages bus.Mes
                        names[n.Metadata.GetName()] = struct{}{}
                }
        } else {
-               for g, sel := range messages.NodeSelectors() {
+               for _, sel := range messages.NodeSelectors() {
                        var matches []MatchFunc
                        if sel == nil {
                                matches = bypassMatches
@@ -143,12 +142,8 @@ func (p *pub) Broadcast(timeout time.Duration, topic 
bus.Topic, messages bus.Mes
                                }
                        }
                        for _, n := range nodes {
-                               tr := metadata.FindSegmentsBoundary(n, g)
-                               if tr == nil {
-                                       continue
-                               }
                                for _, m := range matches {
-                                       if m(n.Labels) && 
timestamp.PbHasOverlap(messages.TimeRange(), tr) {
+                                       if m(n.Labels) {
                                                names[n.Metadata.Name] = 
struct{}{}
                                                break
                                        }
diff --git a/banyand/queue/pub/pub_suite_test.go 
b/banyand/queue/pub/pub_suite_test.go
index 8a1001bb..e6bb424a 100644
--- a/banyand/queue/pub/pub_suite_test.go
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -237,10 +237,9 @@ func getDataNode(name string, address string) 
schema.Metadata {
        }
 }
 
-func getDataNodeWithLabels(name string, address string, labels 
map[string]string, dataBoundary map[string]*modelv1.TimeRange) schema.Metadata {
+func getDataNodeWithLabels(name string, address string, labels 
map[string]string) schema.Metadata {
        node := getDataNode(name, address)
        nodeInfo := node.Spec.(*databasev1.Node)
        nodeInfo.Labels = labels
-       nodeInfo.DataSegmentsBoundary = dataBoundary
        return node
 }
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index daaf2505..a8f4251b 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -311,21 +311,15 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                                "role": "ingest",
                                "zone": "east",
                        }
-                       node1Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange,
-                       }
 
                        node2Labels := map[string]string{
                                "role": "query",
                                "zone": "west",
                        }
-                       node2Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange,
-                       }
 
-                       node1 := getDataNodeWithLabels("node1", addr1, 
node1Labels, node1Boundaries)
+                       node1 := getDataNodeWithLabels("node1", addr1, 
node1Labels)
                        p.OnAddOrUpdate(node1)
-                       node2 := getDataNodeWithLabels("node2", addr2, 
node2Labels, node2Boundaries)
+                       node2 := getDataNodeWithLabels("node2", addr2, 
node2Labels)
                        p.OnAddOrUpdate(node2)
 
                        nodeSelectors := map[string][]string{
@@ -368,21 +362,15 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                                "role": "ingest",
                                "zone": "east",
                        }
-                       node1Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange,
-                       }
 
                        node2Labels := map[string]string{
                                "role": "query",
                                "zone": "west",
                        }
-                       node2Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange,
-                       }
 
-                       node1 := getDataNodeWithLabels("node1", addr1, 
node1Labels, node1Boundaries)
+                       node1 := getDataNodeWithLabels("node1", addr1, 
node1Labels)
                        p.OnAddOrUpdate(node1)
-                       node2 := getDataNodeWithLabels("node2", addr2, 
node2Labels, node2Boundaries)
+                       node2 := getDataNodeWithLabels("node2", addr2, 
node2Labels)
                        p.OnAddOrUpdate(node2)
 
                        nodeSelectors := map[string][]string{
@@ -399,174 +387,5 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                        gomega.Expect(messages).Should(gomega.HaveLen(1))
                })
-
-               ginkgo.It("should broadcast messages based on time range 
overlap", func() {
-                       addr1 := getAddress()
-                       addr2 := getAddress()
-                       addr3 := getAddress()
-                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
-                       closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
-                       closeFn3 := setup(addr3, codes.OK, 10*time.Millisecond)
-                       p := newPub()
-                       defer func() {
-                               p.GracefulStop()
-                               closeFn1()
-                               closeFn2()
-                               closeFn3()
-                       }()
-
-                       group1 := svc
-
-                       now := time.Now()
-                       timeRange1 := &modelv1.TimeRange{
-                               Begin: timestamppb.New(now.Add(-3 * time.Hour)),
-                               End:   timestamppb.New(now.Add(-2 * time.Hour)),
-                       }
-
-                       timeRange2 := &modelv1.TimeRange{
-                               Begin: timestamppb.New(now.Add(-2 * time.Hour)),
-                               End:   timestamppb.New(now.Add(-1 * time.Hour)),
-                       }
-
-                       timeRange3 := &modelv1.TimeRange{
-                               Begin: timestamppb.New(now.Add(-1 * time.Hour)),
-                               End:   timestamppb.New(now),
-                       }
-
-                       node1Labels := map[string]string{"zone": "east"}
-                       node1Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange1,
-                       }
-
-                       node2Labels := map[string]string{"zone": "east"}
-                       node2Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange2,
-                       }
-
-                       node3Labels := map[string]string{"zone": "east"}
-                       node3Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange3,
-                       }
-
-                       node1 := getDataNodeWithLabels("node1", addr1, 
node1Labels, node1Boundaries)
-                       p.OnAddOrUpdate(node1)
-                       node2 := getDataNodeWithLabels("node2", addr2, 
node2Labels, node2Boundaries)
-                       p.OnAddOrUpdate(node2)
-                       node3 := getDataNodeWithLabels("node3", addr3, 
node3Labels, node3Boundaries)
-                       p.OnAddOrUpdate(node3)
-
-                       queryTimeRange := &modelv1.TimeRange{
-                               Begin: timestamppb.New(now.Add(-2 * 
time.Hour).Add(-30 * time.Minute)),
-                               End:   timestamppb.New(now.Add(-1 * 
time.Hour).Add(-30 * time.Minute)),
-                       }
-
-                       nodeSelectors := map[string][]string{
-                               group1: {""},
-                       }
-
-                       ff, err := p.Broadcast(3*time.Second, 
data.TopicStreamQuery,
-                               
bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, 
queryTimeRange, &streamv1.QueryRequest{}))
-
-                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-                       gomega.Expect(ff).Should(gomega.HaveLen(2))
-
-                       for _, f := range ff {
-                               messages, err := f.GetAll()
-                               
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-                               
gomega.Expect(messages).Should(gomega.HaveLen(1))
-                       }
-               })
-
-               ginkgo.It("should broadcast messages with both label selector 
and time range filter", func() {
-                       addr1 := getAddress()
-                       addr2 := getAddress()
-                       addr3 := getAddress()
-                       addr4 := getAddress()
-                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
-                       closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
-                       closeFn3 := setup(addr3, codes.OK, 10*time.Millisecond)
-                       closeFn4 := setup(addr4, codes.OK, 10*time.Millisecond)
-                       p := newPub()
-                       defer func() {
-                               p.GracefulStop()
-                               closeFn1()
-                               closeFn2()
-                               closeFn3()
-                               closeFn4()
-                       }()
-
-                       group1 := svc
-
-                       now := time.Now()
-                       timeRange1 := &modelv1.TimeRange{
-                               Begin: timestamppb.New(now.Add(-2 * time.Hour)),
-                               End:   timestamppb.New(now.Add(-1 * time.Hour)),
-                       }
-
-                       timeRange2 := &modelv1.TimeRange{
-                               Begin: timestamppb.New(now.Add(-1 * time.Hour)),
-                               End:   timestamppb.New(now),
-                       }
-
-                       node1Labels := map[string]string{
-                               "env":  "prod",
-                               "zone": "east",
-                       }
-                       node1Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange1,
-                       }
-
-                       node2Labels := map[string]string{
-                               "env":  "prod",
-                               "zone": "west",
-                       }
-                       node2Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange1,
-                       }
-
-                       node3Labels := map[string]string{
-                               "env":  "dev",
-                               "zone": "east",
-                       }
-                       node3Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange2,
-                       }
-
-                       node4Labels := map[string]string{
-                               "env":  "prod",
-                               "zone": "east",
-                       }
-                       node4Boundaries := map[string]*modelv1.TimeRange{
-                               group1: timeRange2,
-                       }
-
-                       node1 := getDataNodeWithLabels("node1", addr1, 
node1Labels, node1Boundaries)
-                       p.OnAddOrUpdate(node1)
-                       node2 := getDataNodeWithLabels("node2", addr2, 
node2Labels, node2Boundaries)
-                       p.OnAddOrUpdate(node2)
-                       node3 := getDataNodeWithLabels("node3", addr3, 
node3Labels, node3Boundaries)
-                       p.OnAddOrUpdate(node3)
-                       node4 := getDataNodeWithLabels("node4", addr4, 
node4Labels, node4Boundaries)
-                       p.OnAddOrUpdate(node4)
-
-                       queryTimeRange := &modelv1.TimeRange{
-                               Begin: timestamppb.New(now.Add(-30 * 
time.Minute)),
-                               End:   timestamppb.New(now.Add(30 * 
time.Minute)),
-                       }
-
-                       nodeSelectors := map[string][]string{
-                               group1: {"env=prod"},
-                       }
-
-                       ff, err := p.Broadcast(3*time.Second, 
data.TopicStreamQuery,
-                               
bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, 
queryTimeRange, &streamv1.QueryRequest{}))
-
-                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-                       gomega.Expect(ff).Should(gomega.HaveLen(1))
-
-                       messages, err := ff[0].GetAll()
-                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-                       gomega.Expect(messages).Should(gomega.HaveLen(1))
-               })
        })
 })
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 55624e6f..82b28495 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -331,7 +331,6 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) 
(resourceSchema.DB, error
                SeriesIndexFlushTimeoutSeconds: 
s.option.flushTimeout.Nanoseconds() / int64(time.Second),
                SeriesIndexCacheMaxBytes:       
int(s.option.seriesCacheMaxSize),
                StorageMetricsFactory:          
s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), 
p.DBLabelValues()))),
-               SegmentBoundaryUpdateFn:        
s.metadata.UpdateSegmentsBoundary,
                SegmentIdleTimeout:             segmentIdleTimeout,
        }
        return storage.OpenTSDB(
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 9fd262ad..ed3ec970 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -36,6 +36,13 @@
     - [Tag](#banyandb-common-v1-Tag)
     - [Trace](#banyandb-common-v1-Trace)
   
+- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto)
+    - [Node](#banyandb-database-v1-Node)
+    - [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry)
+    - [Shard](#banyandb-database-v1-Shard)
+  
+    - [Role](#banyandb-database-v1-Role)
+  
 - [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto)
     - [FieldValue](#banyandb-model-v1-FieldValue)
     - [Float](#banyandb-model-v1-Float)
@@ -65,14 +72,6 @@
     - 
[LogicalExpression.LogicalOp](#banyandb-model-v1-LogicalExpression-LogicalOp)
     - [Sort](#banyandb-model-v1-Sort)
   
-- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto)
-    - [Node](#banyandb-database-v1-Node)
-    - 
[Node.DataSegmentsBoundaryEntry](#banyandb-database-v1-Node-DataSegmentsBoundaryEntry)
-    - [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry)
-    - [Shard](#banyandb-database-v1-Shard)
-  
-    - [Role](#banyandb-database-v1-Role)
-  
 - [banyandb/database/v1/schema.proto](#banyandb_database_v1_schema-proto)
     - [Entity](#banyandb-database-v1-Entity)
     - [FieldSpec](#banyandb-database-v1-FieldSpec)
@@ -663,6 +662,93 @@ Trace is the top level message of a trace.
 
 
 
+<a name="banyandb_database_v1_database-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/database/v1/database.proto
+
+
+
+<a name="banyandb-database-v1-Node"></a>
+
+### Node
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
+| roles | [Role](#banyandb-database-v1-Role) | repeated |  |
+| grpc_address | [string](#string) |  |  |
+| http_address | [string](#string) |  |  |
+| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
+| labels | [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) | 
repeated | labels is a set of key-value pairs to describe the node. |
+
+
+
+
+
+
+<a name="banyandb-database-v1-Node-LabelsEntry"></a>
+
+### Node.LabelsEntry
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| key | [string](#string) |  |  |
+| value | [string](#string) |  |  |
+
+
+
+
+
+
+<a name="banyandb-database-v1-Shard"></a>
+
+### Shard
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| id | [uint64](#uint64) |  |  |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
+| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) |  |  |
+| node | [string](#string) |  |  |
+| total | [uint32](#uint32) |  |  |
+| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
+| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
+
+
+
+
+
+ 
+
+
+<a name="banyandb-database-v1-Role"></a>
+
+### Role
+
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| ROLE_UNSPECIFIED | 0 |  |
+| ROLE_META | 1 |  |
+| ROLE_DATA | 2 |  |
+| ROLE_LIAISON | 3 |  |
+
+
+ 
+
+ 
+
+ 
+
+
+
 <a name="banyandb_model_v1_common-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
@@ -1076,110 +1162,6 @@ Each item in a string array is seen as a token instead 
of a query expression.
 
 
 
-<a name="banyandb_database_v1_database-proto"></a>
-<p align="right"><a href="#top">Top</a></p>
-
-## banyandb/database/v1/database.proto
-
-
-
-<a name="banyandb-database-v1-Node"></a>
-
-### Node
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
-| roles | [Role](#banyandb-database-v1-Role) | repeated |  |
-| grpc_address | [string](#string) |  |  |
-| http_address | [string](#string) |  |  |
-| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
-| labels | [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) | 
repeated | labels is a set of key-value pairs to describe the node. |
-| data_segments_boundary | 
[Node.DataSegmentsBoundaryEntry](#banyandb-database-v1-Node-DataSegmentsBoundaryEntry)
 | repeated | data_segments_boundary is the time range of the data segments 
that the node is responsible for. [start, end) is the time range. |
-
-
-
-
-
-
-<a name="banyandb-database-v1-Node-DataSegmentsBoundaryEntry"></a>
-
-### Node.DataSegmentsBoundaryEntry
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| key | [string](#string) |  |  |
-| value | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) |  |  |
-
-
-
-
-
-
-<a name="banyandb-database-v1-Node-LabelsEntry"></a>
-
-### Node.LabelsEntry
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| key | [string](#string) |  |  |
-| value | [string](#string) |  |  |
-
-
-
-
-
-
-<a name="banyandb-database-v1-Shard"></a>
-
-### Shard
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| id | [uint64](#uint64) |  |  |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
-| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) |  |  |
-| node | [string](#string) |  |  |
-| total | [uint32](#uint32) |  |  |
-| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
-| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
-
-
-
-
-
- 
-
-
-<a name="banyandb-database-v1-Role"></a>
-
-### Role
-
-
-| Name | Number | Description |
-| ---- | ------ | ----------- |
-| ROLE_UNSPECIFIED | 0 |  |
-| ROLE_META | 1 |  |
-| ROLE_DATA | 2 |  |
-| ROLE_LIAISON | 3 |  |
-
-
- 
-
- 
-
- 
-
-
-
 <a name="banyandb_database_v1_schema-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
diff --git a/pkg/test/helpers/context.go b/pkg/test/helpers/context.go
index 82854237..edd816e5 100644
--- a/pkg/test/helpers/context.go
+++ b/pkg/test/helpers/context.go
@@ -38,15 +38,17 @@ type SharedContext struct {
 
 // Args is a wrapper seals all necessary info for table specs.
 type Args struct {
-       Begin     *timestamppb.Timestamp
-       End       *timestamppb.Timestamp
-       Input     string
-       Want      string
-       Offset    time.Duration
-       Duration  time.Duration
-       WantEmpty bool
-       WantErr   bool
-       DisOrder  bool
+       Begin           *timestamppb.Timestamp
+       End             *timestamppb.Timestamp
+       Input           string
+       Want            string
+       Stages          []string
+       Offset          time.Duration
+       Duration        time.Duration
+       WantEmpty       bool
+       WantErr         bool
+       DisOrder        bool
+       IgnoreElementID bool
 }
 
 // UnmarshalYAML decodes YAML raw bytes to proto.Message.
@@ -80,9 +82,11 @@ type BackupSharedContext struct {
 
 // LifecycleSharedContext is the context shared between test cases in the 
lifecycle testing.
 type LifecycleSharedContext struct {
-       DataAddr   string
-       Connection *grpclib.ClientConn
-       SrcDir     string
-       DestDir    string
-       EtcdAddr   string
+       BaseTime    time.Time
+       Connection  *grpclib.ClientConn
+       LiaisonAddr string
+       DataAddr    string
+       SrcDir      string
+       DestDir     string
+       EtcdAddr    string
 }
diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go
index 4838c2eb..54910694 100644
--- a/pkg/test/measure/etcd.go
+++ b/pkg/test/measure/etcd.go
@@ -38,6 +38,7 @@ const (
        indexRuleDir        = "testdata/index_rules"
        indexRuleBindingDir = "testdata/index_rule_bindings"
        topNAggregationDir  = "testdata/topn_aggregations"
+       groupStagesDir      = "testdata/group_stages" // new directory
 )
 
 //go:embed testdata/*
@@ -45,31 +46,52 @@ var store embed.FS
 
 // PreloadSchema loads schemas from files in the booting process.
 func PreloadSchema(ctx context.Context, e schema.Registry) error {
-       if err := loadSchema(groupDir, &commonv1.Group{}, func(group 
*commonv1.Group) error {
-               return e.CreateGroup(ctx, group)
-       }); err != nil {
-               return errors.WithStack(err)
-       }
-       if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure 
*databasev1.Measure) error {
-               _, innerErr := e.CreateMeasure(ctx, measure)
-               return innerErr
-       }); err != nil {
-               return errors.WithStack(err)
-       }
-       if err := loadSchema(indexRuleDir, &databasev1.IndexRule{}, 
func(indexRule *databasev1.IndexRule) error {
-               return e.CreateIndexRule(ctx, indexRule)
-       }); err != nil {
-               return errors.WithStack(err)
-       }
-       if err := loadSchema(indexRuleBindingDir, 
&databasev1.IndexRuleBinding{}, func(indexRuleBinding 
*databasev1.IndexRuleBinding) error {
-               return e.CreateIndexRuleBinding(ctx, indexRuleBinding)
-       }); err != nil {
-               return errors.WithStack(err)
-       }
-       if err := loadSchema(topNAggregationDir, &databasev1.TopNAggregation{}, 
func(topN *databasev1.TopNAggregation) error {
-               return e.CreateTopNAggregation(ctx, topN)
-       }); err != nil {
-               return errors.WithStack(err)
+       return loadAllSchemas(ctx, e, groupDir)
+}
+
+// LoadSchemaWithStages loads group schemas from the groupStagesDir.
+func LoadSchemaWithStages(ctx context.Context, e schema.Registry) error {
+       return loadAllSchemas(ctx, e, groupStagesDir)
+}
+
+// loadAllSchemas is the common logic to load schemas from a given group 
directory.
+func loadAllSchemas(ctx context.Context, e schema.Registry, groupDirectory 
string) error {
+       return preloadSchemaWithFuncs(ctx, e,
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(groupDirectory, &commonv1.Group{}, 
func(group *commonv1.Group) error {
+                               return e.CreateGroup(ctx, group)
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(measureDir, &databasev1.Measure{}, 
func(measure *databasev1.Measure) error {
+                               _, innerErr := e.CreateMeasure(ctx, measure)
+                               return innerErr
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(indexRuleDir, 
&databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error {
+                               return e.CreateIndexRule(ctx, indexRule)
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(indexRuleBindingDir, 
&databasev1.IndexRuleBinding{}, func(indexRuleBinding 
*databasev1.IndexRuleBinding) error {
+                               return e.CreateIndexRuleBinding(ctx, 
indexRuleBinding)
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(topNAggregationDir, 
&databasev1.TopNAggregation{}, func(topN *databasev1.TopNAggregation) error {
+                               return e.CreateTopNAggregation(ctx, topN)
+                       })
+               },
+       )
+}
+
+// preloadSchemaWithFuncs extracts the common logic for loading schemas.
+func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders 
...func(context.Context, schema.Registry) error) error {
+       for _, loader := range loaders {
+               if err := loader(ctx, e); err != nil {
+                       return errors.WithStack(err)
+               }
        }
        return nil
 }
diff --git a/pkg/test/measure/testdata/group_stages/exception.json 
b/pkg/test/measure/testdata/group_stages/exception.json
new file mode 100644
index 00000000..fadbd5a5
--- /dev/null
+++ b/pkg/test/measure/testdata/group_stages/exception.json
@@ -0,0 +1,18 @@
+{
+  "metadata": {
+    "name": "exception"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 7
+    }
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/group_stages/index_mode.json 
b/pkg/test/measure/testdata/group_stages/index_mode.json
new file mode 100644
index 00000000..fb0d9b4b
--- /dev/null
+++ b/pkg/test/measure/testdata/group_stages/index_mode.json
@@ -0,0 +1,18 @@
+{
+  "metadata": {
+    "name": "index_mode"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 7
+    }
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/groups/sw_metric.json 
b/pkg/test/measure/testdata/group_stages/sw_metric.json
similarity index 100%
copy from pkg/test/measure/testdata/groups/sw_metric.json
copy to pkg/test/measure/testdata/group_stages/sw_metric.json
diff --git a/pkg/test/measure/testdata/groups/sw_metric.json 
b/pkg/test/measure/testdata/groups/sw_metric.json
index 703ae9f5..a91becb7 100644
--- a/pkg/test/measure/testdata/groups/sw_metric.json
+++ b/pkg/test/measure/testdata/groups/sw_metric.json
@@ -12,22 +12,7 @@
     "ttl": {
       "unit": "UNIT_DAY",
       "num": 7
-    },
-    "stages": [
-      {
-        "name": "warm",
-        "shard_num": 1,
-        "segment_interval": {
-          "unit": "UNIT_DAY",
-          "num": 3
-        },
-        "ttl": {
-          "unit": "UNIT_DAY",
-          "num": 30
-        },
-        "node_selector": "type=warm"
-      }
-    ]
+    }
   },
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index 93dcf0fb..d48c3f4f 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -47,26 +47,12 @@ var (
        streamStore embed.FS
        //go:embed testdata/group.json
        groupJSON string
+       //go:embed testdata/group_with_stages.json
+       groupWithStagesJSON string
 )
 
-// PreloadSchema loads schemas from files in the booting process.
-func PreloadSchema(ctx context.Context, e schema.Registry) error {
-       if e == nil {
-               return nil
-       }
-       g := &commonv1.Group{}
-       if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
-               return err
-       }
-       _, err := e.GetGroup(ctx, g.Metadata.Name)
-       if !errors.Is(err, schema.ErrGRPCResourceNotFound) {
-               logger.Infof("group %s already exists", g.Metadata.Name)
-               return nil
-       }
-       if innerErr := e.CreateGroup(ctx, g); innerErr != nil {
-               return innerErr
-       }
-
+// loadSchemas loads streams, index rules, and index rule bindings.
+func loadSchemas(ctx context.Context, e schema.Registry) error {
        streams, err := streamStore.ReadDir(streamDir)
        if err != nil {
                return err
@@ -126,3 +112,46 @@ func PreloadSchema(ctx context.Context, e schema.Registry) 
error {
 
        return nil
 }
+
+// LoadSchemaWithStages loads schemas from files, including group stages.
+func LoadSchemaWithStages(ctx context.Context, e schema.Registry) error {
+       if e == nil {
+               return nil
+       }
+       g := &commonv1.Group{}
+       if err := protojson.Unmarshal([]byte(groupWithStagesJSON), g); err != 
nil {
+               return err
+       }
+       _, err := e.GetGroup(ctx, g.Metadata.Name)
+       if !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+               logger.Infof("group %s already exists", g.Metadata.Name)
+               return nil
+       }
+       if innerErr := e.CreateGroup(ctx, g); innerErr != nil {
+               return innerErr
+       }
+
+       return loadSchemas(ctx, e)
+}
+
+// PreloadSchema loads schemas from files in the booting process.
+// This version loads group without stages.
+func PreloadSchema(ctx context.Context, e schema.Registry) error {
+       if e == nil {
+               return nil
+       }
+       g := &commonv1.Group{}
+       if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
+               return err
+       }
+       _, err := e.GetGroup(ctx, g.Metadata.Name)
+       if !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+               logger.Infof("group %s already exists", g.Metadata.Name)
+               return nil
+       }
+       if innerErr := e.CreateGroup(ctx, g); innerErr != nil {
+               return innerErr
+       }
+
+       return loadSchemas(ctx, e)
+}
diff --git a/pkg/test/stream/testdata/group.json 
b/pkg/test/stream/testdata/group.json
index f38a2436..b9e8dc80 100644
--- a/pkg/test/stream/testdata/group.json
+++ b/pkg/test/stream/testdata/group.json
@@ -12,22 +12,7 @@
     "ttl": {
       "unit": "UNIT_DAY",
       "num": 3
-    },
-    "stages": [
-      {
-        "name": "warm",
-        "shard_num": 1,
-        "segment_interval": {
-          "unit": "UNIT_DAY",
-          "num": 3
-        },
-        "ttl": {
-          "unit": "UNIT_DAY",
-          "num": 7
-        },
-        "node_selector": "type=warm"
-      }
-    ]
+    }
   },
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git a/pkg/test/stream/testdata/group.json 
b/pkg/test/stream/testdata/group_with_stages.json
similarity index 100%
copy from pkg/test/stream/testdata/group.json
copy to pkg/test/stream/testdata/group_with_stages.json
diff --git a/test/cases/lifecycle/lifecycle.go 
b/test/cases/lifecycle/lifecycle.go
index 55f68ee0..b1ebbfc3 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -22,12 +22,20 @@ import (
        "io/fs"
        "os"
        "path/filepath"
+       "time"
 
        "github.com/onsi/ginkgo/v2"
        "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
 
        "github.com/apache/skywalking-banyandb/banyand/backup/lifecycle"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       measureTestData 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+       streamTestData 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
+       topNTestData 
"github.com/apache/skywalking-banyandb/test/cases/topn/data"
 )
 
 // SharedContext is the shared context for the snapshot test cases.
@@ -49,12 +57,87 @@ var _ = ginkgo.Describe("Lifecycle", func() {
                })
                err = lifecycleCmd.Execute()
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
-               verifySourceDirectoriesBeforeMigration()
+               verifySourceDirectoriesAfterMigration()
                verifyDestinationDirectoriesAfterMigration()
+               conn, err := grpchelper.Conn(SharedContext.LiaisonAddr, 
10*time.Second,
+                       
grpc.WithTransportCredentials(insecure.NewCredentials()))
+               defer func() {
+                       if conn != nil {
+                               _ = conn.Close()
+                       }
+               }()
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               sc := helpers.SharedContext{
+                       Connection: conn,
+                       BaseTime:   SharedContext.BaseTime,
+               }
+               // Verify measure data lifecycle stages
+               verifyLifecycleStages(sc, measureTestData.VerifyFn, 
helpers.Args{
+                       Input:    "all",
+                       Duration: 25 * time.Minute,
+                       Offset:   -20 * time.Minute,
+               })
+
+               // Verify stream data lifecycle stages
+               verifyLifecycleStages(sc, streamTestData.VerifyFn, helpers.Args{
+                       Input:           "all",
+                       Duration:        time.Hour,
+                       IgnoreElementID: true,
+               })
+
+               // Verify topN data lifecycle stages
+               verifyLifecycleStages(sc, topNTestData.VerifyFn, helpers.Args{
+                       Input:    "aggr_desc",
+                       Duration: 25 * time.Minute,
+                       Offset:   -20 * time.Minute,
+               })
        })
 })
 
-func verifySourceDirectoriesBeforeMigration() {
+func verifyLifecycleStages(sc helpers.SharedContext, verifyFn 
func(gomega.Gomega, helpers.SharedContext, helpers.Args), args helpers.Args) {
+       // Initial verification expecting error before migration
+       verifyFn(gomega.Default, sc, helpers.Args{
+               Input:    args.Input,
+               Duration: args.Duration,
+               Offset:   args.Offset,
+               WantErr:  true,
+               Stages:   args.Stages,
+       })
+
+       // Verify hot+warm stages exist after migration
+       gomega.Eventually(func(innerGm gomega.Gomega) {
+               verifyFn(innerGm, sc, helpers.Args{
+                       Input:           args.Input,
+                       Duration:        args.Duration,
+                       Offset:          args.Offset,
+                       Stages:          []string{"hot", "warm"},
+                       IgnoreElementID: args.IgnoreElementID,
+               })
+       }, flags.EventuallyTimeout).Should(gomega.Succeed())
+
+       // Verify warm stage only after retention
+       gomega.Eventually(func(innerGm gomega.Gomega) {
+               verifyFn(innerGm, sc, helpers.Args{
+                       Input:           args.Input,
+                       Duration:        args.Duration,
+                       Offset:          args.Offset,
+                       Stages:          []string{"warm"},
+                       IgnoreElementID: args.IgnoreElementID,
+               })
+       }, flags.EventuallyTimeout).Should(gomega.Succeed())
+
+       // Verify hot stage is empty after retention
+       verifyFn(gomega.Default, sc, helpers.Args{
+               Input:           args.Input,
+               Duration:        args.Duration,
+               Offset:          args.Offset,
+               WantEmpty:       true,
+               Stages:          []string{"hot"},
+               IgnoreElementID: args.IgnoreElementID,
+       })
+}
+
+func verifySourceDirectoriesAfterMigration() {
        streamSrcPath := filepath.Join(SharedContext.SrcDir, "stream", "data", 
"default")
        streamEntries, err := os.ReadDir(streamSrcPath)
        gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Stream source 
directory should exist")
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index ff17c45c..f4c78699 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -54,6 +54,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
        query := &measurev1.QueryRequest{}
        helpers.UnmarshalYAML(i, query)
        query.TimeRange = helpers.TimeRange(args, sharedContext)
+       query.Stages = args.Stages
        c := measurev1.NewMeasureServiceClient(sharedContext.Connection)
        ctx := context.Background()
        resp, err := c.Query(ctx, query)
diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go
index 6701e166..b3df58d4 100644
--- a/test/cases/stream/data/data.go
+++ b/test/cases/stream/data/data.go
@@ -65,6 +65,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
        query := &streamv1.QueryRequest{}
        helpers.UnmarshalYAML(i, query)
        query.TimeRange = helpers.TimeRange(args, sharedContext)
+       query.Stages = args.Stages
        c := streamv1.NewStreamServiceClient(sharedContext.Connection)
        ctx := context.Background()
        resp, err := c.Query(ctx, query)
@@ -86,8 +87,10 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
        innerGm.Expect(err).NotTo(gm.HaveOccurred())
        want := &streamv1.QueryResponse{}
        helpers.UnmarshalYAML(ww, want)
-       for i := range want.Elements {
-               want.Elements[i].ElementId = 
hex.EncodeToString(convert.Uint64ToBytes(convert.HashStr(query.Name + "|" + 
want.Elements[i].ElementId)))
+       if !args.IgnoreElementID {
+               for i := range want.Elements {
+                       want.Elements[i].ElementId = 
hex.EncodeToString(convert.Uint64ToBytes(convert.HashStr(query.Name + "|" + 
want.Elements[i].ElementId)))
+               }
        }
        if args.DisOrder {
                slices.SortFunc(want.Elements, func(a, b *streamv1.Element) int 
{
@@ -97,10 +100,15 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
                        return strings.Compare(a.ElementId, b.ElementId)
                })
        }
-       success := innerGm.Expect(cmp.Equal(resp, want,
-               protocmp.IgnoreUnknown(),
+       var extra []cmp.Option
+       extra = append(extra, protocmp.IgnoreUnknown(),
                protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
-               protocmp.Transform())).
+               protocmp.Transform())
+       if args.IgnoreElementID {
+               extra = append(extra, 
protocmp.IgnoreFields(&streamv1.Element{}, "element_id"))
+       }
+       success := innerGm.Expect(cmp.Equal(resp, want,
+               extra...)).
                To(gm.BeTrue(), func() string {
                        var j []byte
                        j, err = protojson.Marshal(resp)
diff --git a/test/cases/topn/data/data.go b/test/cases/topn/data/data.go
index 983be9c4..67a1bbd3 100644
--- a/test/cases/topn/data/data.go
+++ b/test/cases/topn/data/data.go
@@ -46,6 +46,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
        query := &measurev1.TopNRequest{}
        helpers.UnmarshalYAML(i, query)
        query.TimeRange = helpers.TimeRange(args, sharedContext)
+       query.Stages = args.Stages
        c := measurev1.NewMeasureServiceClient(sharedContext.Connection)
        ctx := context.Background()
        resp, err := c.TopN(ctx, query)
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 122c5d1b..103c9a00 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -26,6 +26,6 @@ SW_ROVER_COMMIT=4c0cb8429a96f190ea30eac1807008d523c749c3
 SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3
 SW_PREDICTOR_COMMIT=54a0197654a3781a6f73ce35146c712af297c994
 
-SW_OAP_COMMIT=c63fe21e75c7b898564add0035c220d9b242f7d2
+SW_OAP_COMMIT=03b03518616a91129cfced2972621df3f0bac3db
 SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=d3f8fe894d1a206164b73f5b523d2eb62d9e9965
 SW_CTL_COMMIT=67cbc89dd7b214d5791321a7ca992f940cb586ba
\ No newline at end of file
diff --git a/test/integration/distributed/lifecycle/lifecycle_suite_test.go 
b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
index 0b5059fc..4fc99cbd 100644
--- a/test/integration/distributed/lifecycle/lifecycle_suite_test.go
+++ b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
@@ -51,13 +51,15 @@ func TestLifecycle(t *testing.T) {
 }
 
 var (
-       connection *grpc.ClientConn
-       srcDir     string
-       destDir    string
-       deferFunc  func()
-       goods      []gleak.Goroutine
-       dataAddr   string
-       ep         string
+       connection       *grpc.ClientConn
+       srcDir           string
+       destDir          string
+       deferFunc        func()
+       goods            []gleak.Goroutine
+       dataAddr         string
+       ep               string
+       tenDaysBeforeNow time.Time
+       liaisonAddr      string
 )
 
 var _ = SynchronizedBeforeSuite(func() []byte {
@@ -86,8 +88,8 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        Expect(err).NotTo(HaveOccurred())
        defer schemaRegistry.Close()
        ctx := context.Background()
-       test_stream.PreloadSchema(ctx, schemaRegistry)
-       test_measure.PreloadSchema(ctx, schemaRegistry)
+       test_stream.LoadSchemaWithStages(ctx, schemaRegistry)
+       test_measure.LoadSchemaWithStages(ctx, schemaRegistry)
        By("Starting hot data node")
        var closeDataNode0 func()
        dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep, 
"--node-labels", "type=hot", "--measure-flush-timeout", "0s", 
"--stream-flush-timeout", "0s")
@@ -95,11 +97,12 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        var closeDataNode1 func()
        _, destDir, closeDataNode1 = setup.DataNodeWithAddrAndDir(ep, 
"--node-labels", "type=warm", "--measure-flush-timeout", "0s", 
"--stream-flush-timeout", "0s")
        By("Starting liaison node")
-       liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep, 
"--data-node-selector", "type=hot")
+       var closerLiaisonNode func()
+       liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep, 
"--data-node-selector", "type=hot")
        By("Initializing test cases with 10 days before")
        ns := timestamp.NowMilli().UnixNano()
        now := time.Unix(0, ns-ns%int64(time.Minute))
-       tenDaysBeforeNow := now.Add(-10 * 24 * time.Hour)
+       tenDaysBeforeNow = now.Add(-10 * 24 * time.Hour)
        test_cases.Initialize(liaisonAddr, tenDaysBeforeNow)
        deferFunc = func() {
                closerLiaisonNode()
@@ -116,11 +119,13 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                grpc.WithTransportCredentials(insecure.NewCredentials()))
        Expect(err).NotTo(HaveOccurred())
        caseslifecycle.SharedContext = helpers.LifecycleSharedContext{
-               DataAddr:   dataAddr,
-               Connection: connection,
-               SrcDir:     srcDir,
-               DestDir:    destDir,
-               EtcdAddr:   ep,
+               LiaisonAddr: liaisonAddr,
+               DataAddr:    dataAddr,
+               Connection:  connection,
+               SrcDir:      srcDir,
+               DestDir:     destDir,
+               EtcdAddr:    ep,
+               BaseTime:    tenDaysBeforeNow,
        }
 })
 

Reply via email to