This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 704432c9 [Merged without e2e] add lifecycle query test  (#657)
704432c9 is described below

commit 704432c90450daceca573b55eb890aabe2f6ca53
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Apr 27 10:45:46 2025 +0800

    [Merged without e2e] add lifecycle query test  (#657)
---
 api/common/id.go                                   |   5 +
 api/proto/banyandb/measure/v1/topn.proto           |   2 +
 banyand/dquery/dquery.go                           |  71 ++++++++--
 banyand/dquery/measure.go                          |  24 +++-
 banyand/dquery/stream.go                           |  23 +++-
 banyand/dquery/topn.go                             |  24 +++-
 banyand/liaison/grpc/discovery.go                  |  32 -----
 banyand/liaison/grpc/measure.go                    |  11 +-
 banyand/liaison/grpc/stream.go                     |  10 +-
 banyand/queue/pub/pub.go                           |   4 +
 docs/api-reference.md                              |   1 +
 pkg/cmdsetup/liaison.go                            |   5 +-
 pkg/query/executor/interface.go                    |   1 +
 .../logical/measure/measure_plan_distributed.go    |   5 +-
 .../logical/stream/stream_plan_distributed.go      |   3 +-
 test/docker/Dockerfile                             |   2 +-
 test/docker/base-compose.yml                       |   2 +
 test/e2e-v2/cases/lifecycle/docker-compose.yml     | 153 +++++++++++++++++++++
 test/e2e-v2/script/env                             |   9 +-
 19 files changed, 310 insertions(+), 77 deletions(-)

diff --git a/api/common/id.go b/api/common/id.go
index aafbfb53..6a8e48e7 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -253,3 +253,8 @@ type contextNodeKey struct{}
 var ContextNodeRolesKey = contextNodeRolesKey{}
 
 type contextNodeRolesKey struct{}
+
+// ContextNodeSelectorKey is a context key to store the node selector.
+var ContextNodeSelectorKey = contextNodeSelectorKey{}
+
+type contextNodeSelectorKey struct{}
diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index dc15a444..48afc024 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -67,4 +67,6 @@ message TopNRequest {
   model.v1.Sort field_value_sort = 7;
   // trace is used to enable trace for the query
   bool trace = 8;
+  // stages is used to specify the stage of the data points in the lifecycle
+  repeated string stages = 9;
 }
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index c56f410e..653aa811 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -21,12 +21,14 @@ package dquery
 import (
        "context"
        "errors"
+       "strings"
        "time"
 
        "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -41,7 +43,8 @@ import (
 )
 
 const (
-       moduleName = "distributed-query"
+       moduleName   = "distributed-query"
+       hotStageName = "hot"
 )
 
 var (
@@ -52,16 +55,17 @@ var (
 )
 
 type queryService struct {
-       metaService metadata.Repo
-       pipeline    queue.Server
-       omr         observability.MetricsRegistry
-       log         *logger.Logger
-       sqp         *streamQueryProcessor
-       mqp         *measureQueryProcessor
-       tqp         *topNQueryProcessor
-       closer      *run.Closer
-       nodeID      string
-       slowQuery   time.Duration
+       metaService          metadata.Repo
+       pipeline             queue.Server
+       omr                  observability.MetricsRegistry
+       log                  *logger.Logger
+       sqp                  *streamQueryProcessor
+       mqp                  *measureQueryProcessor
+       tqp                  *topNQueryProcessor
+       closer               *run.Closer
+       nodeID               string
+       hotStageNodeSelector string
+       slowQuery            time.Duration
 }
 
 // NewService return a new query service.
@@ -109,11 +113,17 @@ func (q *queryService) PreRun(ctx context.Context) error {
        }
        node := val.(common.Node)
        q.nodeID = node.NodeID
+       val = ctx.Value(common.ContextNodeSelectorKey)
+       if val != nil {
+               q.hotStageNodeSelector = val.(string)
+       }
+
        q.log = logger.GetLogger(moduleName)
        q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log,
                schema.NewMetrics(q.omr.With(streamScope)))
        q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log,
                schema.NewMetrics(q.omr.With(measureScope)))
+       q.tqp.measureService = q.mqp.measureService
        return multierr.Combine(
                q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
                q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
@@ -132,13 +142,50 @@ func (q *queryService) Serve() run.StopNotify {
        return q.closer.CloseNotify()
 }
 
+func (q *queryService) parseNodeSelector(stages []string, resource *commonv1.ResourceOpts) ([]string, bool) {
+       if len(stages) == 0 {
+               stages = resource.DefaultStages
+       }
+       if len(stages) == 0 {
+               return nil, false
+       }
+
+       var nodeSelectors []string
+       for _, stage := range resource.Stages {
+               for _, sn := range stages {
+                       if strings.EqualFold(sn, stage.Name) {
+                               ns := stage.NodeSelector
+                               ns = strings.TrimSpace(ns)
+                               if ns == "" {
+                                       continue
+                               }
+                               nodeSelectors = append(nodeSelectors, ns)
+                               break
+                       }
+                       if strings.EqualFold(sn, hotStageName) && q.hotStageNodeSelector != "" {
+                               nodeSelectors = append(nodeSelectors, q.hotStageNodeSelector)
+                               break
+                       }
+               }
+       }
+       if len(nodeSelectors) == 0 {
+               return nil, false
+       }
+       return nodeSelectors, true
+}
+
 var _ executor.DistributedExecutionContext = (*distributedContext)(nil)
 
 type distributedContext struct {
        bus.Broadcaster
-       timeRange *modelv1.TimeRange
+       timeRange     *modelv1.TimeRange
+       nodeSelectors map[string][]string
 }
 
 func (dc *distributedContext) TimeRange() *modelv1.TimeRange {
        return dc.timeRange
 }
+
+func (dc *distributedContext) NodeSelectors() map[string][]string {
+       return dc.nodeSelectors
+}
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index a1e3ecce..56548d7a 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -83,12 +83,30 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r
        if e := ml.Debug(); e.Enabled() {
                e.Str("plan", plan.String()).Msg("query plan")
        }
+       nodeSelectors := make(map[string][]string)
+       for _, g := range queryCriteria.Groups {
+               if gs, ok := p.measureService.LoadGroup(g); ok {
+                       if ns, exist := p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
+                               nodeSelectors[g] = ns
+                       }
+               } else {
+                       ml.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("group not found")
+                       resp = bus.NewMessage(bus.MessageID(now), common.NewError("group %s not found", g))
+                       return
+               }
+       }
+       if len(queryCriteria.Stages) > 0 && len(nodeSelectors) == 0 {
+               ml.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("no stage found")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("no stage found"))
+               return
+       }
        var tracer *query.Tracer
        var span *query.Span
        if queryCriteria.Trace {
                tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
                span, ctx = tracer.StartSpan(ctx, "distributed-%s", p.queryService.nodeID)
                span.Tag("plan", plan.String())
+               span.Tagf("nodeSelectors", "%v", nodeSelectors)
                defer func() {
                        data := resp.Data()
                        switch d := data.(type) {
@@ -103,9 +121,11 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r
                        span.Stop()
                }()
        }
+
        mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(ctx, &distributedContext{
-               Broadcaster: p.broadcaster,
-               timeRange:   queryCriteria.TimeRange,
+               Broadcaster:   p.broadcaster,
+               timeRange:     queryCriteria.TimeRange,
+               nodeSelectors: nodeSelectors,
        }))
        if err != nil {
                ml.Error().Err(err).Dur("latency", time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 80a31c9d..6cdd2ae7 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -79,12 +79,30 @@ func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (re
        if p.log.Debug().Enabled() {
                p.log.Debug().Str("plan", plan.String()).Msg("query plan")
        }
+       nodeSelectors := make(map[string][]string)
+       for _, g := range queryCriteria.Groups {
+               if gs, ok := p.streamService.LoadGroup(g); ok {
+                       if ns, exist := p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
+                               nodeSelectors[g] = ns
+                       }
+               } else {
+                       p.log.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("group not found")
+                       resp = bus.NewMessage(bus.MessageID(now), common.NewError("group %s not found", g))
+                       return
+               }
+       }
+       if len(queryCriteria.Stages) > 0 && len(nodeSelectors) == 0 {
+               p.log.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("no stage found")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("no stage found"))
+               return
+       }
        if queryCriteria.Trace {
                var tracer *query.Tracer
                var span *query.Span
                tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
                span, ctx = tracer.StartSpan(ctx, "distributed-%s", p.queryService.nodeID)
                span.Tag("plan", plan.String())
+               span.Tagf("nodeSelectors", "%v", nodeSelectors)
                defer func() {
                        data := resp.Data()
                        switch d := data.(type) {
@@ -102,8 +120,9 @@ func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (re
        se := plan.(executor.StreamExecutable)
        defer se.Close()
        entities, err := se.Execute(executor.WithDistributedExecutionContext(ctx, &distributedContext{
-               Broadcaster: p.broadcaster,
-               timeRange:   queryCriteria.TimeRange,
+               Broadcaster:   p.broadcaster,
+               timeRange:     queryCriteria.TimeRange,
+               nodeSelectors: nodeSelectors,
        }))
        if err != nil {
                p.log.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to execute the query plan")
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index 067d7f0e..b7adf5c1 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/data"
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/query"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
@@ -40,7 +41,8 @@ import (
 const defaultTopNQueryTimeout = 10 * time.Second
 
 type topNQueryProcessor struct {
-       broadcaster bus.Broadcaster
+       measureService measure.SchemaService
+       broadcaster    bus.Broadcaster
        *queryService
        *bus.UnImplementedHealthyListener
 }
@@ -64,11 +66,29 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
        if e := t.log.Debug(); e.Enabled() {
                e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
        }
+       nodeSelectors := make(map[string][]string)
+       for _, g := range request.Groups {
+               if gs, ok := t.measureService.LoadGroup(g); ok {
+                       if ns, exist := t.parseNodeSelector(request.Stages, gs.GetSchema().ResourceOpts); exist {
+                               nodeSelectors[g] = ns
+                       }
+               } else {
+                       t.log.Error().Str("group", g).Msg("failed to load group")
+                       resp = bus.NewMessage(now, common.NewError("failed to load group %s", g))
+                       return
+               }
+       }
+       if len(request.Stages) > 0 && len(nodeSelectors) == 0 {
+               t.log.Error().RawJSON("req", logger.Proto(request)).Msg("no stage found")
+               resp = bus.NewMessage(now, common.NewError("no stage found"))
+               return
+       }
        if request.Trace {
                var tracer *pkgquery.Tracer
                tracer, ctx = pkgquery.NewTracer(ctx, n.Format(time.RFC3339Nano))
                span, _ := tracer.StartSpan(ctx, "distributed-client")
                span.Tag("request", convert.BytesToString(logger.Proto(request)))
+               span.Tagf("nodeSelectors", "%v", nodeSelectors)
                defer func() {
                        data := resp.Data()
                        switch d := data.(type) {
@@ -85,7 +105,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
        }
        agg := request.Agg
        request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
-       ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessage(now, request))
+       ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessageWithNodeSelectors(now, nodeSelectors, request.TimeRange, request))
        if err != nil {
                resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.GetName(), err))
                return
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index b85fd472..0df7b6ef 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -19,7 +19,6 @@ package grpc
 
 import (
        "fmt"
-       "strings"
        "sync"
 
        "github.com/pkg/errors"
@@ -163,37 +162,6 @@ func (s *groupRepo) shardNum(groupName string) (uint32, bool) {
        return r.ShardNum, true
 }
 
-func (s *groupRepo) getNodeSelector(groupName string, stages []string) ([]string, bool) {
-       s.RWMutex.RLock()
-       defer s.RWMutex.RUnlock()
-       r, ok := s.resourceOpts[groupName]
-       if !ok {
-               return nil, false
-       }
-       if len(stages) == 0 {
-               stages = r.DefaultStages
-       }
-       if len(stages) == 0 {
-               return nil, false
-       }
-
-       var nodeSelectors []string
-       for _, stage := range r.Stages {
-               for _, sn := range stages {
-                       if strings.EqualFold(sn, stage.Name) {
-                               ns := stage.NodeSelector
-                               ns = strings.TrimSpace(ns)
-                               if ns == "" {
-                                       continue
-                               }
-                               nodeSelectors = append(nodeSelectors, ns)
-                               break
-                       }
-               }
-       }
-       return nodeSelectors, true
-}
-
 func getID(metadata *commonv1.Metadata) identity {
        return identity{
                name:  metadata.GetName(),
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 15c72fa5..7dd38590 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -215,16 +215,7 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest
                        span.Stop()
                }()
        }
-       nodeSelectors := make(map[string][]string)
-       for _, g := range req.Groups {
-               if ns, exist := ms.groupRepo.getNodeSelector(g, req.Stages); exist {
-                       nodeSelectors[g] = ns
-               } else {
-                       nodeSelectors[g] = nil
-               }
-       }
-
-       feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery, bus.NewMessageWithNodeSelectors(bus.MessageID(now.UnixNano()), nodeSelectors, req.TimeRange, req))
+       feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req))
        if err != nil {
                return nil, err
        }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index b1920878..c7c0c245 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -213,15 +213,7 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) (
                        span.Stop()
                }()
        }
-       nodeSelectors := make(map[string][]string)
-       for _, g := range req.Groups {
-               if ns, exist := s.groupRepo.getNodeSelector(g, req.Stages); exist {
-                       nodeSelectors[g] = ns
-               } else {
-                       nodeSelectors[g] = nil
-               }
-       }
-       message := bus.NewMessageWithNodeSelectors(bus.MessageID(now.UnixNano()), nodeSelectors, req.TimeRange, req)
+       message := bus.NewMessage(bus.MessageID(now.UnixNano()), req)
        feat, errQuery := s.broadcaster.Publish(ctx, data.TopicStreamQuery, message)
        if errQuery != nil {
                if errors.Is(errQuery, io.EOF) {
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 77972843..da5efca0 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -161,6 +161,10 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes
                l.Msgf("broadcasting message to %s nodes", names)
        }
 
+       if len(names) == 0 {
+               return nil, fmt.Errorf("no nodes match the selector %v", messages.NodeSelectors())
+       }
+
        futureCh := make(chan publishResult, len(names))
        var wg sync.WaitGroup
        for n := range names {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index c90c4b39..9fd262ad 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -3112,6 +3112,7 @@ TopNRequest is the request contract for query.
 | conditions | [banyandb.model.v1.Condition](#banyandb-model-v1-Condition) | repeated | criteria select counters. Only equals are acceptable. |
 | field_value_sort | [banyandb.model.v1.Sort](#banyandb-model-v1-Sort) |  | field_value_sort indicates how to sort fields |
 | trace | [bool](#bool) |  | trace is used to enable trace for the query |
+| stages | [string](#string) | repeated | stages is used to specify the stage of the data points in the lifecycle |
 
 
 
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 1ae588d8..48082843 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -86,7 +86,9 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                Version: version.Build(),
                Short:   "Run as the liaison server",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       ctx := context.Background()
                        if nodeSelector != "" {
+                               ctx = context.WithValue(ctx, common.ContextNodeSelectorKey, nodeSelector)
                                var ls *pub.LabelSelector
                                ls, err = pub.ParseLabelSelector(nodeSelector)
                                if err != nil {
@@ -101,8 +103,9 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                                return err
                        }
                        logger.GetLogger().Info().Msg("starting as a liaison server")
+                       ctx = context.WithValue(ctx, common.ContextNodeKey, node)
                        // Spawn our go routines and wait for shutdown.
-                       if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil {
+                       if err := liaisonGroup.Run(ctx); err != nil {
                                logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit")
                                os.Exit(-1)
                        }
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index 6ba7071b..d48675f6 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -92,6 +92,7 @@ type MeasureExecutable interface {
 type DistributedExecutionContext interface {
        bus.Broadcaster
        TimeRange() *modelv1.TimeRange
+       NodeSelectors() map[string][]string
 }
 
 // DistributedExecutionContextKey is the key of distributed execution context in context.Context.
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 3362e6cc..c6843029 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -170,6 +170,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
                span, _ = tracer.StartSpan(ctx, "distributed-client")
                queryRequest.Trace = true
                span.Tag("request", convert.BytesToString(logger.Proto(queryRequest)))
+               span.Tag("node_selectors", fmt.Sprintf("%v", dctx.NodeSelectors()))
+               span.Tag("time_range", dctx.TimeRange().String())
                defer func() {
                        if err != nil {
                                span.Error(err)
@@ -178,7 +180,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
                        }
                }()
        }
-       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery,
+               bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos), dctx.NodeSelectors(), dctx.TimeRange(), queryRequest))
        if err != nil {
                return nil, err
        }
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go
index cf8de2b2..ba55e3af 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -152,7 +152,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee []*streamv1.Element,
                        }
                }()
        }
-       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery,
+               bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos), dctx.NodeSelectors(), dctx.TimeRange(), queryRequest))
        if err != nil {
                return nil, err
        }
diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile
index 4ce236a5..a35c463b 100644
--- a/test/docker/Dockerfile
+++ b/test/docker/Dockerfile
@@ -26,7 +26,7 @@ COPY banyand/build/bin/linux/${TARGETARCH}/banyand-server-static /banyand
 COPY bydbctl/build/bin/linux/${TARGETARCH}/bydbctl-cli-static /bydbctl
 COPY --from=certs /etc/ssl/certs /etc/ssl/certs
 
-ENV GRPC_GO_LOG_SEVERITY_LEVEL=INFO
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=WARN
 ENV GRPC_GO_LOG_FORMATTER=json
 
 EXPOSE 17912
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 9fcb81f9..0488a9a7 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -28,6 +28,8 @@ services:
 
   liaison:
     hostname: liaison
+    ports:
+      - 17913:17913
     expose:
       - 17912
       - 2121
diff --git a/test/e2e-v2/cases/lifecycle/docker-compose.yml b/test/e2e-v2/cases/lifecycle/docker-compose.yml
new file mode 100644
index 00000000..fa776002
--- /dev/null
+++ b/test/e2e-v2/cases/lifecycle/docker-compose.yml
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+  etcd:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: etcd
+    networks:
+      - e2e
+
+  data-hot1:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: data
+    hostname: data-hot1
+    command: data --etcd-endpoints=http://etcd:2379 --node-labels type=hot
+    volumes:
+      - /tmp/measure/data-hot1:/tmp/measure
+      - /tmp/stream/data-hot1:/tmp/stream
+      - /tmp/property/data-hot1:/tmp/property
+    networks:
+      - e2e
+
+  data-hot2:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: data
+    hostname: data-hot2  
+    command: data --etcd-endpoints=http://etcd:2379 --node-labels type=hot
+    volumes:
+      - /tmp/measure/data-hot2:/tmp/measure
+      - /tmp/stream/data-hot2:/tmp/stream
+      - /tmp/property/data-hot2:/tmp/property
+    networks:
+      - e2e
+
+  data-warm1:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: data
+    hostname: data-warm1
+    command: data --etcd-endpoints=http://etcd:2379 --node-labels type=warm
+    volumes:
+      - /tmp/measure/data-warm1:/tmp/measure
+      - /tmp/stream/data-warm1:/tmp/stream
+      - /tmp/property/data-warm1:/tmp/property
+    networks:
+      - e2e
+
+  data-warm2:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: data
+    hostname: data-warm2
+    command: data --etcd-endpoints=http://etcd:2379 --node-labels type=warm
+    volumes:
+      - /tmp/measure/data-warm2:/tmp/measure
+      - /tmp/stream/data-warm2:/tmp/stream
+      - /tmp/property/data-warm2:/tmp/property
+    networks:
+      - e2e
+
+  data-cold1:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: data
+    hostname: data-cold1
+    command: data --etcd-endpoints=http://etcd:2379 --node-labels type=cold
+    volumes:
+      - /tmp/measure/data-cold1:/tmp/measure
+      - /tmp/stream/data-cold1:/tmp/stream
+      - /tmp/property/data-cold1:/tmp/property
+    networks:
+      - e2e
+
+  liaison:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: liaison
+    command: liaison --etcd-endpoints=http://etcd:2379 --data-node-selector type=hot
+    networks:
+      - e2e
+
+  oap:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: oap
+    environment:
+      SW_STORAGE: banyandb
+      SW_STORAGE_BANYANDB_TARGETS: "liaison:17912"
+      SW_STORAGE_BANYANDB_GM_MINUTE_ENABLE_WARM_STAGE: true
+      SW_STORAGE_BANYANDB_GM_MINUTE_ENABLE_COLD_STAGE: true
+    ports:
+      - 12800
+    depends_on:
+      liaison:
+        condition: service_healthy
+
+  agent:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: agent
+    networks:
+      - e2e
+
+  provider:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: provider
+    ports:
+      - 9090
+    networks:
+      - e2e
+    depends_on:
+      oap:
+        condition: service_healthy
+      agent:
+        condition: service_completed_successfully
+
+  consumer:
+    extends:
+      file: ../../script/docker-compose/base-compose.yml
+      service: consumer
+    ports:
+      - 9092
+    depends_on:
+      oap:
+        condition: service_healthy
+      provider:
+        condition: service_healthy
+      agent:
+        condition: service_completed_successfully
+
+networks:
+  e2e:
+
+volumes:
+  sw_agent:
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 5c7958af..122c5d1b 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -17,14 +17,15 @@ SW_AGENT_JAVA_COMMIT=f0245864e4388a388fe7445b56b6ce7cedc94aaf
 SW_AGENT_SATELLITE_COMMIT=ea27a3f4e126a24775fe12e2aa2695bcb23d99c3
 SW_AGENT_NGINX_LUA_COMMIT=c3cee4841798a147d83b96a10914d4ac0e11d0aa
 SW_AGENT_NODEJS_COMMIT=4f9a91dad3dfd8cfe5ba8f7bd06b39e11eb5e65e
-SW_AGENT_GO_COMMIT=774a6d56baba1187eb03bf1861af542c923b3dff
+SW_AGENT_GO_COMMIT=154de50628e82e590941585411299459e352317d
 SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
 SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=6fe5e6f0d3b7686c6be0457733e825ee68cb9b35
-SW_ROVER_COMMIT=40d03c14d638339bf4aa6ec7376d421323de865a
+SW_ROVER_COMMIT=4c0cb8429a96f190ea30eac1807008d523c749c3
 SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3
-SW_CTL_COMMIT=67cbc89dd7b214d5791321a7ca992f940cb586ba
+SW_PREDICTOR_COMMIT=54a0197654a3781a6f73ce35146c712af297c994
 
-SW_OAP_COMMIT=6273cb2c16e69a0a4df4e20d4a182469ff3c276c
+SW_OAP_COMMIT=c63fe21e75c7b898564add0035c220d9b242f7d2
 SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=d3f8fe894d1a206164b73f5b523d2eb62d9e9965
+SW_CTL_COMMIT=67cbc89dd7b214d5791321a7ca992f940cb586ba
\ No newline at end of file

Reply via email to