This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecycle
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a121337cca2eedc6b171f551dd0caa752a748d6f Author: Gao Hongtao <[email protected]> AuthorDate: Tue Apr 29 01:29:18 2025 +0000 Add query test cases of lifecycle Signed-off-by: Gao Hongtao <[email protected]> --- api/proto/banyandb/database/v1/database.proto | 4 - banyand/backup/lifecycle/steps.go | 1 + banyand/dquery/dquery.go | 11 +- banyand/dquery/measure.go | 4 + banyand/dquery/stream.go | 4 + banyand/dquery/topn.go | 4 + banyand/internal/storage/rotation_test.go | 89 +-------- banyand/internal/storage/segment.go | 76 +++----- banyand/internal/storage/segment_test.go | 4 - banyand/internal/storage/tsdb.go | 7 +- banyand/measure/metadata.go | 1 - banyand/metadata/client.go | 49 ----- banyand/metadata/metadata.go | 2 - banyand/metadata/schema/register_test.go | 25 --- banyand/queue/pub/client_test.go | 66 ------- banyand/queue/pub/pub.go | 9 +- banyand/queue/pub/pub_suite_test.go | 3 +- banyand/queue/pub/pub_test.go | 189 +------------------ banyand/stream/metadata.go | 1 - docs/api-reference.md | 206 ++++++++++----------- pkg/test/helpers/context.go | 32 ++-- pkg/test/measure/etcd.go | 72 ++++--- .../measure/testdata/group_stages/exception.json | 18 ++ .../measure/testdata/group_stages/index_mode.json | 18 ++ .../{groups => group_stages}/sw_metric.json | 0 pkg/test/measure/testdata/groups/sw_metric.json | 17 +- pkg/test/stream/etcd.go | 65 +++++-- pkg/test/stream/testdata/group.json | 17 +- .../{group.json => group_with_stages.json} | 0 test/cases/lifecycle/lifecycle.go | 87 ++++++++- test/cases/measure/data/data.go | 1 + test/cases/stream/data/data.go | 18 +- test/cases/topn/data/data.go | 1 + test/e2e-v2/script/env | 2 +- .../distributed/lifecycle/lifecycle_suite_test.go | 37 ++-- 35 files changed, 422 insertions(+), 718 deletions(-) diff --git a/api/proto/banyandb/database/v1/database.proto b/api/proto/banyandb/database/v1/database.proto index a5caddba..2b4de339 100644 --- a/api/proto/banyandb/database/v1/database.proto +++ b/api/proto/banyandb/database/v1/database.proto @@ -20,7 +20,6 @@ syntax = "proto3"; package banyandb.database.v1; import "banyandb/common/v1/common.proto"; -import "banyandb/model/v1/query.proto"; import "google/protobuf/timestamp.proto"; option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"; @@ -41,9 +40,6 @@ message Node { google.protobuf.Timestamp created_at = 5; // labels is a set of key-value pairs to describe the node. map<string, string> labels = 6; - // data_segments_boundary is the time range of the data segments that the node is responsible for. - // [start, end) is the time range. 
- map<string, model.v1.TimeRange> data_segments_boundary = 7; } message Shard { diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go index 7816b881..620be9e9 100644 --- a/banyand/backup/lifecycle/steps.go +++ b/banyand/backup/lifecycle/steps.go @@ -249,6 +249,7 @@ func migrateMeasure(ctx context.Context, m *databasev1.Measure, result model.Mea Metadata: m.Metadata, DataPoint: &measurev1.DataPointValue{ Timestamp: timestamppb.New(time.Unix(0, mr.Timestamps[i])), + Version: mr.Versions[i], }, MessageId: uint64(time.Now().UnixNano()), } diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go index 653aa811..74cedafe 100644 --- a/banyand/dquery/dquery.go +++ b/banyand/dquery/dquery.go @@ -151,8 +151,8 @@ func (q *queryService) parseNodeSelector(stages []string, resource *commonv1.Res } var nodeSelectors []string - for _, stage := range resource.Stages { - for _, sn := range stages { + for _, sn := range stages { + for _, stage := range resource.Stages { if strings.EqualFold(sn, stage.Name) { ns := stage.NodeSelector ns = strings.TrimSpace(ns) @@ -162,10 +162,9 @@ func (q *queryService) parseNodeSelector(stages []string, resource *commonv1.Res nodeSelectors = append(nodeSelectors, ns) break } - if strings.EqualFold(sn, hotStageName) && q.hotStageNodeSelector != "" { - nodeSelectors = append(nodeSelectors, q.hotStageNodeSelector) - break - } + } + if strings.EqualFold(sn, hotStageName) && q.hotStageNodeSelector != "" { + nodeSelectors = append(nodeSelectors, q.hotStageNodeSelector) } } if len(nodeSelectors) == 0 { diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go index 56548d7a..1b1a62e0 100644 --- a/banyand/dquery/measure.go +++ b/banyand/dquery/measure.go @@ -88,6 +88,10 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r if gs, ok := p.measureService.LoadGroup(g); ok { if ns, exist := p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist { nodeSelectors[g] = ns + } else if len(gs.GetSchema().ResourceOpts.Stages) > 0 { + ml.Error().Strs("req_stages", queryCriteria.Stages).Strs("default_stages", gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found") + resp = bus.NewMessage(bus.MessageID(now), common.NewError("no stage found in request or default stages in resource opts")) + return } } else { ml.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("group not found") diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go index 6cdd2ae7..184e3438 100644 --- a/banyand/dquery/stream.go +++ b/banyand/dquery/stream.go @@ -84,6 +84,10 @@ func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (re if gs, ok := p.streamService.LoadGroup(g); ok { if ns, exist := p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist { nodeSelectors[g] = ns + } else if len(gs.GetSchema().ResourceOpts.Stages) > 0 { + p.log.Error().Strs("req_stages", queryCriteria.Stages).Strs("default_stages", gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found") + resp = bus.NewMessage(bus.MessageID(now), common.NewError("no stage found in request or default stages in resource opts")) + return } } else { p.log.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("group not found") diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go index b7adf5c1..1bb3ad99 100644 --- a/banyand/dquery/topn.go +++ b/banyand/dquery/topn.go @@ -71,6 +71,10 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp if 
gs, ok := t.measureService.LoadGroup(g); ok { if ns, exist := t.parseNodeSelector(request.Stages, gs.GetSchema().ResourceOpts); exist { nodeSelectors[g] = ns + } else if len(gs.GetSchema().ResourceOpts.Stages) > 0 { + t.log.Error().Strs("req_stages", request.Stages).Strs("default_stages", gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found") + resp = bus.NewMessage(now, common.NewError("no stage found in request or default stages in resource opts")) + return } } else { t.log.Error().Str("group", g).Msg("failed to load group") diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go index 43f292ef..ccc9e271 100644 --- a/banyand/internal/storage/rotation_test.go +++ b/banyand/internal/storage/rotation_test.go @@ -19,7 +19,6 @@ package storage import ( "context" - "sync" "testing" "time" @@ -27,7 +26,6 @@ import ( "github.com/stretchr/testify/require" "github.com/apache/skywalking-banyandb/api/common" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -36,77 +34,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -type boundaryUpdateTracker struct { - updates []*modelv1.TimeRange - called int - mu sync.Mutex -} - -func (t *boundaryUpdateTracker) recordUpdate(_, _ string, boundary *modelv1.TimeRange) { - t.mu.Lock() - defer t.mu.Unlock() - t.updates = append(t.updates, boundary) - t.called++ -} - -func TestSegmentBoundaryUpdateFn(t *testing.T) { - t.Run("called when a segment is created", func(t *testing.T) { - tsdb, c, _, tracker, dfFn := setUpDB(t) - defer dfFn() - - // Initial segment creation should have triggered it once - require.Equal(t, 1, tracker.called) - - // Create new segment - ts := c.Now().Add(23*time.Hour + time.Second) - tsdb.Tick(ts.UnixNano()) - - // Verify the function was called again - assert.Eventually(t, func() bool { - tracker.mu.Lock() - defer tracker.mu.Unlock() - return tracker.called >= 2 - }, flags.EventuallyTimeout, time.Millisecond, "boundary update function should be called") - }) - - t.Run("called when a segment is deleted", func(t *testing.T) { - tsdb, c, segCtrl, tracker, dfFn := setUpDB(t) - defer dfFn() - - ts := c.Now() - for i := 0; i < 4; i++ { - ts = ts.Add(23 * time.Hour) - c.Set(ts) - tsdb.Tick(ts.UnixNano()) - t.Logf("current time: %s", ts.Format(time.RFC3339)) - expected := i + 2 - require.EventuallyWithTf(t, func(ct *assert.CollectT) { - segments, _ := segCtrl.segments(false) - if len(segments) != expected { - ct.Errorf("expect %d segments, got %d", expected, len(segments)) - } - }, flags.EventuallyTimeout, time.Millisecond, "wait for %d segment to be created", expected) - ts = ts.Add(time.Hour) - } - - tracker.mu.Lock() - called := tracker.called - tracker.mu.Unlock() - - c.Set(ts) - tsdb.Tick(ts.UnixNano()) - - assert.Eventually(t, func() bool { - tracker.mu.Lock() - defer tracker.mu.Unlock() - return tracker.called > called - }, flags.EventuallyTimeout, time.Millisecond, "boundary update function should be called after deletion") - }) -} - func TestForwardRotation(t *testing.T) { t.Run("create a new segment when the time is up", func(t *testing.T) { - tsdb, c, segCtrl, _, dfFn := setUpDB(t) + tsdb, c, segCtrl, dfFn := setUpDB(t) defer dfFn() ts := c.Now().Add(23*time.Hour + time.Second) t.Logf("current time: %s", ts.Format(time.RFC3339)) @@ -118,7 +48,7 @@ func 
TestForwardRotation(t *testing.T) { }) t.Run("no new segment created when the time is not up", func(t *testing.T) { - tsdb, c, segCtrl, _, dfFn := setUpDB(t) + tsdb, c, segCtrl, dfFn := setUpDB(t) defer dfFn() ts := c.Now().Add(22*time.Hour + 59*time.Minute + 59*time.Second) t.Logf("current time: %s", ts.Format(time.RFC3339)) @@ -132,7 +62,7 @@ func TestForwardRotation(t *testing.T) { func TestRetention(t *testing.T) { t.Run("delete the segment and index when the TTL is up", func(t *testing.T) { - tsdb, c, segCtrl, _, dfFn := setUpDB(t) + tsdb, c, segCtrl, dfFn := setUpDB(t) defer dfFn() ts := c.Now() for i := 0; i < 4; i++ { @@ -161,7 +91,7 @@ func TestRetention(t *testing.T) { }) t.Run("keep the segment volume stable", func(t *testing.T) { - tsdb, c, segCtrl, _, dfFn := setUpDB(t) + tsdb, c, segCtrl, dfFn := setUpDB(t) defer dfFn() ts := c.Now() for i := 0; i < 10; i++ { @@ -207,22 +137,15 @@ func TestRetention(t *testing.T) { }) } -func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, *segmentController[*MockTSTable, any], *boundaryUpdateTracker, func()) { +func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, *segmentController[*MockTSTable, any], func()) { dir, defFn := test.Space(require.New(t)) - // Create tracker for boundary updates - tracker := &boundaryUpdateTracker{ - updates: make([]*modelv1.TimeRange, 0), - } - TSDBOpts := TSDBOpts[*MockTSTable, any]{ Location: dir, SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, TTL: IntervalRule{Unit: DAY, Num: 3}, ShardNum: 1, TSTableCreator: MockTSTableCreator, - // Add the boundary update function - SegmentBoundaryUpdateFn: tracker.recordUpdate, } ctx := context.Background() mc := timestamp.NewMockClock() @@ -240,7 +163,7 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, * db := tsdb.(*database[*MockTSTable, any]) segments, _ := db.segmentController.segments(false) require.Equal(t, len(segments), 1) - return db, mc, db.segmentController, tracker, func() { + return db, mc, db.segmentController, func() { tsdb.Close() defFn() } diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 27bb2059..3193697a 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -30,11 +30,9 @@ import ( "time" "github.com/pkg/errors" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -278,41 +276,39 @@ func (s *segment[T, O]) String() string { } type segmentController[T TSTable, O any] struct { - clock timestamp.Clock - metrics Metrics - opts *TSDBOpts[T, O] - l *logger.Logger - indexMetrics *inverted.Metrics - segmentBoundaryUpdateFn SegmentBoundaryUpdateFn - position common.Position - db string - stage string - location string - lst []*segment[T, O] - deadline atomic.Int64 - idleTimeout time.Duration - optsMutex sync.RWMutex + clock timestamp.Clock + metrics Metrics + opts *TSDBOpts[T, O] + l *logger.Logger + indexMetrics *inverted.Metrics + position common.Position + db string + stage string + location string + lst []*segment[T, O] + deadline atomic.Int64 + idleTimeout time.Duration + optsMutex sync.RWMutex sync.RWMutex } func 
newSegmentController[T TSTable, O any](ctx context.Context, location string, l *logger.Logger, opts TSDBOpts[T, O], indexMetrics *inverted.Metrics, metrics Metrics, - segmentsBoundaryUpdateFn SegmentBoundaryUpdateFn, idleTimeout time.Duration, + idleTimeout time.Duration, ) *segmentController[T, O] { clock, _ := timestamp.GetClock(ctx) p := common.GetPosition(ctx) return &segmentController[T, O]{ - location: location, - opts: &opts, - l: l, - clock: clock, - position: common.GetPosition(ctx), - metrics: metrics, - indexMetrics: indexMetrics, - segmentBoundaryUpdateFn: segmentsBoundaryUpdateFn, - stage: p.Stage, - db: p.Database, - idleTimeout: idleTimeout, + location: location, + opts: &opts, + l: l, + clock: clock, + position: common.GetPosition(ctx), + metrics: metrics, + indexMetrics: indexMetrics, + stage: p.Stage, + db: p.Database, + idleTimeout: idleTimeout, } } @@ -364,31 +360,9 @@ func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], if err != nil { return nil, err } - sc.notifySegmentBoundaryUpdate() return s, s.incRef(context.WithValue(context.Background(), logger.ContextKey, sc.l)) } -func (sc *segmentController[T, O]) notifySegmentBoundaryUpdate() { - if sc.segmentBoundaryUpdateFn == nil { - return - } - // No error if we do not open closed segments. - segs, _ := sc.segments(false) - defer func() { - for i := range segs { - segs[i].DecRef() - } - }() - var tr *modelv1.TimeRange - if len(segs) > 0 { - tr = &modelv1.TimeRange{ - Begin: timestamppb.New(segs[0].Start), - End: timestamppb.New(segs[len(segs)-1].End), - } - } - sc.segmentBoundaryUpdateFn(sc.stage, sc.db, tr) -} - func (sc *segmentController[T, O]) segments(reopenClosed bool) (ss []*segment[T, O], err error) { sc.RLock() defer sc.RUnlock() @@ -570,7 +544,6 @@ func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment bool, } s.DecRef() } - sc.notifySegmentBoundaryUpdate() return hasSegment, err } @@ -607,7 +580,6 @@ func (sc *segmentController[T, O]) deleteExpiredSegments(timeRange timestamp.Tim } s.DecRef() } - sc.notifySegmentBoundaryUpdate() return count } diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go index 094bf475..87f62dbe 100644 --- a/banyand/internal/storage/segment_test.go +++ b/banyand/internal/storage/segment_test.go @@ -108,7 +108,6 @@ func TestSegmentOpenAndReopen(t *testing.T) { opts, nil, // indexMetrics nil, // metrics - nil, // segmentBoundaryUpdateFn 5*time.Minute, // idleTimeout ) @@ -199,7 +198,6 @@ func TestSegmentCloseIfIdle(t *testing.T) { opts, nil, // indexMetrics nil, // metrics - nil, // segmentBoundaryUpdateFn time.Second, // Set short idle timeout for testing ) @@ -282,7 +280,6 @@ func TestCloseIdleAndSelectSegments(t *testing.T) { opts, nil, // indexMetrics nil, // metrics - nil, // segmentBoundaryUpdateFn idleTimeout, // short idle timeout ) @@ -414,7 +411,6 @@ func TestOpenExistingSegmentWithShards(t *testing.T) { opts, nil, // indexMetrics nil, // metrics - nil, // segmentBoundaryUpdateFn 5*time.Minute, // idleTimeout ) diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index eca40aa7..a0ff3eb7 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -29,7 +29,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" 
"github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index/inverted" @@ -50,16 +49,12 @@ const ( lockFilename = "lock" ) -// SegmentBoundaryUpdateFn is a callback function to update the segment boundary. -type SegmentBoundaryUpdateFn func(stage, group string, boundary *modelv1.TimeRange) - // TSDBOpts wraps options to create a tsdb. type TSDBOpts[T TSTable, O any] struct { Option O TableMetrics Metrics TSTableCreator TSTableCreator[T, O] StorageMetricsFactory *observability.Factory - SegmentBoundaryUpdateFn SegmentBoundaryUpdateFn Location string SegmentInterval IntervalRule TTL IntervalRule @@ -138,7 +133,7 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[ tsEventCh: make(chan int64), p: p, segmentController: newSegmentController(ctx, location, - l, opts, indexMetrics, opts.TableMetrics, opts.SegmentBoundaryUpdateFn, opts.SegmentIdleTimeout), + l, opts, indexMetrics, opts.TableMetrics, opts.SegmentIdleTimeout), metrics: newMetrics(opts.StorageMetricsFactory), disableRetention: opts.DisableRetention, } diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 85560df8..dca0b6d2 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -419,7 +419,6 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error SeriesIndexFlushTimeoutSeconds: s.option.flushTimeout.Nanoseconds() / int64(time.Second), SeriesIndexCacheMaxBytes: int(s.option.seriesCacheMaxSize), StorageMetricsFactory: factory, - SegmentBoundaryUpdateFn: s.metadata.UpdateSegmentsBoundary, SegmentIdleTimeout: segmentIdleTimeout, } return storage.OpenTSDB( diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index 7e410af7..f935f432 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -21,7 +21,6 @@ import ( "context" "os" "os/signal" - "strings" "sync" "syscall" "time" @@ -33,7 +32,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" @@ -361,50 +359,3 @@ func contains(s []string, e string) bool { } return false } - -func (s *clientService) UpdateSegmentsBoundary(stage, group string, boundary *modelv1.TimeRange) { - s.nodeInfoMux.Lock() - defer s.nodeInfoMux.Unlock() - nodeInfo := s.nodeInfo - b := nodeInfo.DataSegmentsBoundary - if b == nil { - b = make(map[string]*modelv1.TimeRange) - } - key := getNodeBoundaryKey(group, stage) - if boundary == nil { - delete(b, key) - } else { - b[key] = boundary - } - nodeInfo.DataSegmentsBoundary = b - s.nodeInfo = nodeInfo - if err := s.schemaRegistry.UpdateNode(context.Background(), nodeInfo); err != nil { - logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to update node") - } -} - -func getNodeBoundaryKey(items ...string) string { - return strings.Join(items, "|") -} - -// FindSegmentsBoundary finds the segments boundary for the given group. 
-func FindSegmentsBoundary(nodeInfo *databasev1.Node, group string) *modelv1.TimeRange { - b := nodeInfo.DataSegmentsBoundary - if len(b) == 0 { - return nil - } - result := &modelv1.TimeRange{} - for g, tr := range b { - segments := strings.Split(g, "|") - if segments[0] != group { - continue - } - if result.Begin == nil || result.Begin.AsTime().After(tr.Begin.AsTime()) { - result.Begin = tr.Begin - } - if result.End == nil || result.End.AsTime().Before(tr.End.AsTime()) { - result.End = tr.End - } - } - return result -} diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go index e7e981e7..14467c50 100644 --- a/banyand/metadata/metadata.go +++ b/banyand/metadata/metadata.go @@ -24,7 +24,6 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -49,7 +48,6 @@ type Repo interface { GroupRegistry() schema.Group TopNAggregationRegistry() schema.TopNAggregation RegisterHandler(string, schema.Kind, schema.EventHandler) - UpdateSegmentsBoundary(stage, group string, boundary *modelv1.TimeRange) NodeRegistry() schema.Node PropertyRegistry() schema.Property } diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go index 8280e150..1e9ceef8 100644 --- a/banyand/metadata/schema/register_test.go +++ b/banyand/metadata/schema/register_test.go @@ -26,11 +26,9 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" "github.com/onsi/gomega/gleak" - "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/test" @@ -120,27 +118,4 @@ var _ = ginkgo.Describe("etcd_register", func() { return err }, flags.EventuallyTimeout).ShouldNot(gomega.HaveOccurred()) }) - - ginkgo.It("should update node's data segments boundary", func() { - gomega.Expect(r.Register(context.Background(), md, false)).ShouldNot(gomega.HaveOccurred()) - - n, err := r.GetNode(context.Background(), node) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - - newBoundary := &modelv1.TimeRange{ - Begin: timestamppb.New(time.Unix(1000, 0)), - End: timestamppb.New(time.Unix(2000, 0)), - } - n.DataSegmentsBoundary = map[string]*modelv1.TimeRange{"default": newBoundary} - - gomega.Expect(r.UpdateNode(context.Background(), n)).ShouldNot(gomega.HaveOccurred()) - - updatedNode, err := r.GetNode(context.Background(), node) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - gomega.Expect(updatedNode.DataSegmentsBoundary).ShouldNot(gomega.BeNil()) - gomega.Expect(updatedNode.DataSegmentsBoundary).Should(gomega.HaveLen(1)) - gomega.Expect(updatedNode.DataSegmentsBoundary["default"]).ShouldNot(gomega.BeNil()) - gomega.Expect(updatedNode.DataSegmentsBoundary["default"].Begin.Seconds).Should(gomega.Equal(int64(1000))) - gomega.Expect(updatedNode.DataSegmentsBoundary["default"].End.Seconds).Should(gomega.Equal(int64(2000))) - }) }) diff --git a/banyand/queue/pub/client_test.go 
b/banyand/queue/pub/client_test.go index a96d7da8..43a43736 100644 --- a/banyand/queue/pub/client_test.go +++ b/banyand/queue/pub/client_test.go @@ -26,7 +26,6 @@ import ( "github.com/onsi/gomega/gleak" "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" @@ -147,71 +146,6 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { verifyClientsWithGomega(g, p, data.TopicStreamWrite, 1, 0, 2, 1) }, flags.EventuallyTimeout).Should(gomega.Succeed()) }) - - ginkgo.It("should update node when labels or data boundaries change", func() { - addr1 := getAddress() - closeFn := setup(addr1, codes.OK, 200*time.Millisecond) - defer closeFn() - p := newPub() - defer p.GracefulStop() - - // Replace hard-coded "service" with the Service constant - group1 := svc - now := time.Now() - timeRange1 := &modelv1.TimeRange{ - Begin: timestamppb.New(now.Add(-3 * time.Hour)), - End: timestamppb.New(now.Add(-2 * time.Hour)), - } - - initialLabels := map[string]string{ - "role": "ingest", - "zone": "east", - } - initialBoundaries := map[string]*modelv1.TimeRange{ - group1: timeRange1, - } - - node1 := getDataNodeWithLabels("node1", addr1, initialLabels, initialBoundaries) - p.OnAddOrUpdate(node1) - verifyClients(p, 1, 0, 1, 0) - - p.mu.RLock() - registeredNode := p.registered["node1"] - gomega.Expect(registeredNode.Labels).Should(gomega.Equal(initialLabels)) - gomega.Expect(registeredNode.DataSegmentsBoundary).Should(gomega.Equal(initialBoundaries)) - p.mu.RUnlock() - - updatedLabels := map[string]string{ - "role": "query", - "zone": "east", - "env": "prod", - } - updatedNode1 := getDataNodeWithLabels("node1", addr1, updatedLabels, initialBoundaries) - p.OnAddOrUpdate(updatedNode1) - - p.mu.RLock() - registeredNode = p.registered["node1"] - gomega.Expect(registeredNode.Labels).Should(gomega.Equal(updatedLabels)) - gomega.Expect(len(p.active)).Should(gomega.Equal(1)) - p.mu.RUnlock() - - timeRange2 := &modelv1.TimeRange{ - Begin: timestamppb.New(now.Add(-1 * time.Hour)), - End: timestamppb.New(now), - } - updatedBoundaries := map[string]*modelv1.TimeRange{ - group1: timeRange1, - "inventory": timeRange2, - } - updatedNode2 := getDataNodeWithLabels("node1", addr1, updatedLabels, updatedBoundaries) - p.OnAddOrUpdate(updatedNode2) - - p.mu.RLock() - registeredNode = p.registered["node1"] - gomega.Expect(registeredNode.DataSegmentsBoundary).Should(gomega.Equal(updatedBoundaries)) - gomega.Expect(len(p.active)).Should(gomega.Equal(1)) - p.mu.RUnlock() - }) }) func verifyClients(p *pub, active, evict, onAdd, onDelete int) { diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index da5efca0..ae63065d 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -45,7 +45,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" - "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( @@ -129,7 +128,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes names[n.Metadata.GetName()] = struct{}{} } } else { - for g, sel := range messages.NodeSelectors() { + for _, sel := range messages.NodeSelectors() { var matches []MatchFunc if sel == nil { matches = bypassMatches @@ -143,12 +142,8 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes } 
} for _, n := range nodes { - tr := metadata.FindSegmentsBoundary(n, g) - if tr == nil { - continue - } for _, m := range matches { - if m(n.Labels) && timestamp.PbHasOverlap(messages.TimeRange(), tr) { + if m(n.Labels) { names[n.Metadata.Name] = struct{}{} break } diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go index 8a1001bb..e6bb424a 100644 --- a/banyand/queue/pub/pub_suite_test.go +++ b/banyand/queue/pub/pub_suite_test.go @@ -237,10 +237,9 @@ func getDataNode(name string, address string) schema.Metadata { } } -func getDataNodeWithLabels(name string, address string, labels map[string]string, dataBoundary map[string]*modelv1.TimeRange) schema.Metadata { +func getDataNodeWithLabels(name string, address string, labels map[string]string) schema.Metadata { node := getDataNode(name, address) nodeInfo := node.Spec.(*databasev1.Node) nodeInfo.Labels = labels - nodeInfo.DataSegmentsBoundary = dataBoundary return node } diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go index daaf2505..a8f4251b 100644 --- a/banyand/queue/pub/pub_test.go +++ b/banyand/queue/pub/pub_test.go @@ -311,21 +311,15 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { "role": "ingest", "zone": "east", } - node1Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange, - } node2Labels := map[string]string{ "role": "query", "zone": "west", } - node2Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange, - } - node1 := getDataNodeWithLabels("node1", addr1, node1Labels, node1Boundaries) + node1 := getDataNodeWithLabels("node1", addr1, node1Labels) p.OnAddOrUpdate(node1) - node2 := getDataNodeWithLabels("node2", addr2, node2Labels, node2Boundaries) + node2 := getDataNodeWithLabels("node2", addr2, node2Labels) p.OnAddOrUpdate(node2) nodeSelectors := map[string][]string{ @@ -368,21 +362,15 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { "role": "ingest", "zone": "east", } - node1Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange, - } node2Labels := map[string]string{ "role": "query", "zone": "west", } - node2Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange, - } - node1 := getDataNodeWithLabels("node1", addr1, node1Labels, node1Boundaries) + node1 := getDataNodeWithLabels("node1", addr1, node1Labels) p.OnAddOrUpdate(node1) - node2 := getDataNodeWithLabels("node2", addr2, node2Labels, node2Boundaries) + node2 := getDataNodeWithLabels("node2", addr2, node2Labels) p.OnAddOrUpdate(node2) nodeSelectors := map[string][]string{ @@ -399,174 +387,5 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) gomega.Expect(messages).Should(gomega.HaveLen(1)) }) - - ginkgo.It("should broadcast messages based on time range overlap", func() { - addr1 := getAddress() - addr2 := getAddress() - addr3 := getAddress() - closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) - closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond) - closeFn3 := setup(addr3, codes.OK, 10*time.Millisecond) - p := newPub() - defer func() { - p.GracefulStop() - closeFn1() - closeFn2() - closeFn3() - }() - - group1 := svc - - now := time.Now() - timeRange1 := &modelv1.TimeRange{ - Begin: timestamppb.New(now.Add(-3 * time.Hour)), - End: timestamppb.New(now.Add(-2 * time.Hour)), - } - - timeRange2 := &modelv1.TimeRange{ - Begin: timestamppb.New(now.Add(-2 * time.Hour)), - End: timestamppb.New(now.Add(-1 * time.Hour)), - } - - timeRange3 := &modelv1.TimeRange{ - Begin: 
timestamppb.New(now.Add(-1 * time.Hour)), - End: timestamppb.New(now), - } - - node1Labels := map[string]string{"zone": "east"} - node1Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange1, - } - - node2Labels := map[string]string{"zone": "east"} - node2Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange2, - } - - node3Labels := map[string]string{"zone": "east"} - node3Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange3, - } - - node1 := getDataNodeWithLabels("node1", addr1, node1Labels, node1Boundaries) - p.OnAddOrUpdate(node1) - node2 := getDataNodeWithLabels("node2", addr2, node2Labels, node2Boundaries) - p.OnAddOrUpdate(node2) - node3 := getDataNodeWithLabels("node3", addr3, node3Labels, node3Boundaries) - p.OnAddOrUpdate(node3) - - queryTimeRange := &modelv1.TimeRange{ - Begin: timestamppb.New(now.Add(-2 * time.Hour).Add(-30 * time.Minute)), - End: timestamppb.New(now.Add(-1 * time.Hour).Add(-30 * time.Minute)), - } - - nodeSelectors := map[string][]string{ - group1: {""}, - } - - ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, - bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, queryTimeRange, &streamv1.QueryRequest{})) - - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - gomega.Expect(ff).Should(gomega.HaveLen(2)) - - for _, f := range ff { - messages, err := f.GetAll() - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - gomega.Expect(messages).Should(gomega.HaveLen(1)) - } - }) - - ginkgo.It("should broadcast messages with both label selector and time range filter", func() { - addr1 := getAddress() - addr2 := getAddress() - addr3 := getAddress() - addr4 := getAddress() - closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) - closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond) - closeFn3 := setup(addr3, codes.OK, 10*time.Millisecond) - closeFn4 := setup(addr4, codes.OK, 10*time.Millisecond) - p := newPub() - defer func() { - p.GracefulStop() - closeFn1() - closeFn2() - closeFn3() - closeFn4() - }() - - group1 := svc - - now := time.Now() - timeRange1 := &modelv1.TimeRange{ - Begin: timestamppb.New(now.Add(-2 * time.Hour)), - End: timestamppb.New(now.Add(-1 * time.Hour)), - } - - timeRange2 := &modelv1.TimeRange{ - Begin: timestamppb.New(now.Add(-1 * time.Hour)), - End: timestamppb.New(now), - } - - node1Labels := map[string]string{ - "env": "prod", - "zone": "east", - } - node1Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange1, - } - - node2Labels := map[string]string{ - "env": "prod", - "zone": "west", - } - node2Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange1, - } - - node3Labels := map[string]string{ - "env": "dev", - "zone": "east", - } - node3Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange2, - } - - node4Labels := map[string]string{ - "env": "prod", - "zone": "east", - } - node4Boundaries := map[string]*modelv1.TimeRange{ - group1: timeRange2, - } - - node1 := getDataNodeWithLabels("node1", addr1, node1Labels, node1Boundaries) - p.OnAddOrUpdate(node1) - node2 := getDataNodeWithLabels("node2", addr2, node2Labels, node2Boundaries) - p.OnAddOrUpdate(node2) - node3 := getDataNodeWithLabels("node3", addr3, node3Labels, node3Boundaries) - p.OnAddOrUpdate(node3) - node4 := getDataNodeWithLabels("node4", addr4, node4Labels, node4Boundaries) - p.OnAddOrUpdate(node4) - - queryTimeRange := &modelv1.TimeRange{ - Begin: timestamppb.New(now.Add(-30 * time.Minute)), - End: timestamppb.New(now.Add(30 * time.Minute)), - } - - nodeSelectors := map[string][]string{ 
- group1: {"env=prod"}, - } - - ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, - bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, queryTimeRange, &streamv1.QueryRequest{})) - - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - gomega.Expect(ff).Should(gomega.HaveLen(1)) - - messages, err := ff[0].GetAll() - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - gomega.Expect(messages).Should(gomega.HaveLen(1)) - }) }) }) diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 55624e6f..82b28495 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -331,7 +331,6 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error SeriesIndexFlushTimeoutSeconds: s.option.flushTimeout.Nanoseconds() / int64(time.Second), SeriesIndexCacheMaxBytes: int(s.option.seriesCacheMaxSize), StorageMetricsFactory: s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), p.DBLabelValues()))), - SegmentBoundaryUpdateFn: s.metadata.UpdateSegmentsBoundary, SegmentIdleTimeout: segmentIdleTimeout, } return storage.OpenTSDB( diff --git a/docs/api-reference.md b/docs/api-reference.md index 9fd262ad..ed3ec970 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -36,6 +36,13 @@ - [Tag](#banyandb-common-v1-Tag) - [Trace](#banyandb-common-v1-Trace) +- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto) + - [Node](#banyandb-database-v1-Node) + - [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) + - [Shard](#banyandb-database-v1-Shard) + + - [Role](#banyandb-database-v1-Role) + - [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto) - [FieldValue](#banyandb-model-v1-FieldValue) - [Float](#banyandb-model-v1-Float) @@ -65,14 +72,6 @@ - [LogicalExpression.LogicalOp](#banyandb-model-v1-LogicalExpression-LogicalOp) - [Sort](#banyandb-model-v1-Sort) -- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto) - - [Node](#banyandb-database-v1-Node) - - [Node.DataSegmentsBoundaryEntry](#banyandb-database-v1-Node-DataSegmentsBoundaryEntry) - - [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) - - [Shard](#banyandb-database-v1-Shard) - - - [Role](#banyandb-database-v1-Role) - - [banyandb/database/v1/schema.proto](#banyandb_database_v1_schema-proto) - [Entity](#banyandb-database-v1-Entity) - [FieldSpec](#banyandb-database-v1-FieldSpec) @@ -663,6 +662,93 @@ Trace is the top level message of a trace. +<a name="banyandb_database_v1_database-proto"></a> +<p align="right"><a href="#top">Top</a></p> + +## banyandb/database/v1/database.proto + + + +<a name="banyandb-database-v1-Node"></a> + +### Node + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | +| roles | [Role](#banyandb-database-v1-Role) | repeated | | +| grpc_address | [string](#string) | | | +| http_address | [string](#string) | | | +| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | +| labels | [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) | repeated | labels is a set of key-value pairs to describe the node. 
| + + + + + + +<a name="banyandb-database-v1-Node-LabelsEntry"></a> + +### Node.LabelsEntry + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| key | [string](#string) | | | +| value | [string](#string) | | | + + + + + + +<a name="banyandb-database-v1-Shard"></a> + +### Shard + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| id | [uint64](#uint64) | | | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | +| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) | | | +| node | [string](#string) | | | +| total | [uint32](#uint32) | | | +| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | +| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | + + + + + + + + +<a name="banyandb-database-v1-Role"></a> + +### Role + + +| Name | Number | Description | +| ---- | ------ | ----------- | +| ROLE_UNSPECIFIED | 0 | | +| ROLE_META | 1 | | +| ROLE_DATA | 2 | | +| ROLE_LIAISON | 3 | | + + + + + + + + + + <a name="banyandb_model_v1_common-proto"></a> <p align="right"><a href="#top">Top</a></p> @@ -1076,110 +1162,6 @@ Each item in a string array is seen as a token instead of a query expression. -<a name="banyandb_database_v1_database-proto"></a> -<p align="right"><a href="#top">Top</a></p> - -## banyandb/database/v1/database.proto - - - -<a name="banyandb-database-v1-Node"></a> - -### Node - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | -| roles | [Role](#banyandb-database-v1-Role) | repeated | | -| grpc_address | [string](#string) | | | -| http_address | [string](#string) | | | -| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | -| labels | [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) | repeated | labels is a set of key-value pairs to describe the node. | -| data_segments_boundary | [Node.DataSegmentsBoundaryEntry](#banyandb-database-v1-Node-DataSegmentsBoundaryEntry) | repeated | data_segments_boundary is the time range of the data segments that the node is responsible for. [start, end) is the time range. 
| - - - - - - -<a name="banyandb-database-v1-Node-DataSegmentsBoundaryEntry"></a> - -### Node.DataSegmentsBoundaryEntry - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| key | [string](#string) | | | -| value | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) | | | - - - - - - -<a name="banyandb-database-v1-Node-LabelsEntry"></a> - -### Node.LabelsEntry - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| key | [string](#string) | | | -| value | [string](#string) | | | - - - - - - -<a name="banyandb-database-v1-Shard"></a> - -### Shard - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| id | [uint64](#uint64) | | | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | -| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) | | | -| node | [string](#string) | | | -| total | [uint32](#uint32) | | | -| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | -| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | - - - - - - - - -<a name="banyandb-database-v1-Role"></a> - -### Role - - -| Name | Number | Description | -| ---- | ------ | ----------- | -| ROLE_UNSPECIFIED | 0 | | -| ROLE_META | 1 | | -| ROLE_DATA | 2 | | -| ROLE_LIAISON | 3 | | - - - - - - - - - - <a name="banyandb_database_v1_schema-proto"></a> <p align="right"><a href="#top">Top</a></p> diff --git a/pkg/test/helpers/context.go b/pkg/test/helpers/context.go index 82854237..edd816e5 100644 --- a/pkg/test/helpers/context.go +++ b/pkg/test/helpers/context.go @@ -38,15 +38,17 @@ type SharedContext struct { // Args is a wrapper seals all necessary info for table specs. type Args struct { - Begin *timestamppb.Timestamp - End *timestamppb.Timestamp - Input string - Want string - Offset time.Duration - Duration time.Duration - WantEmpty bool - WantErr bool - DisOrder bool + Begin *timestamppb.Timestamp + End *timestamppb.Timestamp + Input string + Want string + Stages []string + Offset time.Duration + Duration time.Duration + WantEmpty bool + WantErr bool + DisOrder bool + IgnoreElementID bool } // UnmarshalYAML decodes YAML raw bytes to proto.Message. @@ -80,9 +82,11 @@ type BackupSharedContext struct { // LifecycleSharedContext is the context shared between test cases in the lifecycle testing. type LifecycleSharedContext struct { - DataAddr string - Connection *grpclib.ClientConn - SrcDir string - DestDir string - EtcdAddr string + BaseTime time.Time + Connection *grpclib.ClientConn + LiaisonAddr string + DataAddr string + SrcDir string + DestDir string + EtcdAddr string } diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go index 4838c2eb..54910694 100644 --- a/pkg/test/measure/etcd.go +++ b/pkg/test/measure/etcd.go @@ -38,6 +38,7 @@ const ( indexRuleDir = "testdata/index_rules" indexRuleBindingDir = "testdata/index_rule_bindings" topNAggregationDir = "testdata/topn_aggregations" + groupStagesDir = "testdata/group_stages" // new directory ) //go:embed testdata/* @@ -45,31 +46,52 @@ var store embed.FS // PreloadSchema loads schemas from files in the booting process. 
func PreloadSchema(ctx context.Context, e schema.Registry) error { - if err := loadSchema(groupDir, &commonv1.Group{}, func(group *commonv1.Group) error { - return e.CreateGroup(ctx, group) - }); err != nil { - return errors.WithStack(err) - } - if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure *databasev1.Measure) error { - _, innerErr := e.CreateMeasure(ctx, measure) - return innerErr - }); err != nil { - return errors.WithStack(err) - } - if err := loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error { - return e.CreateIndexRule(ctx, indexRule) - }); err != nil { - return errors.WithStack(err) - } - if err := loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error { - return e.CreateIndexRuleBinding(ctx, indexRuleBinding) - }); err != nil { - return errors.WithStack(err) - } - if err := loadSchema(topNAggregationDir, &databasev1.TopNAggregation{}, func(topN *databasev1.TopNAggregation) error { - return e.CreateTopNAggregation(ctx, topN) - }); err != nil { - return errors.WithStack(err) + return loadAllSchemas(ctx, e, groupDir) +} + +// LoadSchemaWithStages loads group schemas from the groupStagesDir. +func LoadSchemaWithStages(ctx context.Context, e schema.Registry) error { + return loadAllSchemas(ctx, e, groupStagesDir) +} + +// loadAllSchemas is the common logic to load schemas from a given group directory. +func loadAllSchemas(ctx context.Context, e schema.Registry, groupDirectory string) error { + return preloadSchemaWithFuncs(ctx, e, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(groupDirectory, &commonv1.Group{}, func(group *commonv1.Group) error { + return e.CreateGroup(ctx, group) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(measureDir, &databasev1.Measure{}, func(measure *databasev1.Measure) error { + _, innerErr := e.CreateMeasure(ctx, measure) + return innerErr + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error { + return e.CreateIndexRule(ctx, indexRule) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error { + return e.CreateIndexRuleBinding(ctx, indexRuleBinding) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(topNAggregationDir, &databasev1.TopNAggregation{}, func(topN *databasev1.TopNAggregation) error { + return e.CreateTopNAggregation(ctx, topN) + }) + }, + ) +} + +// preloadSchemaWithFuncs extracts the common logic for loading schemas. 
+func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders ...func(context.Context, schema.Registry) error) error { + for _, loader := range loaders { + if err := loader(ctx, e); err != nil { + return errors.WithStack(err) + } } return nil } diff --git a/pkg/test/measure/testdata/group_stages/exception.json b/pkg/test/measure/testdata/group_stages/exception.json new file mode 100644 index 00000000..fadbd5a5 --- /dev/null +++ b/pkg/test/measure/testdata/group_stages/exception.json @@ -0,0 +1,18 @@ +{ + "metadata": { + "name": "exception" + }, + "catalog": "CATALOG_MEASURE", + "resource_opts": { + "shard_num": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/group_stages/index_mode.json b/pkg/test/measure/testdata/group_stages/index_mode.json new file mode 100644 index 00000000..fb0d9b4b --- /dev/null +++ b/pkg/test/measure/testdata/group_stages/index_mode.json @@ -0,0 +1,18 @@ +{ + "metadata": { + "name": "index_mode" + }, + "catalog": "CATALOG_MEASURE", + "resource_opts": { + "shard_num": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/groups/sw_metric.json b/pkg/test/measure/testdata/group_stages/sw_metric.json similarity index 100% copy from pkg/test/measure/testdata/groups/sw_metric.json copy to pkg/test/measure/testdata/group_stages/sw_metric.json diff --git a/pkg/test/measure/testdata/groups/sw_metric.json b/pkg/test/measure/testdata/groups/sw_metric.json index 703ae9f5..a91becb7 100644 --- a/pkg/test/measure/testdata/groups/sw_metric.json +++ b/pkg/test/measure/testdata/groups/sw_metric.json @@ -12,22 +12,7 @@ "ttl": { "unit": "UNIT_DAY", "num": 7 - }, - "stages": [ - { - "name": "warm", - "shard_num": 1, - "segment_interval": { - "unit": "UNIT_DAY", - "num": 3 - }, - "ttl": { - "unit": "UNIT_DAY", - "num": 30 - }, - "node_selector": "type=warm" - } - ] + } }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go index 93dcf0fb..d48c3f4f 100644 --- a/pkg/test/stream/etcd.go +++ b/pkg/test/stream/etcd.go @@ -47,26 +47,12 @@ var ( streamStore embed.FS //go:embed testdata/group.json groupJSON string + //go:embed testdata/group_with_stages.json + groupWithStagesJSON string ) -// PreloadSchema loads schemas from files in the booting process. -func PreloadSchema(ctx context.Context, e schema.Registry) error { - if e == nil { - return nil - } - g := &commonv1.Group{} - if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil { - return err - } - _, err := e.GetGroup(ctx, g.Metadata.Name) - if !errors.Is(err, schema.ErrGRPCResourceNotFound) { - logger.Infof("group %s already exists", g.Metadata.Name) - return nil - } - if innerErr := e.CreateGroup(ctx, g); innerErr != nil { - return innerErr - } - +// loadSchemas loads streams, index rules, and index rule bindings. +func loadSchemas(ctx context.Context, e schema.Registry) error { streams, err := streamStore.ReadDir(streamDir) if err != nil { return err @@ -126,3 +112,46 @@ func PreloadSchema(ctx context.Context, e schema.Registry) error { return nil } + +// LoadSchemaWithStages loads schemas from files, including group stages. 
+func LoadSchemaWithStages(ctx context.Context, e schema.Registry) error { + if e == nil { + return nil + } + g := &commonv1.Group{} + if err := protojson.Unmarshal([]byte(groupWithStagesJSON), g); err != nil { + return err + } + _, err := e.GetGroup(ctx, g.Metadata.Name) + if !errors.Is(err, schema.ErrGRPCResourceNotFound) { + logger.Infof("group %s already exists", g.Metadata.Name) + return nil + } + if innerErr := e.CreateGroup(ctx, g); innerErr != nil { + return innerErr + } + + return loadSchemas(ctx, e) +} + +// PreloadSchema loads schemas from files in the booting process. +// This version loads group without stages. +func PreloadSchema(ctx context.Context, e schema.Registry) error { + if e == nil { + return nil + } + g := &commonv1.Group{} + if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil { + return err + } + _, err := e.GetGroup(ctx, g.Metadata.Name) + if !errors.Is(err, schema.ErrGRPCResourceNotFound) { + logger.Infof("group %s already exists", g.Metadata.Name) + return nil + } + if innerErr := e.CreateGroup(ctx, g); innerErr != nil { + return innerErr + } + + return loadSchemas(ctx, e) +} diff --git a/pkg/test/stream/testdata/group.json b/pkg/test/stream/testdata/group.json index f38a2436..b9e8dc80 100644 --- a/pkg/test/stream/testdata/group.json +++ b/pkg/test/stream/testdata/group.json @@ -12,22 +12,7 @@ "ttl": { "unit": "UNIT_DAY", "num": 3 - }, - "stages": [ - { - "name": "warm", - "shard_num": 1, - "segment_interval": { - "unit": "UNIT_DAY", - "num": 3 - }, - "ttl": { - "unit": "UNIT_DAY", - "num": 7 - }, - "node_selector": "type=warm" - } - ] + } }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file diff --git a/pkg/test/stream/testdata/group.json b/pkg/test/stream/testdata/group_with_stages.json similarity index 100% copy from pkg/test/stream/testdata/group.json copy to pkg/test/stream/testdata/group_with_stages.json diff --git a/test/cases/lifecycle/lifecycle.go b/test/cases/lifecycle/lifecycle.go index 55f68ee0..b1ebbfc3 100644 --- a/test/cases/lifecycle/lifecycle.go +++ b/test/cases/lifecycle/lifecycle.go @@ -22,12 +22,20 @@ import ( "io/fs" "os" "path/filepath" + "time" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/apache/skywalking-banyandb/banyand/backup/lifecycle" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" + measureTestData "github.com/apache/skywalking-banyandb/test/cases/measure/data" + streamTestData "github.com/apache/skywalking-banyandb/test/cases/stream/data" + topNTestData "github.com/apache/skywalking-banyandb/test/cases/topn/data" ) // SharedContext is the shared context for the snapshot test cases. 
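The verification below drives stage-scoped queries through the liaison; how a requested stage name becomes a node selector is decided by the reworked loops in parseNodeSelector shown earlier in the banyand/dquery/dquery.go hunk. A minimal, self-contained sketch of that resolution, under the assumption that the Stage type and the resolveSelectors helper are illustrative stand-ins rather than the exact BanyanDB signatures:

package main

import (
	"errors"
	"fmt"
	"strings"
)

// Stage pairs a stage name with the node selector that serves it.
// Illustrative stand-in for the schema's lifecycle stage type.
type Stage struct {
	Name         string
	NodeSelector string
}

// resolveSelectors mirrors the reordered loops in parseNodeSelector:
// the requested stage names now drive the outer loop, and the built-in
// "hot" stage falls back to the liaison's hot-stage selector even when
// the group schema does not define a stage by that name.
func resolveSelectors(requested []string, defined []Stage, hotSelector string) ([]string, error) {
	var selectors []string
	for _, name := range requested {
		for _, st := range defined {
			if strings.EqualFold(name, st.Name) {
				if ns := strings.TrimSpace(st.NodeSelector); ns != "" {
					selectors = append(selectors, ns)
				}
				break
			}
		}
		if strings.EqualFold(name, "hot") && hotSelector != "" {
			selectors = append(selectors, hotSelector)
		}
	}
	if len(selectors) == 0 && len(defined) > 0 {
		// The same condition now surfaces as an error in the measure,
		// stream, and topn query processors in this commit.
		return nil, errors.New("no stage found in request or default stages in resource opts")
	}
	return selectors, nil
}

func main() {
	defined := []Stage{{Name: "warm", NodeSelector: "type=warm"}}
	selectors, err := resolveSelectors([]string{"hot", "warm"}, defined, "type=hot")
	fmt.Println(selectors, err) // [type=hot type=warm] <nil>
}
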
@@ -49,12 +57,87 @@ var _ = ginkgo.Describe("Lifecycle", func() { }) err = lifecycleCmd.Execute() gomega.Expect(err).NotTo(gomega.HaveOccurred()) - verifySourceDirectoriesBeforeMigration() + verifySourceDirectoriesAfterMigration() verifyDestinationDirectoriesAfterMigration() + conn, err := grpchelper.Conn(SharedContext.LiaisonAddr, 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + defer func() { + if conn != nil { + _ = conn.Close() + } + }() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + sc := helpers.SharedContext{ + Connection: conn, + BaseTime: SharedContext.BaseTime, + } + // Verify measure data lifecycle stages + verifyLifecycleStages(sc, measureTestData.VerifyFn, helpers.Args{ + Input: "all", + Duration: 25 * time.Minute, + Offset: -20 * time.Minute, + }) + + // Verify stream data lifecycle stages + verifyLifecycleStages(sc, streamTestData.VerifyFn, helpers.Args{ + Input: "all", + Duration: time.Hour, + IgnoreElementID: true, + }) + + // Verify topN data lifecycle stages + verifyLifecycleStages(sc, topNTestData.VerifyFn, helpers.Args{ + Input: "aggr_desc", + Duration: 25 * time.Minute, + Offset: -20 * time.Minute, + }) }) }) -func verifySourceDirectoriesBeforeMigration() { +func verifyLifecycleStages(sc helpers.SharedContext, verifyFn func(gomega.Gomega, helpers.SharedContext, helpers.Args), args helpers.Args) { + // Initial verification expecting error before migration + verifyFn(gomega.Default, sc, helpers.Args{ + Input: args.Input, + Duration: args.Duration, + Offset: args.Offset, + WantErr: true, + Stages: args.Stages, + }) + + // Verify hot+warm stages exist after migration + gomega.Eventually(func(innerGm gomega.Gomega) { + verifyFn(innerGm, sc, helpers.Args{ + Input: args.Input, + Duration: args.Duration, + Offset: args.Offset, + Stages: []string{"hot", "warm"}, + IgnoreElementID: args.IgnoreElementID, + }) + }, flags.EventuallyTimeout).Should(gomega.Succeed()) + + // Verify warm stage only after retention + gomega.Eventually(func(innerGm gomega.Gomega) { + verifyFn(innerGm, sc, helpers.Args{ + Input: args.Input, + Duration: args.Duration, + Offset: args.Offset, + Stages: []string{"warm"}, + IgnoreElementID: args.IgnoreElementID, + }) + }, flags.EventuallyTimeout).Should(gomega.Succeed()) + + // Verify hot stage is empty after retention + verifyFn(gomega.Default, sc, helpers.Args{ + Input: args.Input, + Duration: args.Duration, + Offset: args.Offset, + WantEmpty: true, + Stages: []string{"hot"}, + IgnoreElementID: args.IgnoreElementID, + }) +} + +func verifySourceDirectoriesAfterMigration() { streamSrcPath := filepath.Join(SharedContext.SrcDir, "stream", "data", "default") streamEntries, err := os.ReadDir(streamSrcPath) gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Stream source directory should exist") diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go index ff17c45c..f4c78699 100644 --- a/test/cases/measure/data/data.go +++ b/test/cases/measure/data/data.go @@ -54,6 +54,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args query := &measurev1.QueryRequest{} helpers.UnmarshalYAML(i, query) query.TimeRange = helpers.TimeRange(args, sharedContext) + query.Stages = args.Stages c := measurev1.NewMeasureServiceClient(sharedContext.Connection) ctx := context.Background() resp, err := c.Query(ctx, query) diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go index 6701e166..b3df58d4 100644 --- a/test/cases/stream/data/data.go +++ b/test/cases/stream/data/data.go @@ 
-65,6 +65,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args query := &streamv1.QueryRequest{} helpers.UnmarshalYAML(i, query) query.TimeRange = helpers.TimeRange(args, sharedContext) + query.Stages = args.Stages c := streamv1.NewStreamServiceClient(sharedContext.Connection) ctx := context.Background() resp, err := c.Query(ctx, query) @@ -86,8 +87,10 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args innerGm.Expect(err).NotTo(gm.HaveOccurred()) want := &streamv1.QueryResponse{} helpers.UnmarshalYAML(ww, want) - for i := range want.Elements { - want.Elements[i].ElementId = hex.EncodeToString(convert.Uint64ToBytes(convert.HashStr(query.Name + "|" + want.Elements[i].ElementId))) + if !args.IgnoreElementID { + for i := range want.Elements { + want.Elements[i].ElementId = hex.EncodeToString(convert.Uint64ToBytes(convert.HashStr(query.Name + "|" + want.Elements[i].ElementId))) + } } if args.DisOrder { slices.SortFunc(want.Elements, func(a, b *streamv1.Element) int { @@ -97,10 +100,15 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args return strings.Compare(a.ElementId, b.ElementId) }) } - success := innerGm.Expect(cmp.Equal(resp, want, - protocmp.IgnoreUnknown(), + var extra []cmp.Option + extra = append(extra, protocmp.IgnoreUnknown(), protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"), - protocmp.Transform())). + protocmp.Transform()) + if args.IgnoreElementID { + extra = append(extra, protocmp.IgnoreFields(&streamv1.Element{}, "element_id")) + } + success := innerGm.Expect(cmp.Equal(resp, want, + extra...)). To(gm.BeTrue(), func() string { var j []byte j, err = protojson.Marshal(resp) diff --git a/test/cases/topn/data/data.go b/test/cases/topn/data/data.go index 983be9c4..67a1bbd3 100644 --- a/test/cases/topn/data/data.go +++ b/test/cases/topn/data/data.go @@ -46,6 +46,7 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args query := &measurev1.TopNRequest{} helpers.UnmarshalYAML(i, query) query.TimeRange = helpers.TimeRange(args, sharedContext) + query.Stages = args.Stages c := measurev1.NewMeasureServiceClient(sharedContext.Connection) ctx := context.Background() resp, err := c.TopN(ctx, query) diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env index 122c5d1b..103c9a00 100644 --- a/test/e2e-v2/script/env +++ b/test/e2e-v2/script/env @@ -26,6 +26,6 @@ SW_ROVER_COMMIT=4c0cb8429a96f190ea30eac1807008d523c749c3 SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3 SW_PREDICTOR_COMMIT=54a0197654a3781a6f73ce35146c712af297c994 -SW_OAP_COMMIT=c63fe21e75c7b898564add0035c220d9b242f7d2 +SW_OAP_COMMIT=03b03518616a91129cfced2972621df3f0bac3db SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=d3f8fe894d1a206164b73f5b523d2eb62d9e9965 SW_CTL_COMMIT=67cbc89dd7b214d5791321a7ca992f940cb586ba \ No newline at end of file diff --git a/test/integration/distributed/lifecycle/lifecycle_suite_test.go b/test/integration/distributed/lifecycle/lifecycle_suite_test.go index 0b5059fc..4fc99cbd 100644 --- a/test/integration/distributed/lifecycle/lifecycle_suite_test.go +++ b/test/integration/distributed/lifecycle/lifecycle_suite_test.go @@ -51,13 +51,15 @@ func TestLifecycle(t *testing.T) { } var ( - connection *grpc.ClientConn - srcDir string - destDir string - deferFunc func() - goods []gleak.Goroutine - dataAddr string - ep string + connection *grpc.ClientConn + srcDir string + destDir string + deferFunc func() + goods []gleak.Goroutine + dataAddr string + ep string + 
tenDaysBeforeNow time.Time + liaisonAddr string ) var _ = SynchronizedBeforeSuite(func() []byte { @@ -86,8 +88,8 @@ var _ = SynchronizedBeforeSuite(func() []byte { Expect(err).NotTo(HaveOccurred()) defer schemaRegistry.Close() ctx := context.Background() - test_stream.PreloadSchema(ctx, schemaRegistry) - test_measure.PreloadSchema(ctx, schemaRegistry) + test_stream.LoadSchemaWithStages(ctx, schemaRegistry) + test_measure.LoadSchemaWithStages(ctx, schemaRegistry) By("Starting hot data node") var closeDataNode0 func() dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep, "--node-labels", "type=hot", "--measure-flush-timeout", "0s", "--stream-flush-timeout", "0s") @@ -95,11 +97,12 @@ var _ = SynchronizedBeforeSuite(func() []byte { var closeDataNode1 func() _, destDir, closeDataNode1 = setup.DataNodeWithAddrAndDir(ep, "--node-labels", "type=warm", "--measure-flush-timeout", "0s", "--stream-flush-timeout", "0s") By("Starting liaison node") - liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep, "--data-node-selector", "type=hot") + var closerLiaisonNode func() + liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep, "--data-node-selector", "type=hot") By("Initializing test cases with 10 days before") ns := timestamp.NowMilli().UnixNano() now := time.Unix(0, ns-ns%int64(time.Minute)) - tenDaysBeforeNow := now.Add(-10 * 24 * time.Hour) + tenDaysBeforeNow = now.Add(-10 * 24 * time.Hour) test_cases.Initialize(liaisonAddr, tenDaysBeforeNow) deferFunc = func() { closerLiaisonNode() @@ -116,11 +119,13 @@ var _ = SynchronizedBeforeSuite(func() []byte { grpc.WithTransportCredentials(insecure.NewCredentials())) Expect(err).NotTo(HaveOccurred()) caseslifecycle.SharedContext = helpers.LifecycleSharedContext{ - DataAddr: dataAddr, - Connection: connection, - SrcDir: srcDir, - DestDir: destDir, - EtcdAddr: ep, + LiaisonAddr: liaisonAddr, + DataAddr: dataAddr, + Connection: connection, + SrcDir: srcDir, + DestDir: destDir, + EtcdAddr: ep, + BaseTime: tenDaysBeforeNow, } })
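The suite above wires a hot data node (type=hot), a warm data node (type=warm), and a liaison routing writes to the hot tier, then hands LiaisonAddr and BaseTime to the lifecycle cases. A hedged sketch of the client-side call those cases exercise; grpchelper.Conn, query.Stages, and the measure client appear in this diff, while the address and the empty request body are placeholders (the real cases unmarshal the request from YAML and derive the time range from BaseTime):

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
)

func main() {
	// The address is a placeholder; the suite exposes the real one via
	// LifecycleSharedContext.LiaisonAddr.
	conn, err := grpchelper.Conn("127.0.0.1:17912", 10*time.Second,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	query := &measurev1.QueryRequest{}
	// Only the stage scoping added by this commit is shown; the rest of
	// the request would come from the YAML fixtures.
	query.Stages = []string{"hot", "warm"}

	resp, err := measurev1.NewMeasureServiceClient(conn).Query(context.Background(), query)
	fmt.Println(resp, err)
}
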

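One consequence of dropping data_segments_boundary, visible in the banyand/queue/pub hunk above: Broadcast fan-out is now decided purely by node-label matching, with no per-group time-range overlap check. A standalone sketch of that selection; MatchFunc echoes the pub package's name, but the helper and the selector parsing here are illustrative assumptions:

package main

import (
	"fmt"
	"strings"
)

// MatchFunc reports whether a node's labels satisfy one selector.
type MatchFunc func(labels map[string]string) bool

// matchFromSelector builds a MatchFunc for a "key=value" selector; an
// empty selector matches every node, like the bypass matches in pub.
func matchFromSelector(sel string) MatchFunc {
	sel = strings.TrimSpace(sel)
	if sel == "" {
		return func(map[string]string) bool { return true }
	}
	kv := strings.SplitN(sel, "=", 2)
	return func(labels map[string]string) bool {
		return len(kv) == 2 && labels[kv[0]] == kv[1]
	}
}

// selectNodes reflects the simplified Broadcast loop: with the
// FindSegmentsBoundary/PbHasOverlap filter deleted, a node is chosen
// as soon as any selector matches its labels.
func selectNodes(nodes map[string]map[string]string, matches []MatchFunc) []string {
	var names []string
	for name, labels := range nodes {
		for _, m := range matches {
			if m(labels) {
				names = append(names, name)
				break
			}
		}
	}
	return names
}

func main() {
	nodes := map[string]map[string]string{
		"node1": {"type": "hot"},
		"node2": {"type": "warm"},
	}
	fmt.Println(selectNodes(nodes, []MatchFunc{matchFromSelector("type=warm")})) // [node2]
}
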