This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 704432c9 [Merged without e2e] add lifecycle query test (#657)
704432c9 is described below
commit 704432c90450daceca573b55eb890aabe2f6ca53
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Apr 27 10:45:46 2025 +0800
[Merged without e2e] add lifecycle query test (#657)
---
api/common/id.go | 5 +
api/proto/banyandb/measure/v1/topn.proto | 2 +
banyand/dquery/dquery.go | 71 ++++++++--
banyand/dquery/measure.go | 24 +++-
banyand/dquery/stream.go | 23 +++-
banyand/dquery/topn.go | 24 +++-
banyand/liaison/grpc/discovery.go | 32 -----
banyand/liaison/grpc/measure.go | 11 +-
banyand/liaison/grpc/stream.go | 10 +-
banyand/queue/pub/pub.go | 4 +
docs/api-reference.md | 1 +
pkg/cmdsetup/liaison.go | 5 +-
pkg/query/executor/interface.go | 1 +
.../logical/measure/measure_plan_distributed.go | 5 +-
.../logical/stream/stream_plan_distributed.go | 3 +-
test/docker/Dockerfile | 2 +-
test/docker/base-compose.yml | 2 +
test/e2e-v2/cases/lifecycle/docker-compose.yml | 153 +++++++++++++++++++++
test/e2e-v2/script/env | 9 +-
19 files changed, 310 insertions(+), 77 deletions(-)
diff --git a/api/common/id.go b/api/common/id.go
index aafbfb53..6a8e48e7 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -253,3 +253,8 @@ type contextNodeKey struct{}
var ContextNodeRolesKey = contextNodeRolesKey{}
type contextNodeRolesKey struct{}
+
+// ContextNodeSelectorKey is a context key to store the node selector.
+var ContextNodeSelectorKey = contextNodeSelectorKey{}
+
+type contextNodeSelectorKey struct{}
diff --git a/api/proto/banyandb/measure/v1/topn.proto
b/api/proto/banyandb/measure/v1/topn.proto
index dc15a444..48afc024 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -67,4 +67,6 @@ message TopNRequest {
model.v1.Sort field_value_sort = 7;
// trace is used to enable trace for the query
bool trace = 8;
+ // stages is used to specify the stage of the data points in the lifecycle
+ repeated string stages = 9;
}
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index c56f410e..653aa811 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -21,12 +21,14 @@ package dquery
import (
"context"
"errors"
+ "strings"
"time"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -41,7 +43,8 @@ import (
)
const (
- moduleName = "distributed-query"
+ moduleName = "distributed-query"
+ hotStageName = "hot"
)
var (
@@ -52,16 +55,17 @@ var (
)
type queryService struct {
- metaService metadata.Repo
- pipeline queue.Server
- omr observability.MetricsRegistry
- log *logger.Logger
- sqp *streamQueryProcessor
- mqp *measureQueryProcessor
- tqp *topNQueryProcessor
- closer *run.Closer
- nodeID string
- slowQuery time.Duration
+ metaService metadata.Repo
+ pipeline queue.Server
+ omr observability.MetricsRegistry
+ log *logger.Logger
+ sqp *streamQueryProcessor
+ mqp *measureQueryProcessor
+ tqp *topNQueryProcessor
+ closer *run.Closer
+ nodeID string
+ hotStageNodeSelector string
+ slowQuery time.Duration
}
// NewService return a new query service.
@@ -109,11 +113,17 @@ func (q *queryService) PreRun(ctx context.Context) error {
}
node := val.(common.Node)
q.nodeID = node.NodeID
+ val = ctx.Value(common.ContextNodeSelectorKey)
+ if val != nil {
+ q.hotStageNodeSelector = val.(string)
+ }
+
q.log = logger.GetLogger(moduleName)
q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log,
schema.NewMetrics(q.omr.With(streamScope)))
q.mqp.measureService = measure.NewPortableRepository(q.metaService,
q.log,
schema.NewMetrics(q.omr.With(measureScope)))
+ q.tqp.measureService = q.mqp.measureService
return multierr.Combine(
q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
@@ -132,13 +142,50 @@ func (q *queryService) Serve() run.StopNotify {
return q.closer.CloseNotify()
}
+func (q *queryService) parseNodeSelector(stages []string, resource
*commonv1.ResourceOpts) ([]string, bool) {
+ if len(stages) == 0 {
+ stages = resource.DefaultStages
+ }
+ if len(stages) == 0 {
+ return nil, false
+ }
+
+ var nodeSelectors []string
+ for _, stage := range resource.Stages {
+ for _, sn := range stages {
+ if strings.EqualFold(sn, stage.Name) {
+ ns := stage.NodeSelector
+ ns = strings.TrimSpace(ns)
+ if ns == "" {
+ continue
+ }
+ nodeSelectors = append(nodeSelectors, ns)
+ break
+ }
+ if strings.EqualFold(sn, hotStageName) &&
q.hotStageNodeSelector != "" {
+ nodeSelectors = append(nodeSelectors,
q.hotStageNodeSelector)
+ break
+ }
+ }
+ }
+ if len(nodeSelectors) == 0 {
+ return nil, false
+ }
+ return nodeSelectors, true
+}
+
var _ executor.DistributedExecutionContext = (*distributedContext)(nil)
type distributedContext struct {
bus.Broadcaster
- timeRange *modelv1.TimeRange
+ timeRange *modelv1.TimeRange
+ nodeSelectors map[string][]string
}
func (dc *distributedContext) TimeRange() *modelv1.TimeRange {
return dc.timeRange
}
+
+func (dc *distributedContext) NodeSelectors() map[string][]string {
+ return dc.nodeSelectors
+}
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index a1e3ecce..56548d7a 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -83,12 +83,30 @@ func (p *measureQueryProcessor) Rev(ctx context.Context,
message bus.Message) (r
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("query plan")
}
+ nodeSelectors := make(map[string][]string)
+ for _, g := range queryCriteria.Groups {
+ if gs, ok := p.measureService.LoadGroup(g); ok {
+ if ns, exist :=
p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
+ nodeSelectors[g] = ns
+ }
+ } else {
+ ml.Error().RawJSON("req",
logger.Proto(queryCriteria)).Msg("group not found")
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("group %s not found", g))
+ return
+ }
+ }
+ if len(queryCriteria.Stages) > 0 && len(nodeSelectors) == 0 {
+ ml.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("no
stage found")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("no
stage found"))
+ return
+ }
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
span, ctx = tracer.StartSpan(ctx, "distributed-%s",
p.queryService.nodeID)
span.Tag("plan", plan.String())
+ span.Tagf("nodeSelectors", "%v", nodeSelectors)
defer func() {
data := resp.Data()
switch d := data.(type) {
@@ -103,9 +121,11 @@ func (p *measureQueryProcessor) Rev(ctx context.Context,
message bus.Message) (r
span.Stop()
}()
}
+
mIterator, err :=
plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(ctx,
&distributedContext{
- Broadcaster: p.broadcaster,
- timeRange: queryCriteria.TimeRange,
+ Broadcaster: p.broadcaster,
+ timeRange: queryCriteria.TimeRange,
+ nodeSelectors: nodeSelectors,
}))
if err != nil {
ml.Error().Err(err).Dur("latency",
time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 80a31c9d..6cdd2ae7 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -79,12 +79,30 @@ func (p *streamQueryProcessor) Rev(ctx context.Context,
message bus.Message) (re
if p.log.Debug().Enabled() {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
+ nodeSelectors := make(map[string][]string)
+ for _, g := range queryCriteria.Groups {
+ if gs, ok := p.streamService.LoadGroup(g); ok {
+ if ns, exist :=
p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
+ nodeSelectors[g] = ns
+ }
+ } else {
+ p.log.Error().RawJSON("req",
logger.Proto(queryCriteria)).Msg("group not found")
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("group %s not found", g))
+ return
+ }
+ }
+ if len(queryCriteria.Stages) > 0 && len(nodeSelectors) == 0 {
+ p.log.Error().RawJSON("req",
logger.Proto(queryCriteria)).Msg("no stage found")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("no
stage found"))
+ return
+ }
if queryCriteria.Trace {
var tracer *query.Tracer
var span *query.Span
tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
span, ctx = tracer.StartSpan(ctx, "distributed-%s",
p.queryService.nodeID)
span.Tag("plan", plan.String())
+ span.Tagf("nodeSelectors", "%v", nodeSelectors)
defer func() {
data := resp.Data()
switch d := data.(type) {
@@ -102,8 +120,9 @@ func (p *streamQueryProcessor) Rev(ctx context.Context,
message bus.Message) (re
se := plan.(executor.StreamExecutable)
defer se.Close()
entities, err :=
se.Execute(executor.WithDistributedExecutionContext(ctx, &distributedContext{
- Broadcaster: p.broadcaster,
- timeRange: queryCriteria.TimeRange,
+ Broadcaster: p.broadcaster,
+ timeRange: queryCriteria.TimeRange,
+ nodeSelectors: nodeSelectors,
}))
if err != nil {
p.log.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to execute the query plan")
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index 067d7f0e..b7adf5c1 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
@@ -40,7 +41,8 @@ import (
const defaultTopNQueryTimeout = 10 * time.Second
type topNQueryProcessor struct {
- broadcaster bus.Broadcaster
+ measureService measure.SchemaService
+ broadcaster bus.Broadcaster
*queryService
*bus.UnImplementedHealthyListener
}
@@ -64,11 +66,29 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
if e := t.log.Debug(); e.Enabled() {
e.RawJSON("req", logger.Proto(request)).Msg("received a topN
query event")
}
+ nodeSelectors := make(map[string][]string)
+ for _, g := range request.Groups {
+ if gs, ok := t.measureService.LoadGroup(g); ok {
+ if ns, exist := t.parseNodeSelector(request.Stages,
gs.GetSchema().ResourceOpts); exist {
+ nodeSelectors[g] = ns
+ }
+ } else {
+ t.log.Error().Str("group", g).Msg("failed to load
group")
+ resp = bus.NewMessage(now, common.NewError("failed to
load group %s", g))
+ return
+ }
+ }
+ if len(request.Stages) > 0 && len(nodeSelectors) == 0 {
+ t.log.Error().RawJSON("req", logger.Proto(request)).Msg("no
stage found")
+ resp = bus.NewMessage(now, common.NewError("no stage found"))
+ return
+ }
if request.Trace {
var tracer *pkgquery.Tracer
tracer, ctx = pkgquery.NewTracer(ctx,
n.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "distributed-client")
span.Tag("request",
convert.BytesToString(logger.Proto(request)))
+ span.Tagf("nodeSelectors", "%v", nodeSelectors)
defer func() {
data := resp.Data()
switch d := data.(type) {
@@ -85,7 +105,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
}
agg := request.Agg
request.Agg =
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
- ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout,
data.TopicTopNQuery, bus.NewMessage(now, request))
+ ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout,
data.TopicTopNQuery, bus.NewMessageWithNodeSelectors(now, nodeSelectors,
request.TimeRange, request))
if err != nil {
resp = bus.NewMessage(now, common.NewError("execute the query
%s: %v", request.GetName(), err))
return
diff --git a/banyand/liaison/grpc/discovery.go
b/banyand/liaison/grpc/discovery.go
index b85fd472..0df7b6ef 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -19,7 +19,6 @@ package grpc
import (
"fmt"
- "strings"
"sync"
"github.com/pkg/errors"
@@ -163,37 +162,6 @@ func (s *groupRepo) shardNum(groupName string) (uint32,
bool) {
return r.ShardNum, true
}
-func (s *groupRepo) getNodeSelector(groupName string, stages []string)
([]string, bool) {
- s.RWMutex.RLock()
- defer s.RWMutex.RUnlock()
- r, ok := s.resourceOpts[groupName]
- if !ok {
- return nil, false
- }
- if len(stages) == 0 {
- stages = r.DefaultStages
- }
- if len(stages) == 0 {
- return nil, false
- }
-
- var nodeSelectors []string
- for _, stage := range r.Stages {
- for _, sn := range stages {
- if strings.EqualFold(sn, stage.Name) {
- ns := stage.NodeSelector
- ns = strings.TrimSpace(ns)
- if ns == "" {
- continue
- }
- nodeSelectors = append(nodeSelectors, ns)
- break
- }
- }
- }
- return nodeSelectors, true
-}
-
func getID(metadata *commonv1.Metadata) identity {
return identity{
name: metadata.GetName(),
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 15c72fa5..7dd38590 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -215,16 +215,7 @@ func (ms *measureService) Query(ctx context.Context, req
*measurev1.QueryRequest
span.Stop()
}()
}
- nodeSelectors := make(map[string][]string)
- for _, g := range req.Groups {
- if ns, exist := ms.groupRepo.getNodeSelector(g, req.Stages);
exist {
- nodeSelectors[g] = ns
- } else {
- nodeSelectors[g] = nil
- }
- }
-
- feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery,
bus.NewMessageWithNodeSelectors(bus.MessageID(now.UnixNano()), nodeSelectors,
req.TimeRange, req))
+ feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery,
bus.NewMessage(bus.MessageID(now.UnixNano()), req))
if err != nil {
return nil, err
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index b1920878..c7c0c245 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -213,15 +213,7 @@ func (s *streamService) Query(ctx context.Context, req
*streamv1.QueryRequest) (
span.Stop()
}()
}
- nodeSelectors := make(map[string][]string)
- for _, g := range req.Groups {
- if ns, exist := s.groupRepo.getNodeSelector(g, req.Stages);
exist {
- nodeSelectors[g] = ns
- } else {
- nodeSelectors[g] = nil
- }
- }
- message :=
bus.NewMessageWithNodeSelectors(bus.MessageID(now.UnixNano()), nodeSelectors,
req.TimeRange, req)
+ message := bus.NewMessage(bus.MessageID(now.UnixNano()), req)
feat, errQuery := s.broadcaster.Publish(ctx, data.TopicStreamQuery,
message)
if errQuery != nil {
if errors.Is(errQuery, io.EOF) {
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 77972843..da5efca0 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -161,6 +161,10 @@ func (p *pub) Broadcast(timeout time.Duration, topic
bus.Topic, messages bus.Mes
l.Msgf("broadcasting message to %s nodes", names)
}
+ if len(names) == 0 {
+ return nil, fmt.Errorf("no nodes match the selector %v",
messages.NodeSelectors())
+ }
+
futureCh := make(chan publishResult, len(names))
var wg sync.WaitGroup
for n := range names {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index c90c4b39..9fd262ad 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -3112,6 +3112,7 @@ TopNRequest is the request contract for query.
| conditions | [banyandb.model.v1.Condition](#banyandb-model-v1-Condition) |
repeated | criteria select counters. Only equals are acceptable. |
| field_value_sort | [banyandb.model.v1.Sort](#banyandb-model-v1-Sort) | |
field_value_sort indicates how to sort fields |
| trace | [bool](#bool) | | trace is used to enable trace for the query |
+| stages | [string](#string) | repeated | stages is used to specify the stage
of the data points in the lifecycle |
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 1ae588d8..48082843 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -86,7 +86,9 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
Version: version.Build(),
Short: "Run as the liaison server",
RunE: func(_ *cobra.Command, _ []string) (err error) {
+ ctx := context.Background()
if nodeSelector != "" {
+ ctx = context.WithValue(ctx,
common.ContextNodeSelectorKey, nodeSelector)
var ls *pub.LabelSelector
ls, err = pub.ParseLabelSelector(nodeSelector)
if err != nil {
@@ -101,8 +103,9 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
return err
}
logger.GetLogger().Info().Msg("starting as a liaison
server")
+ ctx = context.WithValue(ctx, common.ContextNodeKey,
node)
// Spawn our go routines and wait for shutdown.
- if err :=
liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey,
node)); err != nil {
+ if err := liaisonGroup.Run(ctx); err != nil {
logger.GetLogger().Error().Err(err).Stack().Str("name",
liaisonGroup.Name()).Msg("Exit")
os.Exit(-1)
}
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index 6ba7071b..d48675f6 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -92,6 +92,7 @@ type MeasureExecutable interface {
type DistributedExecutionContext interface {
bus.Broadcaster
TimeRange() *modelv1.TimeRange
+ NodeSelectors() map[string][]string
}
// DistributedExecutionContextKey is the key of distributed execution context
in context.Context.
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go
b/pkg/query/logical/measure/measure_plan_distributed.go
index 3362e6cc..c6843029 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -170,6 +170,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi
executor.MIterator, e
span, _ = tracer.StartSpan(ctx, "distributed-client")
queryRequest.Trace = true
span.Tag("request",
convert.BytesToString(logger.Proto(queryRequest)))
+ span.Tag("node_selectors", fmt.Sprintf("%v",
dctx.NodeSelectors()))
+ span.Tag("time_range", dctx.TimeRange().String())
defer func() {
if err != nil {
span.Error(err)
@@ -178,7 +180,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi
executor.MIterator, e
}
}()
}
- ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery,
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
+ ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery,
+
bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos),
dctx.NodeSelectors(), dctx.TimeRange(), queryRequest))
if err != nil {
return nil, err
}
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go
b/pkg/query/logical/stream/stream_plan_distributed.go
index cf8de2b2..ba55e3af 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -152,7 +152,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee
[]*streamv1.Element,
}
}()
}
- ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery,
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
+ ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery,
+
bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos),
dctx.NodeSelectors(), dctx.TimeRange(), queryRequest))
if err != nil {
return nil, err
}
diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile
index 4ce236a5..a35c463b 100644
--- a/test/docker/Dockerfile
+++ b/test/docker/Dockerfile
@@ -26,7 +26,7 @@ COPY
banyand/build/bin/linux/${TARGETARCH}/banyand-server-static /banyand
COPY bydbctl/build/bin/linux/${TARGETARCH}/bydbctl-cli-static /bydbctl
COPY --from=certs /etc/ssl/certs /etc/ssl/certs
-ENV GRPC_GO_LOG_SEVERITY_LEVEL=INFO
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=WARN
ENV GRPC_GO_LOG_FORMATTER=json
EXPOSE 17912
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 9fcb81f9..0488a9a7 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -28,6 +28,8 @@ services:
liaison:
hostname: liaison
+ ports:
+ - 17913:17913
expose:
- 17912
- 2121
diff --git a/test/e2e-v2/cases/lifecycle/docker-compose.yml
b/test/e2e-v2/cases/lifecycle/docker-compose.yml
new file mode 100644
index 00000000..fa776002
--- /dev/null
+++ b/test/e2e-v2/cases/lifecycle/docker-compose.yml
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ etcd:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: etcd
+ networks:
+ - e2e
+
+ data-hot1:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: data
+ hostname: data-hot1
+ command: data --etcd-endpoints=http://etcd:2379 --node-labels type=hot
+ volumes:
+ - /tmp/measure/data-hot1:/tmp/measure
+ - /tmp/stream/data-hot1:/tmp/stream
+ - /tmp/property/data-hot1:/tmp/property
+ networks:
+ - e2e
+
+ data-hot2:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: data
+ hostname: data-hot2
+ command: data --etcd-endpoints=http://etcd:2379 --node-labels type=hot
+ volumes:
+ - /tmp/measure/data-hot2:/tmp/measure
+ - /tmp/stream/data-hot2:/tmp/stream
+ - /tmp/property/data-hot2:/tmp/property
+ networks:
+ - e2e
+
+ data-warm1:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: data
+ hostname: data-warm1
+ command: data --etcd-endpoints=http://etcd:2379 --node-labels type=warm
+ volumes:
+ - /tmp/measure/data-warm1:/tmp/measure
+ - /tmp/stream/data-warm1:/tmp/stream
+ - /tmp/property/data-warm1:/tmp/property
+ networks:
+ - e2e
+
+ data-warm2:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: data
+ hostname: data-warm2
+ command: data --etcd-endpoints=http://etcd:2379 --node-labels type=warm
+ volumes:
+ - /tmp/measure/data-warm2:/tmp/measure
+ - /tmp/stream/data-warm2:/tmp/stream
+ - /tmp/property/data-warm2:/tmp/property
+ networks:
+ - e2e
+
+ data-cold1:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: data
+ hostname: data-cold1
+ command: data --etcd-endpoints=http://etcd:2379 --node-labels type=cold
+ volumes:
+ - /tmp/measure/data-cold1:/tmp/measure
+ - /tmp/stream/data-cold1:/tmp/stream
+ - /tmp/property/data-cold1:/tmp/property
+ networks:
+ - e2e
+
+ liaison:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: liaison
+ command: liaison --etcd-endpoints=http://etcd:2379 --data-node-selector
type=hot
+ networks:
+ - e2e
+
+ oap:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_STORAGE: banyandb
+ SW_STORAGE_BANYANDB_TARGETS: "liaison:17912"
+ SW_STORAGE_BANYANDB_GM_MINUTE_ENABLE_WARM_STAGE: true
+ SW_STORAGE_BANYANDB_GM_MINUTE_ENABLE_COLD_STAGE: true
+ ports:
+ - 12800
+ depends_on:
+ liaison:
+ condition: service_healthy
+
+ agent:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: agent
+ networks:
+ - e2e
+
+ provider:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: provider
+ ports:
+ - 9090
+ networks:
+ - e2e
+ depends_on:
+ oap:
+ condition: service_healthy
+ agent:
+ condition: service_completed_successfully
+
+ consumer:
+ extends:
+ file: ../../script/docker-compose/base-compose.yml
+ service: consumer
+ ports:
+ - 9092
+ depends_on:
+ oap:
+ condition: service_healthy
+ provider:
+ condition: service_healthy
+ agent:
+ condition: service_completed_successfully
+
+networks:
+ e2e:
+
+volumes:
+ sw_agent:
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 5c7958af..122c5d1b 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -17,14 +17,15 @@
SW_AGENT_JAVA_COMMIT=f0245864e4388a388fe7445b56b6ce7cedc94aaf
SW_AGENT_SATELLITE_COMMIT=ea27a3f4e126a24775fe12e2aa2695bcb23d99c3
SW_AGENT_NGINX_LUA_COMMIT=c3cee4841798a147d83b96a10914d4ac0e11d0aa
SW_AGENT_NODEJS_COMMIT=4f9a91dad3dfd8cfe5ba8f7bd06b39e11eb5e65e
-SW_AGENT_GO_COMMIT=774a6d56baba1187eb03bf1861af542c923b3dff
+SW_AGENT_GO_COMMIT=154de50628e82e590941585411299459e352317d
SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=6fe5e6f0d3b7686c6be0457733e825ee68cb9b35
-SW_ROVER_COMMIT=40d03c14d638339bf4aa6ec7376d421323de865a
+SW_ROVER_COMMIT=4c0cb8429a96f190ea30eac1807008d523c749c3
SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3
-SW_CTL_COMMIT=67cbc89dd7b214d5791321a7ca992f940cb586ba
+SW_PREDICTOR_COMMIT=54a0197654a3781a6f73ce35146c712af297c994
-SW_OAP_COMMIT=6273cb2c16e69a0a4df4e20d4a182469ff3c276c
+SW_OAP_COMMIT=c63fe21e75c7b898564add0035c220d9b242f7d2
SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=d3f8fe894d1a206164b73f5b523d2eb62d9e9965
+SW_CTL_COMMIT=67cbc89dd7b214d5791321a7ca992f940cb586ba
\ No newline at end of file