This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 2de59d80 Add query test cases of lifecycle (#660)
2de59d80 is described below
commit 2de59d80f7edbc57ada66212bcfdc79f5b3b28a9
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Apr 29 09:46:03 2025 +0800
Add query test cases of lifecycle (#660)
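
    Besides the new lifecycle query cases, this commit removes the per-node
    data_segments_boundary metadata and the SegmentBoundaryUpdateFn callback,
    inverts the stage/selector matching loops in dquery so the requested stage
    names drive the lookup, reports an error when a query names no stage the
    group defines, and adds stage-aware test schemas (LoadSchemaWithStages)
    for the measure and stream fixtures.

    A minimal, self-contained sketch of the corrected matching order (the
    helper type stageOpt, the parameters requested/defined/hotSelector, and
    the main function are illustrative stand-ins only; the real logic lives
    in parseNodeSelector in banyand/dquery/dquery.go):

        package main

        import (
            "fmt"
            "strings"
        )

        // stageOpt stands in for the resource stage schema used by dquery.
        type stageOpt struct{ Name, NodeSelector string }

        func parseNodeSelector(requested []string, defined []stageOpt, hotSelector string) []string {
            var nodeSelectors []string
            for _, sn := range requested { // requested stage names drive the outer loop
                for _, stage := range defined {
                    if strings.EqualFold(sn, stage.Name) {
                        if ns := strings.TrimSpace(stage.NodeSelector); ns != "" {
                            nodeSelectors = append(nodeSelectors, ns)
                        }
                        break
                    }
                }
                // The hot-stage fallback is now evaluated once per requested
                // name, not once per stage the group defines.
                if strings.EqualFold(sn, "hot") && hotSelector != "" {
                    nodeSelectors = append(nodeSelectors, hotSelector)
                }
            }
            return nodeSelectors
        }

        func main() {
            // Prints [type=warm]: only the stage defined by the group matches.
            fmt.Println(parseNodeSelector([]string{"warm"}, []stageOpt{{Name: "warm", NodeSelector: "type=warm"}}, ""))
        }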
---
api/proto/banyandb/database/v1/database.proto | 4 -
banyand/backup/lifecycle/steps.go | 1 +
banyand/dquery/dquery.go | 11 +-
banyand/dquery/measure.go | 4 +
banyand/dquery/stream.go | 4 +
banyand/dquery/topn.go | 4 +
banyand/internal/storage/rotation_test.go | 89 +--------
banyand/internal/storage/segment.go | 76 +++-----
banyand/internal/storage/segment_test.go | 4 -
banyand/internal/storage/tsdb.go | 7 +-
banyand/measure/metadata.go | 1 -
banyand/metadata/client.go | 49 -----
banyand/metadata/metadata.go | 2 -
banyand/metadata/schema/register_test.go | 25 ---
banyand/queue/pub/client_test.go | 66 -------
banyand/queue/pub/pub.go | 9 +-
banyand/queue/pub/pub_suite_test.go | 3 +-
banyand/queue/pub/pub_test.go | 189 +------------------
banyand/stream/metadata.go | 1 -
docs/api-reference.md | 206 ++++++++++-----------
pkg/test/helpers/context.go | 32 ++--
pkg/test/measure/etcd.go | 72 ++++---
.../measure/testdata/group_stages/exception.json | 18 ++
.../measure/testdata/group_stages/index_mode.json | 18 ++
.../{groups => group_stages}/sw_metric.json | 0
pkg/test/measure/testdata/groups/sw_metric.json | 17 +-
pkg/test/stream/etcd.go | 65 +++++--
pkg/test/stream/testdata/group.json | 17 +-
.../{group.json => group_with_stages.json} | 0
test/cases/lifecycle/lifecycle.go | 87 ++++++++-
test/cases/measure/data/data.go | 1 +
test/cases/stream/data/data.go | 18 +-
test/cases/topn/data/data.go | 1 +
test/e2e-v2/script/env | 2 +-
.../distributed/lifecycle/lifecycle_suite_test.go | 37 ++--
35 files changed, 422 insertions(+), 718 deletions(-)
diff --git a/api/proto/banyandb/database/v1/database.proto b/api/proto/banyandb/database/v1/database.proto
index a5caddba..2b4de339 100644
--- a/api/proto/banyandb/database/v1/database.proto
+++ b/api/proto/banyandb/database/v1/database.proto
@@ -20,7 +20,6 @@ syntax = "proto3";
package banyandb.database.v1;
import "banyandb/common/v1/common.proto";
-import "banyandb/model/v1/query.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
@@ -41,9 +40,6 @@ message Node {
google.protobuf.Timestamp created_at = 5;
// labels is a set of key-value pairs to describe the node.
map<string, string> labels = 6;
- // data_segments_boundary is the time range of the data segments that the node is responsible for.
- // [start, end) is the time range.
- map<string, model.v1.TimeRange> data_segments_boundary = 7;
}
message Shard {
diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go
index 7816b881..620be9e9 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -249,6 +249,7 @@ func migrateMeasure(ctx context.Context, m *databasev1.Measure, result model.Mea
Metadata: m.Metadata,
DataPoint: &measurev1.DataPointValue{
				Timestamp: timestamppb.New(time.Unix(0, mr.Timestamps[i])),
+ Version: mr.Versions[i],
},
MessageId: uint64(time.Now().UnixNano()),
}
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index 653aa811..74cedafe 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -151,8 +151,8 @@ func (q *queryService) parseNodeSelector(stages []string, resource *commonv1.Res
}
var nodeSelectors []string
- for _, stage := range resource.Stages {
- for _, sn := range stages {
+ for _, sn := range stages {
+ for _, stage := range resource.Stages {
if strings.EqualFold(sn, stage.Name) {
ns := stage.NodeSelector
ns = strings.TrimSpace(ns)
@@ -162,10 +162,9 @@ func (q *queryService) parseNodeSelector(stages []string, resource *commonv1.Res
nodeSelectors = append(nodeSelectors, ns)
break
}
-			if strings.EqualFold(sn, hotStageName) && q.hotStageNodeSelector != "" {
-				nodeSelectors = append(nodeSelectors, q.hotStageNodeSelector)
-				break
-			}
+		}
+		if strings.EqualFold(sn, hotStageName) && q.hotStageNodeSelector != "" {
+			nodeSelectors = append(nodeSelectors, q.hotStageNodeSelector)
}
}
if len(nodeSelectors) == 0 {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index 56548d7a..1b1a62e0 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -88,6 +88,10 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r
if gs, ok := p.measureService.LoadGroup(g); ok {
			if ns, exist := p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
				nodeSelectors[g] = ns
+			} else if len(gs.GetSchema().ResourceOpts.Stages) > 0 {
+				ml.Error().Strs("req_stages", queryCriteria.Stages).Strs("default_stages", gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found")
+				resp = bus.NewMessage(bus.MessageID(now), common.NewError("no stage found in request or default stages in resource opts"))
+				return
}
} else {
ml.Error().RawJSON("req",
logger.Proto(queryCriteria)).Msg("group not found")
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 6cdd2ae7..184e3438 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -84,6 +84,10 @@ func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (re
if gs, ok := p.streamService.LoadGroup(g); ok {
			if ns, exist := p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
				nodeSelectors[g] = ns
+			} else if len(gs.GetSchema().ResourceOpts.Stages) > 0 {
+				p.log.Error().Strs("req_stages", queryCriteria.Stages).Strs("default_stages", gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found")
+				resp = bus.NewMessage(bus.MessageID(now), common.NewError("no stage found in request or default stages in resource opts"))
+				return
}
} else {
p.log.Error().RawJSON("req",
logger.Proto(queryCriteria)).Msg("group not found")
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index b7adf5c1..1bb3ad99 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -71,6 +71,10 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
if gs, ok := t.measureService.LoadGroup(g); ok {
			if ns, exist := t.parseNodeSelector(request.Stages, gs.GetSchema().ResourceOpts); exist {
				nodeSelectors[g] = ns
+			} else if len(gs.GetSchema().ResourceOpts.Stages) > 0 {
+				t.log.Error().Strs("req_stages", request.Stages).Strs("default_stages", gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found")
+				resp = bus.NewMessage(now, common.NewError("no stage found in request or default stages in resource opts"))
+				return
}
} else {
t.log.Error().Str("group", g).Msg("failed to load
group")
diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go
index 43f292ef..ccc9e271 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -19,7 +19,6 @@ package storage
import (
"context"
- "sync"
"testing"
"time"
@@ -27,7 +26,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
-	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -36,77 +34,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
-type boundaryUpdateTracker struct {
- updates []*modelv1.TimeRange
- called int
- mu sync.Mutex
-}
-
-func (t *boundaryUpdateTracker) recordUpdate(_, _ string, boundary *modelv1.TimeRange) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.updates = append(t.updates, boundary)
- t.called++
-}
-
-func TestSegmentBoundaryUpdateFn(t *testing.T) {
- t.Run("called when a segment is created", func(t *testing.T) {
- tsdb, c, _, tracker, dfFn := setUpDB(t)
- defer dfFn()
-
- // Initial segment creation should have triggered it once
- require.Equal(t, 1, tracker.called)
-
- // Create new segment
- ts := c.Now().Add(23*time.Hour + time.Second)
- tsdb.Tick(ts.UnixNano())
-
- // Verify the function was called again
- assert.Eventually(t, func() bool {
- tracker.mu.Lock()
- defer tracker.mu.Unlock()
- return tracker.called >= 2
-		}, flags.EventuallyTimeout, time.Millisecond, "boundary update function should be called")
- })
-
- t.Run("called when a segment is deleted", func(t *testing.T) {
- tsdb, c, segCtrl, tracker, dfFn := setUpDB(t)
- defer dfFn()
-
- ts := c.Now()
- for i := 0; i < 4; i++ {
- ts = ts.Add(23 * time.Hour)
- c.Set(ts)
- tsdb.Tick(ts.UnixNano())
- t.Logf("current time: %s", ts.Format(time.RFC3339))
- expected := i + 2
- require.EventuallyWithTf(t, func(ct *assert.CollectT) {
- segments, _ := segCtrl.segments(false)
- if len(segments) != expected {
- ct.Errorf("expect %d segments, got %d",
expected, len(segments))
- }
- }, flags.EventuallyTimeout, time.Millisecond, "wait for
%d segment to be created", expected)
- ts = ts.Add(time.Hour)
- }
-
- tracker.mu.Lock()
- called := tracker.called
- tracker.mu.Unlock()
-
- c.Set(ts)
- tsdb.Tick(ts.UnixNano())
-
- assert.Eventually(t, func() bool {
- tracker.mu.Lock()
- defer tracker.mu.Unlock()
- return tracker.called > called
- }, flags.EventuallyTimeout, time.Millisecond, "boundary update
function should be called after deletion")
- })
-}
-
func TestForwardRotation(t *testing.T) {
t.Run("create a new segment when the time is up", func(t *testing.T) {
- tsdb, c, segCtrl, _, dfFn := setUpDB(t)
+ tsdb, c, segCtrl, dfFn := setUpDB(t)
defer dfFn()
ts := c.Now().Add(23*time.Hour + time.Second)
t.Logf("current time: %s", ts.Format(time.RFC3339))
@@ -118,7 +48,7 @@ func TestForwardRotation(t *testing.T) {
})
t.Run("no new segment created when the time is not up", func(t
*testing.T) {
- tsdb, c, segCtrl, _, dfFn := setUpDB(t)
+ tsdb, c, segCtrl, dfFn := setUpDB(t)
defer dfFn()
		ts := c.Now().Add(22*time.Hour + 59*time.Minute + 59*time.Second)
t.Logf("current time: %s", ts.Format(time.RFC3339))
@@ -132,7 +62,7 @@ func TestForwardRotation(t *testing.T) {
func TestRetention(t *testing.T) {
t.Run("delete the segment and index when the TTL is up", func(t
*testing.T) {
- tsdb, c, segCtrl, _, dfFn := setUpDB(t)
+ tsdb, c, segCtrl, dfFn := setUpDB(t)
defer dfFn()
ts := c.Now()
for i := 0; i < 4; i++ {
@@ -161,7 +91,7 @@ func TestRetention(t *testing.T) {
})
t.Run("keep the segment volume stable", func(t *testing.T) {
- tsdb, c, segCtrl, _, dfFn := setUpDB(t)
+ tsdb, c, segCtrl, dfFn := setUpDB(t)
defer dfFn()
ts := c.Now()
for i := 0; i < 10; i++ {
@@ -207,22 +137,15 @@ func TestRetention(t *testing.T) {
})
}
-func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, *segmentController[*MockTSTable, any], *boundaryUpdateTracker, func()) {
+func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, *segmentController[*MockTSTable, any], func()) {
dir, defFn := test.Space(require.New(t))
- // Create tracker for boundary updates
- tracker := &boundaryUpdateTracker{
- updates: make([]*modelv1.TimeRange, 0),
- }
-
TSDBOpts := TSDBOpts[*MockTSTable, any]{
Location: dir,
SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
TTL: IntervalRule{Unit: DAY, Num: 3},
ShardNum: 1,
TSTableCreator: MockTSTableCreator,
- // Add the boundary update function
- SegmentBoundaryUpdateFn: tracker.recordUpdate,
}
ctx := context.Background()
mc := timestamp.NewMockClock()
@@ -240,7 +163,7 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, *
db := tsdb.(*database[*MockTSTable, any])
segments, _ := db.segmentController.segments(false)
require.Equal(t, len(segments), 1)
- return db, mc, db.segmentController, tracker, func() {
+ return db, mc, db.segmentController, func() {
tsdb.Close()
defFn()
}
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 27bb2059..3193697a 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -30,11 +30,9 @@ import (
"time"
"github.com/pkg/errors"
- "google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -278,41 +276,39 @@ func (s *segment[T, O]) String() string {
}
type segmentController[T TSTable, O any] struct {
- clock timestamp.Clock
- metrics Metrics
- opts *TSDBOpts[T, O]
- l *logger.Logger
- indexMetrics *inverted.Metrics
- segmentBoundaryUpdateFn SegmentBoundaryUpdateFn
- position common.Position
- db string
- stage string
- location string
- lst []*segment[T, O]
- deadline atomic.Int64
- idleTimeout time.Duration
- optsMutex sync.RWMutex
+ clock timestamp.Clock
+ metrics Metrics
+ opts *TSDBOpts[T, O]
+ l *logger.Logger
+ indexMetrics *inverted.Metrics
+ position common.Position
+ db string
+ stage string
+ location string
+ lst []*segment[T, O]
+ deadline atomic.Int64
+ idleTimeout time.Duration
+ optsMutex sync.RWMutex
sync.RWMutex
}
func newSegmentController[T TSTable, O any](ctx context.Context, location string,
	l *logger.Logger, opts TSDBOpts[T, O], indexMetrics *inverted.Metrics, metrics Metrics,
-	segmentsBoundaryUpdateFn SegmentBoundaryUpdateFn, idleTimeout time.Duration,
+ idleTimeout time.Duration,
) *segmentController[T, O] {
clock, _ := timestamp.GetClock(ctx)
p := common.GetPosition(ctx)
return &segmentController[T, O]{
- location: location,
- opts: &opts,
- l: l,
- clock: clock,
- position: common.GetPosition(ctx),
- metrics: metrics,
- indexMetrics: indexMetrics,
- segmentBoundaryUpdateFn: segmentsBoundaryUpdateFn,
- stage: p.Stage,
- db: p.Database,
- idleTimeout: idleTimeout,
+ location: location,
+ opts: &opts,
+ l: l,
+ clock: clock,
+ position: common.GetPosition(ctx),
+ metrics: metrics,
+ indexMetrics: indexMetrics,
+ stage: p.Stage,
+ db: p.Database,
+ idleTimeout: idleTimeout,
}
}
@@ -364,31 +360,9 @@ func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O],
if err != nil {
return nil, err
}
- sc.notifySegmentBoundaryUpdate()
	return s, s.incRef(context.WithValue(context.Background(), logger.ContextKey, sc.l))
}
-func (sc *segmentController[T, O]) notifySegmentBoundaryUpdate() {
- if sc.segmentBoundaryUpdateFn == nil {
- return
- }
- // No error if we do not open closed segments.
- segs, _ := sc.segments(false)
- defer func() {
- for i := range segs {
- segs[i].DecRef()
- }
- }()
- var tr *modelv1.TimeRange
- if len(segs) > 0 {
- tr = &modelv1.TimeRange{
- Begin: timestamppb.New(segs[0].Start),
- End: timestamppb.New(segs[len(segs)-1].End),
- }
- }
- sc.segmentBoundaryUpdateFn(sc.stage, sc.db, tr)
-}
-
func (sc *segmentController[T, O]) segments(reopenClosed bool) (ss []*segment[T, O], err error) {
sc.RLock()
defer sc.RUnlock()
@@ -570,7 +544,6 @@ func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment bool,
}
s.DecRef()
}
- sc.notifySegmentBoundaryUpdate()
return hasSegment, err
}
@@ -607,7 +580,6 @@ func (sc *segmentController[T, O]) deleteExpiredSegments(timeRange timestamp.Tim
}
s.DecRef()
}
- sc.notifySegmentBoundaryUpdate()
return count
}
diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go
index 094bf475..87f62dbe 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -108,7 +108,6 @@ func TestSegmentOpenAndReopen(t *testing.T) {
opts,
nil, // indexMetrics
nil, // metrics
- nil, // segmentBoundaryUpdateFn
5*time.Minute, // idleTimeout
)
@@ -199,7 +198,6 @@ func TestSegmentCloseIfIdle(t *testing.T) {
opts,
nil, // indexMetrics
nil, // metrics
- nil, // segmentBoundaryUpdateFn
time.Second, // Set short idle timeout for testing
)
@@ -282,7 +280,6 @@ func TestCloseIdleAndSelectSegments(t *testing.T) {
opts,
nil, // indexMetrics
nil, // metrics
- nil, // segmentBoundaryUpdateFn
idleTimeout, // short idle timeout
)
@@ -414,7 +411,6 @@ func TestOpenExistingSegmentWithShards(t *testing.T) {
opts,
nil, // indexMetrics
nil, // metrics
- nil, // segmentBoundaryUpdateFn
5*time.Minute, // idleTimeout
)
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index eca40aa7..a0ff3eb7 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -29,7 +29,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
@@ -50,16 +49,12 @@ const (
lockFilename = "lock"
)
-// SegmentBoundaryUpdateFn is a callback function to update the segment boundary.
-type SegmentBoundaryUpdateFn func(stage, group string, boundary *modelv1.TimeRange)
-
// TSDBOpts wraps options to create a tsdb.
type TSDBOpts[T TSTable, O any] struct {
Option O
TableMetrics Metrics
TSTableCreator TSTableCreator[T, O]
StorageMetricsFactory *observability.Factory
- SegmentBoundaryUpdateFn SegmentBoundaryUpdateFn
Location string
SegmentInterval IntervalRule
TTL IntervalRule
@@ -138,7 +133,7 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[
tsEventCh: make(chan int64),
p: p,
segmentController: newSegmentController(ctx, location,
-			l, opts, indexMetrics, opts.TableMetrics, opts.SegmentBoundaryUpdateFn, opts.SegmentIdleTimeout),
+			l, opts, indexMetrics, opts.TableMetrics, opts.SegmentIdleTimeout),
metrics: newMetrics(opts.StorageMetricsFactory),
disableRetention: opts.DisableRetention,
}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 85560df8..dca0b6d2 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -419,7 +419,6 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error
		SeriesIndexFlushTimeoutSeconds: s.option.flushTimeout.Nanoseconds() / int64(time.Second),
		SeriesIndexCacheMaxBytes:       int(s.option.seriesCacheMaxSize),
		StorageMetricsFactory:          factory,
-		SegmentBoundaryUpdateFn:        s.metadata.UpdateSegmentsBoundary,
SegmentIdleTimeout: segmentIdleTimeout,
}
return storage.OpenTSDB(
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 7e410af7..f935f432 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -21,7 +21,6 @@ import (
"context"
"os"
"os/signal"
- "strings"
"sync"
"syscall"
"time"
@@ -33,7 +32,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -361,50 +359,3 @@ func contains(s []string, e string) bool {
}
return false
}
-
-func (s *clientService) UpdateSegmentsBoundary(stage, group string, boundary *modelv1.TimeRange) {
- s.nodeInfoMux.Lock()
- defer s.nodeInfoMux.Unlock()
- nodeInfo := s.nodeInfo
- b := nodeInfo.DataSegmentsBoundary
- if b == nil {
- b = make(map[string]*modelv1.TimeRange)
- }
- key := getNodeBoundaryKey(group, stage)
- if boundary == nil {
- delete(b, key)
- } else {
- b[key] = boundary
- }
- nodeInfo.DataSegmentsBoundary = b
- s.nodeInfo = nodeInfo
-	if err := s.schemaRegistry.UpdateNode(context.Background(), nodeInfo); err != nil {
-		logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to update node")
- }
-}
-
-func getNodeBoundaryKey(items ...string) string {
- return strings.Join(items, "|")
-}
-
-// FindSegmentsBoundary finds the segments boundary for the given group.
-func FindSegmentsBoundary(nodeInfo *databasev1.Node, group string) *modelv1.TimeRange {
- b := nodeInfo.DataSegmentsBoundary
- if len(b) == 0 {
- return nil
- }
- result := &modelv1.TimeRange{}
- for g, tr := range b {
- segments := strings.Split(g, "|")
- if segments[0] != group {
- continue
- }
-		if result.Begin == nil || result.Begin.AsTime().After(tr.Begin.AsTime()) {
-			result.Begin = tr.Begin
-		}
-		if result.End == nil || result.End.AsTime().Before(tr.End.AsTime()) {
- result.End = tr.End
- }
- }
- return result
-}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index e7e981e7..14467c50 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -24,7 +24,6 @@ import (
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -49,7 +48,6 @@ type Repo interface {
GroupRegistry() schema.Group
TopNAggregationRegistry() schema.TopNAggregation
RegisterHandler(string, schema.Kind, schema.EventHandler)
- UpdateSegmentsBoundary(stage, group string, boundary *modelv1.TimeRange)
NodeRegistry() schema.Node
PropertyRegistry() schema.Property
}
diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go
index 8280e150..1e9ceef8 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -26,11 +26,9 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gleak"
- "google.golang.org/protobuf/types/known/timestamppb"
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/test"
@@ -120,27 +118,4 @@ var _ = ginkgo.Describe("etcd_register", func() {
return err
}, flags.EventuallyTimeout).ShouldNot(gomega.HaveOccurred())
})
-
- ginkgo.It("should update node's data segments boundary", func() {
-		gomega.Expect(r.Register(context.Background(), md, false)).ShouldNot(gomega.HaveOccurred())
-
- n, err := r.GetNode(context.Background(), node)
- gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-
- newBoundary := &modelv1.TimeRange{
- Begin: timestamppb.New(time.Unix(1000, 0)),
- End: timestamppb.New(time.Unix(2000, 0)),
- }
-		n.DataSegmentsBoundary = map[string]*modelv1.TimeRange{"default": newBoundary}
-
-		gomega.Expect(r.UpdateNode(context.Background(), n)).ShouldNot(gomega.HaveOccurred())
-
- updatedNode, err := r.GetNode(context.Background(), node)
- gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-		gomega.Expect(updatedNode.DataSegmentsBoundary).ShouldNot(gomega.BeNil())
-		gomega.Expect(updatedNode.DataSegmentsBoundary).Should(gomega.HaveLen(1))
-		gomega.Expect(updatedNode.DataSegmentsBoundary["default"]).ShouldNot(gomega.BeNil())
-		gomega.Expect(updatedNode.DataSegmentsBoundary["default"].Begin.Seconds).Should(gomega.Equal(int64(1000)))
-		gomega.Expect(updatedNode.DataSegmentsBoundary["default"].End.Seconds).Should(gomega.Equal(int64(2000)))
- })
})
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
index a96d7da8..43a43736 100644
--- a/banyand/queue/pub/client_test.go
+++ b/banyand/queue/pub/client_test.go
@@ -26,7 +26,6 @@ import (
"github.com/onsi/gomega/gleak"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
- "google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
@@ -147,71 +146,6 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
			verifyClientsWithGomega(g, p, data.TopicStreamWrite, 1, 0, 2, 1)
}, flags.EventuallyTimeout).Should(gomega.Succeed())
})
-
- ginkgo.It("should update node when labels or data boundaries change",
func() {
- addr1 := getAddress()
- closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
- defer closeFn()
- p := newPub()
- defer p.GracefulStop()
-
- // Replace hard-coded "service" with the Service constant
- group1 := svc
- now := time.Now()
- timeRange1 := &modelv1.TimeRange{
- Begin: timestamppb.New(now.Add(-3 * time.Hour)),
- End: timestamppb.New(now.Add(-2 * time.Hour)),
- }
-
- initialLabels := map[string]string{
- "role": "ingest",
- "zone": "east",
- }
- initialBoundaries := map[string]*modelv1.TimeRange{
- group1: timeRange1,
- }
-
- node1 := getDataNodeWithLabels("node1", addr1, initialLabels,
initialBoundaries)
- p.OnAddOrUpdate(node1)
- verifyClients(p, 1, 0, 1, 0)
-
- p.mu.RLock()
- registeredNode := p.registered["node1"]
-
gomega.Expect(registeredNode.Labels).Should(gomega.Equal(initialLabels))
-
gomega.Expect(registeredNode.DataSegmentsBoundary).Should(gomega.Equal(initialBoundaries))
- p.mu.RUnlock()
-
- updatedLabels := map[string]string{
- "role": "query",
- "zone": "east",
- "env": "prod",
- }
- updatedNode1 := getDataNodeWithLabels("node1", addr1,
updatedLabels, initialBoundaries)
- p.OnAddOrUpdate(updatedNode1)
-
- p.mu.RLock()
- registeredNode = p.registered["node1"]
-		gomega.Expect(registeredNode.Labels).Should(gomega.Equal(updatedLabels))
- gomega.Expect(len(p.active)).Should(gomega.Equal(1))
- p.mu.RUnlock()
-
- timeRange2 := &modelv1.TimeRange{
- Begin: timestamppb.New(now.Add(-1 * time.Hour)),
- End: timestamppb.New(now),
- }
- updatedBoundaries := map[string]*modelv1.TimeRange{
- group1: timeRange1,
- "inventory": timeRange2,
- }
- updatedNode2 := getDataNodeWithLabels("node1", addr1,
updatedLabels, updatedBoundaries)
- p.OnAddOrUpdate(updatedNode2)
-
- p.mu.RLock()
- registeredNode = p.registered["node1"]
-		gomega.Expect(registeredNode.DataSegmentsBoundary).Should(gomega.Equal(updatedBoundaries))
- gomega.Expect(len(p.active)).Should(gomega.Equal(1))
- p.mu.RUnlock()
- })
})
func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index da5efca0..ae63065d 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -45,7 +45,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
- "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
@@ -129,7 +128,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes
names[n.Metadata.GetName()] = struct{}{}
}
} else {
- for g, sel := range messages.NodeSelectors() {
+ for _, sel := range messages.NodeSelectors() {
var matches []MatchFunc
if sel == nil {
matches = bypassMatches
@@ -143,12 +142,8 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes
}
}
for _, n := range nodes {
- tr := metadata.FindSegmentsBoundary(n, g)
- if tr == nil {
- continue
- }
for _, m := range matches {
-				if m(n.Labels) && timestamp.PbHasOverlap(messages.TimeRange(), tr) {
+				if m(n.Labels) {
					names[n.Metadata.Name] = struct{}{}
break
}
diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go
index 8a1001bb..e6bb424a 100644
--- a/banyand/queue/pub/pub_suite_test.go
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -237,10 +237,9 @@ func getDataNode(name string, address string) schema.Metadata {
}
}
-func getDataNodeWithLabels(name string, address string, labels map[string]string, dataBoundary map[string]*modelv1.TimeRange) schema.Metadata {
+func getDataNodeWithLabels(name string, address string, labels map[string]string) schema.Metadata {
node := getDataNode(name, address)
nodeInfo := node.Spec.(*databasev1.Node)
nodeInfo.Labels = labels
- nodeInfo.DataSegmentsBoundary = dataBoundary
return node
}
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index daaf2505..a8f4251b 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -311,21 +311,15 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
"role": "ingest",
"zone": "east",
}
- node1Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange,
- }
node2Labels := map[string]string{
"role": "query",
"zone": "west",
}
- node2Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange,
- }
- node1 := getDataNodeWithLabels("node1", addr1,
node1Labels, node1Boundaries)
+ node1 := getDataNodeWithLabels("node1", addr1,
node1Labels)
p.OnAddOrUpdate(node1)
- node2 := getDataNodeWithLabels("node2", addr2,
node2Labels, node2Boundaries)
+ node2 := getDataNodeWithLabels("node2", addr2,
node2Labels)
p.OnAddOrUpdate(node2)
nodeSelectors := map[string][]string{
@@ -368,21 +362,15 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
"role": "ingest",
"zone": "east",
}
- node1Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange,
- }
node2Labels := map[string]string{
"role": "query",
"zone": "west",
}
- node2Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange,
- }
- node1 := getDataNodeWithLabels("node1", addr1,
node1Labels, node1Boundaries)
+ node1 := getDataNodeWithLabels("node1", addr1,
node1Labels)
p.OnAddOrUpdate(node1)
- node2 := getDataNodeWithLabels("node2", addr2,
node2Labels, node2Boundaries)
+ node2 := getDataNodeWithLabels("node2", addr2,
node2Labels)
p.OnAddOrUpdate(node2)
nodeSelectors := map[string][]string{
@@ -399,174 +387,5 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(messages).Should(gomega.HaveLen(1))
})
-
- ginkgo.It("should broadcast messages based on time range
overlap", func() {
- addr1 := getAddress()
- addr2 := getAddress()
- addr3 := getAddress()
- closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
- closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
- closeFn3 := setup(addr3, codes.OK, 10*time.Millisecond)
- p := newPub()
- defer func() {
- p.GracefulStop()
- closeFn1()
- closeFn2()
- closeFn3()
- }()
-
- group1 := svc
-
- now := time.Now()
- timeRange1 := &modelv1.TimeRange{
- Begin: timestamppb.New(now.Add(-3 * time.Hour)),
- End: timestamppb.New(now.Add(-2 * time.Hour)),
- }
-
- timeRange2 := &modelv1.TimeRange{
- Begin: timestamppb.New(now.Add(-2 * time.Hour)),
- End: timestamppb.New(now.Add(-1 * time.Hour)),
- }
-
- timeRange3 := &modelv1.TimeRange{
- Begin: timestamppb.New(now.Add(-1 * time.Hour)),
- End: timestamppb.New(now),
- }
-
- node1Labels := map[string]string{"zone": "east"}
- node1Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange1,
- }
-
- node2Labels := map[string]string{"zone": "east"}
- node2Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange2,
- }
-
- node3Labels := map[string]string{"zone": "east"}
- node3Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange3,
- }
-
- node1 := getDataNodeWithLabels("node1", addr1,
node1Labels, node1Boundaries)
- p.OnAddOrUpdate(node1)
- node2 := getDataNodeWithLabels("node2", addr2,
node2Labels, node2Boundaries)
- p.OnAddOrUpdate(node2)
- node3 := getDataNodeWithLabels("node3", addr3,
node3Labels, node3Boundaries)
- p.OnAddOrUpdate(node3)
-
- queryTimeRange := &modelv1.TimeRange{
-				Begin: timestamppb.New(now.Add(-2 * time.Hour).Add(-30 * time.Minute)),
-				End:   timestamppb.New(now.Add(-1 * time.Hour).Add(-30 * time.Minute)),
- }
-
- nodeSelectors := map[string][]string{
- group1: {""},
- }
-
-			ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery,
-				bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, queryTimeRange, &streamv1.QueryRequest{}))
-
- gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
- gomega.Expect(ff).Should(gomega.HaveLen(2))
-
- for _, f := range ff {
- messages, err := f.GetAll()
-				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-				gomega.Expect(messages).Should(gomega.HaveLen(1))
- }
- })
-
- ginkgo.It("should broadcast messages with both label selector
and time range filter", func() {
- addr1 := getAddress()
- addr2 := getAddress()
- addr3 := getAddress()
- addr4 := getAddress()
- closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
- closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
- closeFn3 := setup(addr3, codes.OK, 10*time.Millisecond)
- closeFn4 := setup(addr4, codes.OK, 10*time.Millisecond)
- p := newPub()
- defer func() {
- p.GracefulStop()
- closeFn1()
- closeFn2()
- closeFn3()
- closeFn4()
- }()
-
- group1 := svc
-
- now := time.Now()
- timeRange1 := &modelv1.TimeRange{
- Begin: timestamppb.New(now.Add(-2 * time.Hour)),
- End: timestamppb.New(now.Add(-1 * time.Hour)),
- }
-
- timeRange2 := &modelv1.TimeRange{
- Begin: timestamppb.New(now.Add(-1 * time.Hour)),
- End: timestamppb.New(now),
- }
-
- node1Labels := map[string]string{
- "env": "prod",
- "zone": "east",
- }
- node1Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange1,
- }
-
- node2Labels := map[string]string{
- "env": "prod",
- "zone": "west",
- }
- node2Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange1,
- }
-
- node3Labels := map[string]string{
- "env": "dev",
- "zone": "east",
- }
- node3Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange2,
- }
-
- node4Labels := map[string]string{
- "env": "prod",
- "zone": "east",
- }
- node4Boundaries := map[string]*modelv1.TimeRange{
- group1: timeRange2,
- }
-
- node1 := getDataNodeWithLabels("node1", addr1,
node1Labels, node1Boundaries)
- p.OnAddOrUpdate(node1)
- node2 := getDataNodeWithLabels("node2", addr2,
node2Labels, node2Boundaries)
- p.OnAddOrUpdate(node2)
- node3 := getDataNodeWithLabels("node3", addr3,
node3Labels, node3Boundaries)
- p.OnAddOrUpdate(node3)
- node4 := getDataNodeWithLabels("node4", addr4,
node4Labels, node4Boundaries)
- p.OnAddOrUpdate(node4)
-
- queryTimeRange := &modelv1.TimeRange{
-				Begin: timestamppb.New(now.Add(-30 * time.Minute)),
-				End:   timestamppb.New(now.Add(30 * time.Minute)),
- }
-
- nodeSelectors := map[string][]string{
- group1: {"env=prod"},
- }
-
-			ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery,
-				bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, queryTimeRange, &streamv1.QueryRequest{}))
-			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
- gomega.Expect(ff).Should(gomega.HaveLen(1))
-
- messages, err := ff[0].GetAll()
- gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
- gomega.Expect(messages).Should(gomega.HaveLen(1))
- })
})
})
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 55624e6f..82b28495 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -331,7 +331,6 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error
		SeriesIndexFlushTimeoutSeconds: s.option.flushTimeout.Nanoseconds() / int64(time.Second),
		SeriesIndexCacheMaxBytes:       int(s.option.seriesCacheMaxSize),
		StorageMetricsFactory:          s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), p.DBLabelValues()))),
-		SegmentBoundaryUpdateFn:        s.metadata.UpdateSegmentsBoundary,
SegmentIdleTimeout: segmentIdleTimeout,
}
return storage.OpenTSDB(
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 9fd262ad..ed3ec970 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -36,6 +36,13 @@
- [Tag](#banyandb-common-v1-Tag)
- [Trace](#banyandb-common-v1-Trace)
+- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto)
+ - [Node](#banyandb-database-v1-Node)
+ - [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry)
+ - [Shard](#banyandb-database-v1-Shard)
+
+ - [Role](#banyandb-database-v1-Role)
+
- [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto)
- [FieldValue](#banyandb-model-v1-FieldValue)
- [Float](#banyandb-model-v1-Float)
@@ -65,14 +72,6 @@
  - [LogicalExpression.LogicalOp](#banyandb-model-v1-LogicalExpression-LogicalOp)
- [Sort](#banyandb-model-v1-Sort)
-- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto)
- - [Node](#banyandb-database-v1-Node)
-  - [Node.DataSegmentsBoundaryEntry](#banyandb-database-v1-Node-DataSegmentsBoundaryEntry)
- - [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry)
- - [Shard](#banyandb-database-v1-Shard)
-
- - [Role](#banyandb-database-v1-Role)
-
- [banyandb/database/v1/schema.proto](#banyandb_database_v1_schema-proto)
- [Entity](#banyandb-database-v1-Entity)
- [FieldSpec](#banyandb-database-v1-FieldSpec)
@@ -663,6 +662,93 @@ Trace is the top level message of a trace.
+<a name="banyandb_database_v1_database-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/database/v1/database.proto
+
+
+
+<a name="banyandb-database-v1-Node"></a>
+
+### Node
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | |
+| roles | [Role](#banyandb-database-v1-Role) | repeated | |
+| grpc_address | [string](#string) | | |
+| http_address | [string](#string) | | |
+| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
+| labels | [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) | repeated | labels is a set of key-value pairs to describe the node. |
+
+
+
+
+
+
+<a name="banyandb-database-v1-Node-LabelsEntry"></a>
+
+### Node.LabelsEntry
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| key | [string](#string) | | |
+| value | [string](#string) | | |
+
+
+
+
+
+
+<a name="banyandb-database-v1-Shard"></a>
+
+### Shard
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| id | [uint64](#uint64) | | |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | |
+| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) | | |
+| node | [string](#string) | | |
+| total | [uint32](#uint32) | | |
+| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
+| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
+
+
+
+
+
+
+
+
+<a name="banyandb-database-v1-Role"></a>
+
+### Role
+
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| ROLE_UNSPECIFIED | 0 | |
+| ROLE_META | 1 | |
+| ROLE_DATA | 2 | |
+| ROLE_LIAISON | 3 | |
+
+
+
+
+
+
+
+
+
+
<a name="banyandb_model_v1_common-proto"></a>
<p align="right"><a href="#top">Top</a></p>
@@ -1076,110 +1162,6 @@ Each item in a string array is seen as a token instead of a query expression.
-<a name="banyandb_database_v1_database-proto"></a>
-<p align="right"><a href="#top">Top</a></p>
-
-## banyandb/database/v1/database.proto
-
-
-
-<a name="banyandb-database-v1-Node"></a>
-
-### Node
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | |
-| roles | [Role](#banyandb-database-v1-Role) | repeated | |
-| grpc_address | [string](#string) | | |
-| http_address | [string](#string) | | |
-| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
-| labels | [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) | repeated | labels is a set of key-value pairs to describe the node. |
-| data_segments_boundary | [Node.DataSegmentsBoundaryEntry](#banyandb-database-v1-Node-DataSegmentsBoundaryEntry) | repeated | data_segments_boundary is the time range of the data segments that the node is responsible for. [start, end) is the time range. |
-
-
-
-
-
-
-<a name="banyandb-database-v1-Node-DataSegmentsBoundaryEntry"></a>
-
-### Node.DataSegmentsBoundaryEntry
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| key | [string](#string) | | |
-| value | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) | | |
-
-
-
-
-
-
-<a name="banyandb-database-v1-Node-LabelsEntry"></a>
-
-### Node.LabelsEntry
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| key | [string](#string) | | |
-| value | [string](#string) | | |
-
-
-
-
-
-
-<a name="banyandb-database-v1-Shard"></a>
-
-### Shard
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| id | [uint64](#uint64) | | |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | |
-| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) | | |
-| node | [string](#string) | | |
-| total | [uint32](#uint32) | | |
-| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
-| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
-
-
-
-
-
-
-
-
-<a name="banyandb-database-v1-Role"></a>
-
-### Role
-
-
-| Name | Number | Description |
-| ---- | ------ | ----------- |
-| ROLE_UNSPECIFIED | 0 | |
-| ROLE_META | 1 | |
-| ROLE_DATA | 2 | |
-| ROLE_LIAISON | 3 | |
-
-
-
-
-
-
-
-
-
-
<a name="banyandb_database_v1_schema-proto"></a>
<p align="right"><a href="#top">Top</a></p>
diff --git a/pkg/test/helpers/context.go b/pkg/test/helpers/context.go
index 82854237..edd816e5 100644
--- a/pkg/test/helpers/context.go
+++ b/pkg/test/helpers/context.go
@@ -38,15 +38,17 @@ type SharedContext struct {
// Args is a wrapper seals all necessary info for table specs.
type Args struct {
- Begin *timestamppb.Timestamp
- End *timestamppb.Timestamp
- Input string
- Want string
- Offset time.Duration
- Duration time.Duration
- WantEmpty bool
- WantErr bool
- DisOrder bool
+ Begin *timestamppb.Timestamp
+ End *timestamppb.Timestamp
+ Input string
+ Want string
+ Stages []string
+ Offset time.Duration
+ Duration time.Duration
+ WantEmpty bool
+ WantErr bool
+ DisOrder bool
+ IgnoreElementID bool
}
// UnmarshalYAML decodes YAML raw bytes to proto.Message.
@@ -80,9 +82,11 @@ type BackupSharedContext struct {
// LifecycleSharedContext is the context shared between test cases in the lifecycle testing.
type LifecycleSharedContext struct {
- DataAddr string
- Connection *grpclib.ClientConn
- SrcDir string
- DestDir string
- EtcdAddr string
+ BaseTime time.Time
+ Connection *grpclib.ClientConn
+ LiaisonAddr string
+ DataAddr string
+ SrcDir string
+ DestDir string
+ EtcdAddr string
}
diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go
index 4838c2eb..54910694 100644
--- a/pkg/test/measure/etcd.go
+++ b/pkg/test/measure/etcd.go
@@ -38,6 +38,7 @@ const (
indexRuleDir = "testdata/index_rules"
indexRuleBindingDir = "testdata/index_rule_bindings"
topNAggregationDir = "testdata/topn_aggregations"
+ groupStagesDir = "testdata/group_stages" // new directory
)
//go:embed testdata/*
@@ -45,31 +46,52 @@ var store embed.FS
// PreloadSchema loads schemas from files in the booting process.
func PreloadSchema(ctx context.Context, e schema.Registry) error {
- if err := loadSchema(groupDir, &commonv1.Group{}, func(group
*commonv1.Group) error {
- return e.CreateGroup(ctx, group)
- }); err != nil {
- return errors.WithStack(err)
- }
- if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure
*databasev1.Measure) error {
- _, innerErr := e.CreateMeasure(ctx, measure)
- return innerErr
- }); err != nil {
- return errors.WithStack(err)
- }
- if err := loadSchema(indexRuleDir, &databasev1.IndexRule{},
func(indexRule *databasev1.IndexRule) error {
- return e.CreateIndexRule(ctx, indexRule)
- }); err != nil {
- return errors.WithStack(err)
- }
-	if err := loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error {
- return e.CreateIndexRuleBinding(ctx, indexRuleBinding)
- }); err != nil {
- return errors.WithStack(err)
- }
- if err := loadSchema(topNAggregationDir, &databasev1.TopNAggregation{},
func(topN *databasev1.TopNAggregation) error {
- return e.CreateTopNAggregation(ctx, topN)
- }); err != nil {
- return errors.WithStack(err)
+ return loadAllSchemas(ctx, e, groupDir)
+}
+
+// LoadSchemaWithStages loads group schemas from the groupStagesDir.
+func LoadSchemaWithStages(ctx context.Context, e schema.Registry) error {
+ return loadAllSchemas(ctx, e, groupStagesDir)
+}
+
+// loadAllSchemas is the common logic to load schemas from a given group directory.
+func loadAllSchemas(ctx context.Context, e schema.Registry, groupDirectory string) error {
+ return preloadSchemaWithFuncs(ctx, e,
+ func(ctx context.Context, e schema.Registry) error {
+ return loadSchema(groupDirectory, &commonv1.Group{},
func(group *commonv1.Group) error {
+ return e.CreateGroup(ctx, group)
+ })
+ },
+ func(ctx context.Context, e schema.Registry) error {
+ return loadSchema(measureDir, &databasev1.Measure{},
func(measure *databasev1.Measure) error {
+ _, innerErr := e.CreateMeasure(ctx, measure)
+ return innerErr
+ })
+ },
+ func(ctx context.Context, e schema.Registry) error {
+			return loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error {
+ return e.CreateIndexRule(ctx, indexRule)
+ })
+ },
+ func(ctx context.Context, e schema.Registry) error {
+			return loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error {
+				return e.CreateIndexRuleBinding(ctx, indexRuleBinding)
+ })
+ },
+ func(ctx context.Context, e schema.Registry) error {
+			return loadSchema(topNAggregationDir, &databasev1.TopNAggregation{}, func(topN *databasev1.TopNAggregation) error {
+ return e.CreateTopNAggregation(ctx, topN)
+ })
+ },
+ )
+}
+
+// preloadSchemaWithFuncs extracts the common logic for loading schemas.
+func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders ...func(context.Context, schema.Registry) error) error {
+ for _, loader := range loaders {
+ if err := loader(ctx, e); err != nil {
+ return errors.WithStack(err)
+ }
}
return nil
}
diff --git a/pkg/test/measure/testdata/group_stages/exception.json b/pkg/test/measure/testdata/group_stages/exception.json
new file mode 100644
index 00000000..fadbd5a5
--- /dev/null
+++ b/pkg/test/measure/testdata/group_stages/exception.json
@@ -0,0 +1,18 @@
+{
+ "metadata": {
+ "name": "exception"
+ },
+ "catalog": "CATALOG_MEASURE",
+ "resource_opts": {
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ }
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/group_stages/index_mode.json b/pkg/test/measure/testdata/group_stages/index_mode.json
new file mode 100644
index 00000000..fb0d9b4b
--- /dev/null
+++ b/pkg/test/measure/testdata/group_stages/index_mode.json
@@ -0,0 +1,18 @@
+{
+ "metadata": {
+ "name": "index_mode"
+ },
+ "catalog": "CATALOG_MEASURE",
+ "resource_opts": {
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ }
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/groups/sw_metric.json b/pkg/test/measure/testdata/group_stages/sw_metric.json
similarity index 100%
copy from pkg/test/measure/testdata/groups/sw_metric.json
copy to pkg/test/measure/testdata/group_stages/sw_metric.json
diff --git a/pkg/test/measure/testdata/groups/sw_metric.json b/pkg/test/measure/testdata/groups/sw_metric.json
index 703ae9f5..a91becb7 100644
--- a/pkg/test/measure/testdata/groups/sw_metric.json
+++ b/pkg/test/measure/testdata/groups/sw_metric.json
@@ -12,22 +12,7 @@
"ttl": {
"unit": "UNIT_DAY",
"num": 7
- },
- "stages": [
- {
- "name": "warm",
- "shard_num": 1,
- "segment_interval": {
- "unit": "UNIT_DAY",
- "num": 3
- },
- "ttl": {
- "unit": "UNIT_DAY",
- "num": 30
- },
- "node_selector": "type=warm"
- }
- ]
+ }
},
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index 93dcf0fb..d48c3f4f 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -47,26 +47,12 @@ var (
streamStore embed.FS
//go:embed testdata/group.json
groupJSON string
+ //go:embed testdata/group_with_stages.json
+ groupWithStagesJSON string
)
-// PreloadSchema loads schemas from files in the booting process.
-func PreloadSchema(ctx context.Context, e schema.Registry) error {
- if e == nil {
- return nil
- }
- g := &commonv1.Group{}
- if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
- return err
- }
- _, err := e.GetGroup(ctx, g.Metadata.Name)
- if !errors.Is(err, schema.ErrGRPCResourceNotFound) {
- logger.Infof("group %s already exists", g.Metadata.Name)
- return nil
- }
- if innerErr := e.CreateGroup(ctx, g); innerErr != nil {
- return innerErr
- }
-
+// loadSchemas loads streams, index rules, and index rule bindings.
+func loadSchemas(ctx context.Context, e schema.Registry) error {
streams, err := streamStore.ReadDir(streamDir)
if err != nil {
return err
@@ -126,3 +112,46 @@ func PreloadSchema(ctx context.Context, e schema.Registry) error {
return nil
}
+
+// LoadSchemaWithStages loads schemas from files, including group stages.
+func LoadSchemaWithStages(ctx context.Context, e schema.Registry) error {
+ if e == nil {
+ return nil
+ }
+ g := &commonv1.Group{}
+	if err := protojson.Unmarshal([]byte(groupWithStagesJSON), g); err != nil {
+ return err
+ }
+ _, err := e.GetGroup(ctx, g.Metadata.Name)
+ if !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+ logger.Infof("group %s already exists", g.Metadata.Name)
+ return nil
+ }
+ if innerErr := e.CreateGroup(ctx, g); innerErr != nil {
+ return innerErr
+ }
+
+ return loadSchemas(ctx, e)
+}
+
+// PreloadSchema loads schemas from files in the booting process.
+// This version loads group without stages.
+func PreloadSchema(ctx context.Context, e schema.Registry) error {
+ if e == nil {
+ return nil
+ }
+ g := &commonv1.Group{}
+ if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
+ return err
+ }
+ _, err := e.GetGroup(ctx, g.Metadata.Name)
+ if !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+ logger.Infof("group %s already exists", g.Metadata.Name)
+ return nil
+ }
+ if innerErr := e.CreateGroup(ctx, g); innerErr != nil {
+ return innerErr
+ }
+
+ return loadSchemas(ctx, e)
+}
diff --git a/pkg/test/stream/testdata/group.json b/pkg/test/stream/testdata/group.json
index f38a2436..b9e8dc80 100644
--- a/pkg/test/stream/testdata/group.json
+++ b/pkg/test/stream/testdata/group.json
@@ -12,22 +12,7 @@
"ttl": {
"unit": "UNIT_DAY",
"num": 3
- },
- "stages": [
- {
- "name": "warm",
- "shard_num": 1,
- "segment_interval": {
- "unit": "UNIT_DAY",
- "num": 3
- },
- "ttl": {
- "unit": "UNIT_DAY",
- "num": 7
- },
- "node_selector": "type=warm"
- }
- ]
+ }
},
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
diff --git a/pkg/test/stream/testdata/group.json b/pkg/test/stream/testdata/group_with_stages.json
similarity index 100%
copy from pkg/test/stream/testdata/group.json
copy to pkg/test/stream/testdata/group_with_stages.json
diff --git a/test/cases/lifecycle/lifecycle.go b/test/cases/lifecycle/lifecycle.go
index 55f68ee0..b1ebbfc3 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -22,12 +22,20 @@ import (
"io/fs"
"os"
"path/filepath"
+ "time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
"github.com/apache/skywalking-banyandb/banyand/backup/lifecycle"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	measureTestData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	streamTestData "github.com/apache/skywalking-banyandb/test/cases/stream/data"
+	topNTestData "github.com/apache/skywalking-banyandb/test/cases/topn/data"
)
// SharedContext is the shared context for the snapshot test cases.
@@ -49,12 +57,87 @@ var _ = ginkgo.Describe("Lifecycle", func() {
})
err = lifecycleCmd.Execute()
gomega.Expect(err).NotTo(gomega.HaveOccurred())
- verifySourceDirectoriesBeforeMigration()
+ verifySourceDirectoriesAfterMigration()
verifyDestinationDirectoriesAfterMigration()
+		conn, err := grpchelper.Conn(SharedContext.LiaisonAddr, 10*time.Second,
+			grpc.WithTransportCredentials(insecure.NewCredentials()))
+ defer func() {
+ if conn != nil {
+ _ = conn.Close()
+ }
+ }()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ sc := helpers.SharedContext{
+ Connection: conn,
+ BaseTime: SharedContext.BaseTime,
+ }
+ // Verify measure data lifecycle stages
+		verifyLifecycleStages(sc, measureTestData.VerifyFn, helpers.Args{
+ Input: "all",
+ Duration: 25 * time.Minute,
+ Offset: -20 * time.Minute,
+ })
+
+ // Verify stream data lifecycle stages
+ verifyLifecycleStages(sc, streamTestData.VerifyFn, helpers.Args{
+ Input: "all",
+ Duration: time.Hour,
+ IgnoreElementID: true,
+ })
+
+ // Verify topN data lifecycle stages
+ verifyLifecycleStages(sc, topNTestData.VerifyFn, helpers.Args{
+ Input: "aggr_desc",
+ Duration: 25 * time.Minute,
+ Offset: -20 * time.Minute,
+ })
})
})
-func verifySourceDirectoriesBeforeMigration() {
+func verifyLifecycleStages(sc helpers.SharedContext, verifyFn func(gomega.Gomega, helpers.SharedContext, helpers.Args), args helpers.Args) {
+ // Initial verification expecting error before migration
+ verifyFn(gomega.Default, sc, helpers.Args{
+ Input: args.Input,
+ Duration: args.Duration,
+ Offset: args.Offset,
+ WantErr: true,
+ Stages: args.Stages,
+ })
+
+ // Verify hot+warm stages exist after migration
+ gomega.Eventually(func(innerGm gomega.Gomega) {
+ verifyFn(innerGm, sc, helpers.Args{
+ Input: args.Input,
+ Duration: args.Duration,
+ Offset: args.Offset,
+ Stages: []string{"hot", "warm"},
+ IgnoreElementID: args.IgnoreElementID,
+ })
+ }, flags.EventuallyTimeout).Should(gomega.Succeed())
+
+ // Verify warm stage only after retention
+ gomega.Eventually(func(innerGm gomega.Gomega) {
+ verifyFn(innerGm, sc, helpers.Args{
+ Input: args.Input,
+ Duration: args.Duration,
+ Offset: args.Offset,
+ Stages: []string{"warm"},
+ IgnoreElementID: args.IgnoreElementID,
+ })
+ }, flags.EventuallyTimeout).Should(gomega.Succeed())
+
+ // Verify hot stage is empty after retention
+ verifyFn(gomega.Default, sc, helpers.Args{
+ Input: args.Input,
+ Duration: args.Duration,
+ Offset: args.Offset,
+ WantEmpty: true,
+ Stages: []string{"hot"},
+ IgnoreElementID: args.IgnoreElementID,
+ })
+}
+
+func verifySourceDirectoriesAfterMigration() {
streamSrcPath := filepath.Join(SharedContext.SrcDir, "stream", "data",
"default")
streamEntries, err := os.ReadDir(streamSrcPath)
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Stream source
directory should exist")
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index ff17c45c..f4c78699 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -54,6 +54,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args
query := &measurev1.QueryRequest{}
helpers.UnmarshalYAML(i, query)
query.TimeRange = helpers.TimeRange(args, sharedContext)
+ query.Stages = args.Stages
c := measurev1.NewMeasureServiceClient(sharedContext.Connection)
ctx := context.Background()
resp, err := c.Query(ctx, query)
diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go
index 6701e166..b3df58d4 100644
--- a/test/cases/stream/data/data.go
+++ b/test/cases/stream/data/data.go
@@ -65,6 +65,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args
query := &streamv1.QueryRequest{}
helpers.UnmarshalYAML(i, query)
query.TimeRange = helpers.TimeRange(args, sharedContext)
+ query.Stages = args.Stages
c := streamv1.NewStreamServiceClient(sharedContext.Connection)
ctx := context.Background()
resp, err := c.Query(ctx, query)
@@ -86,8 +87,10 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args
innerGm.Expect(err).NotTo(gm.HaveOccurred())
want := &streamv1.QueryResponse{}
helpers.UnmarshalYAML(ww, want)
- for i := range want.Elements {
-		want.Elements[i].ElementId = hex.EncodeToString(convert.Uint64ToBytes(convert.HashStr(query.Name + "|" + want.Elements[i].ElementId)))
+ if !args.IgnoreElementID {
+ for i := range want.Elements {
+			want.Elements[i].ElementId = hex.EncodeToString(convert.Uint64ToBytes(convert.HashStr(query.Name + "|" + want.Elements[i].ElementId)))
+ }
}
if args.DisOrder {
		slices.SortFunc(want.Elements, func(a, b *streamv1.Element) int {
@@ -97,10 +100,15 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args
return strings.Compare(a.ElementId, b.ElementId)
})
}
- success := innerGm.Expect(cmp.Equal(resp, want,
- protocmp.IgnoreUnknown(),
+ var extra []cmp.Option
+ extra = append(extra, protocmp.IgnoreUnknown(),
protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
- protocmp.Transform())).
+ protocmp.Transform())
+ if args.IgnoreElementID {
+		extra = append(extra, protocmp.IgnoreFields(&streamv1.Element{}, "element_id"))
+ }
+ success := innerGm.Expect(cmp.Equal(resp, want,
+ extra...)).
To(gm.BeTrue(), func() string {
var j []byte
j, err = protojson.Marshal(resp)
diff --git a/test/cases/topn/data/data.go b/test/cases/topn/data/data.go
index 983be9c4..67a1bbd3 100644
--- a/test/cases/topn/data/data.go
+++ b/test/cases/topn/data/data.go
@@ -46,6 +46,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args
query := &measurev1.TopNRequest{}
helpers.UnmarshalYAML(i, query)
query.TimeRange = helpers.TimeRange(args, sharedContext)
+ query.Stages = args.Stages
c := measurev1.NewMeasureServiceClient(sharedContext.Connection)
ctx := context.Background()
resp, err := c.TopN(ctx, query)
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 122c5d1b..103c9a00 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -26,6 +26,6 @@ SW_ROVER_COMMIT=4c0cb8429a96f190ea30eac1807008d523c749c3
SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3
SW_PREDICTOR_COMMIT=54a0197654a3781a6f73ce35146c712af297c994
-SW_OAP_COMMIT=c63fe21e75c7b898564add0035c220d9b242f7d2
+SW_OAP_COMMIT=03b03518616a91129cfced2972621df3f0bac3db
SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=d3f8fe894d1a206164b73f5b523d2eb62d9e9965
SW_CTL_COMMIT=67cbc89dd7b214d5791321a7ca992f940cb586ba
\ No newline at end of file
diff --git a/test/integration/distributed/lifecycle/lifecycle_suite_test.go b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
index 0b5059fc..4fc99cbd 100644
--- a/test/integration/distributed/lifecycle/lifecycle_suite_test.go
+++ b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
@@ -51,13 +51,15 @@ func TestLifecycle(t *testing.T) {
}
var (
- connection *grpc.ClientConn
- srcDir string
- destDir string
- deferFunc func()
- goods []gleak.Goroutine
- dataAddr string
- ep string
+ connection *grpc.ClientConn
+ srcDir string
+ destDir string
+ deferFunc func()
+ goods []gleak.Goroutine
+ dataAddr string
+ ep string
+ tenDaysBeforeNow time.Time
+ liaisonAddr string
)
var _ = SynchronizedBeforeSuite(func() []byte {
@@ -86,8 +88,8 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Expect(err).NotTo(HaveOccurred())
defer schemaRegistry.Close()
ctx := context.Background()
- test_stream.PreloadSchema(ctx, schemaRegistry)
- test_measure.PreloadSchema(ctx, schemaRegistry)
+ test_stream.LoadSchemaWithStages(ctx, schemaRegistry)
+ test_measure.LoadSchemaWithStages(ctx, schemaRegistry)
By("Starting hot data node")
var closeDataNode0 func()
	dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep, "--node-labels", "type=hot", "--measure-flush-timeout", "0s", "--stream-flush-timeout", "0s")
@@ -95,11 +97,12 @@ var _ = SynchronizedBeforeSuite(func() []byte {
var closeDataNode1 func()
	_, destDir, closeDataNode1 = setup.DataNodeWithAddrAndDir(ep, "--node-labels", "type=warm", "--measure-flush-timeout", "0s", "--stream-flush-timeout", "0s")
By("Starting liaison node")
-	liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep, "--data-node-selector", "type=hot")
+ var closerLiaisonNode func()
+	liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep, "--data-node-selector", "type=hot")
By("Initializing test cases with 10 days before")
ns := timestamp.NowMilli().UnixNano()
now := time.Unix(0, ns-ns%int64(time.Minute))
- tenDaysBeforeNow := now.Add(-10 * 24 * time.Hour)
+ tenDaysBeforeNow = now.Add(-10 * 24 * time.Hour)
test_cases.Initialize(liaisonAddr, tenDaysBeforeNow)
deferFunc = func() {
closerLiaisonNode()
@@ -116,11 +119,13 @@ var _ = SynchronizedBeforeSuite(func() []byte {
grpc.WithTransportCredentials(insecure.NewCredentials()))
Expect(err).NotTo(HaveOccurred())
caseslifecycle.SharedContext = helpers.LifecycleSharedContext{
- DataAddr: dataAddr,
- Connection: connection,
- SrcDir: srcDir,
- DestDir: destDir,
- EtcdAddr: ep,
+ LiaisonAddr: liaisonAddr,
+ DataAddr: dataAddr,
+ Connection: connection,
+ SrcDir: srcDir,
+ DestDir: destDir,
+ EtcdAddr: ep,
+ BaseTime: tenDaysBeforeNow,
}
})