This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 803408ff Add Load Balancer Feature to Liaison (#685) 803408ff is described below commit 803408fff81491307a1338c724b439ea2c97d767 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Wed Jun 18 20:23:57 2025 +0800 Add Load Balancer Feature to Liaison (#685) --- CHANGES.md | 1 + banyand/backup/lifecycle/service.go | 19 +++- banyand/internal/storage/rotation_test.go | 31 +++++-- banyand/liaison/grpc/discovery.go | 10 +- banyand/liaison/grpc/measure.go | 118 ++++++++++++++++++++---- banyand/liaison/grpc/registry_test.go | 10 +- banyand/liaison/grpc/server.go | 102 +++++++++++++------- banyand/liaison/grpc/stream.go | 78 ++++++++++++++++ banyand/queue/pub/client.go | 13 ++- banyand/queue/pub/client_test.go | 18 ++++ banyand/queue/pub/pub.go | 80 ++++++++++------ banyand/queue/pub/pub_suite_test.go | 7 +- banyand/queue/pub/pub_tls_test.go | 4 +- banyand/queue/sub/server.go | 67 +++++++++----- pkg/cmdsetup/liaison.go | 46 +++++---- pkg/cmdsetup/standalone.go | 28 +++--- pkg/test/setup/setup.go | 6 +- test/integration/distributed/setup/node_test.go | 10 +- test/integration/etcd/client_test.go | 20 ++-- test/integration/standalone/other/disk_test.go | 13 ++- 20 files changed, 509 insertions(+), 172 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 43cc1e2b..582827ea 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,7 @@ Release Notes. - Replica: Move the TopN pre-calculation flow from the Data Node to the Liaison Node. - Add a wait and retry to write handlers to avoid the local metadata cache being loaded. - Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database. +- Add Load Balancer Feature to Liaison. 
### Bug Fixes diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go index f7e673bc..a1dafce0 100644 --- a/banyand/backup/lifecycle/service.go +++ b/banyand/backup/lifecycle/service.go @@ -170,6 +170,8 @@ func (l *lifecycleService) action() error { return err } + l.l.Info().Msgf("starting migration for %d groups: %v", len(groups), getGroupNames(groups)) + if len(groups) == 0 { l.l.Info().Msg("no groups to process, all groups already completed") progress.Remove(l.progressFilePath, l.l) @@ -182,6 +184,10 @@ func (l *lifecycleService) action() error { l.l.Error().Err(err).Msg("failed to get snapshots") return err } + l.l.Info(). + Str("stream_snapshot", streamDir). + Str("measure_snapshot", measureDir). + Msg("created snapshots") progress.Save(l.progressFilePath, l.l) streamSVC, measureSVC, err := l.setupQuerySvc(ctx, streamDir, measureDir) if streamSVC != nil { @@ -208,6 +214,7 @@ func (l *lifecycleService) action() error { case commonv1.Catalog_CATALOG_STREAM: if streamSVC == nil { l.l.Error().Msgf("stream service is not available, skipping group: %s", g.Metadata.Name) + progress.MarkStreamError(g.Metadata.Name, "", fmt.Sprintf("stream service unavailable for group %s", g.Metadata.Name)) allGroupsCompleted = false continue } @@ -215,6 +222,7 @@ func (l *lifecycleService) action() error { case commonv1.Catalog_CATALOG_MEASURE: if measureSVC == nil { l.l.Error().Msgf("measure service is not available, skipping group: %s", g.Metadata.Name) + progress.MarkMeasureError(g.Metadata.Name, "", fmt.Sprintf("measure service unavailable for group %s", g.Metadata.Name)) allGroupsCompleted = false continue } @@ -326,6 +334,14 @@ func (l *lifecycleService) generateReport(p *Progress) { l.l.Info().Msg("rotated old migration reports") } +func getGroupNames(groups []*commonv1.Group) []string { + names := make([]string, 0, len(groups)) + for _, g := range groups { + names = append(names, g.Metadata.Name) + } + return names +} + func (l 
*lifecycleService) getGroupsToProcess(ctx context.Context, progress *Progress) ([]*commonv1.Group, error) { gg, err := l.metadata.GroupRegistry().ListGroup(ctx) if err != nil { @@ -381,10 +397,10 @@ func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.G for _, s := range ss { if !progress.IsStreamCompleted(g.Metadata.Name, s.Metadata.Name) { allStreamsDone = false - break } } if allStreamsDone { + l.l.Info().Msgf("deleting expired stream segments for group: %s", g.Metadata.Name) l.deleteExpiredStreamSegments(ctx, g, tr, progress) progress.MarkGroupCompleted(g.Metadata.Name) progress.Save(l.progressFilePath, l.l) @@ -523,6 +539,7 @@ func (l *lifecycleService) processMeasureGroup(ctx context.Context, g *commonv1. } } if allMeasuresDone { + l.l.Info().Msgf("deleting expired measure segments for group: %s", g.Metadata.Name) l.deleteExpiredMeasureSegments(ctx, g, tr, progress) progress.MarkGroupCompleted(g.Metadata.Name) progress.Save(l.progressFilePath, l.l) diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go index 20094639..26e8bd5d 100644 --- a/banyand/internal/storage/rotation_test.go +++ b/banyand/internal/storage/rotation_test.go @@ -62,7 +62,7 @@ func TestForwardRotation(t *testing.T) { func TestRetention(t *testing.T) { t.Run("delete the segment and index when the TTL is up", func(t *testing.T) { - tsdb, c, segCtrl, dfFn := setUpDB(t) + tsdb, c, segCtrl, dfFn := setUpDB(t, 5) // Use 5-day TTL to avoid early deletion defer dfFn() ts := c.Now() for i := 0; i < 4; i++ { @@ -82,13 +82,27 @@ func TestRetention(t *testing.T) { // amend the time to the next day ts = ts.Add(time.Hour) } - t.Logf("current time: %s", ts.Format(time.RFC3339)) + + // Verify all 5 segments exist before testing TTL deletion (initial + 4 created) + require.EventuallyWithTf(t, func(ct *assert.CollectT) { + segments, _ := segCtrl.segments(false) + if len(segments) != 5 { + ct.Errorf("expect 5 segments before TTL test, got 
%d", len(segments)) + } + }, flags.EventuallyTimeout, time.Millisecond, "wait for 5 segments to be created") + + // Now test TTL deletion by advancing time beyond the original TTL + // Move forward enough days to exceed the 5-day TTL we set + ts = ts.Add(2 * 24 * time.Hour) // Total of 6+ days from the first segment + t.Logf("current time after TTL advancement: %s", ts.Format(time.RFC3339)) c.Set(ts) tsdb.Tick(ts.UnixNano()) + assert.Eventually(t, func() bool { segments, _ := segCtrl.segments(false) - return len(segments) == 4 - }, flags.EventuallyTimeout, time.Millisecond, "wait for the 1st segment to be deleted") + // Should have fewer than 5 segments as old ones get deleted + return len(segments) < 5 + }, flags.EventuallyTimeout, time.Millisecond, "wait for old segments to be deleted by TTL") }) t.Run("keep the segment volume stable", func(t *testing.T) { @@ -138,13 +152,18 @@ func TestRetention(t *testing.T) { }) } -func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, *segmentController[*MockTSTable, any], func()) { +func setUpDB(t *testing.T, ttlDays ...int) (*database[*MockTSTable, any], timestamp.MockClock, *segmentController[*MockTSTable, any], func()) { dir, defFn := test.Space(require.New(t)) + ttl := 3 + if len(ttlDays) > 0 { + ttl = ttlDays[0] + } + TSDBOpts := TSDBOpts[*MockTSTable, any]{ Location: dir, SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, - TTL: IntervalRule{Unit: DAY, Num: 3}, + TTL: IntervalRule{Unit: DAY, Num: ttl}, ShardNum: 1, TSTableCreator: MockTSTableCreator, } diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index 2d031b59..eed1807b 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -46,9 +46,12 @@ type discoveryService struct { kind schema.Kind } -func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService { - gr := &groupRepo{resourceOpts: 
make(map[string]*commonv1.ResourceOpts)} - er := &entityRepo{entitiesMap: make(map[identity]partition.Locator), measureMap: make(map[identity]*databasev1.Measure)} +func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry, gr *groupRepo) *discoveryService { + er := &entityRepo{entitiesMap: make(map[identity]partition.Locator)} + return newDiscoveryServiceWithEntityRepo(kind, metadataRepo, nodeRegistry, gr, er) +} + +func newDiscoveryServiceWithEntityRepo(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry, gr *groupRepo, er *entityRepo) *discoveryService { sr := &shardingKeyRepo{shardingKeysMap: make(map[identity]partition.Locator)} return &discoveryService{ groupRepo: gr, @@ -61,7 +64,6 @@ func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegis } func (ds *discoveryService) initialize() error { - ds.metadataRepo.RegisterHandler("liaison", schema.KindGroup, ds.groupRepo) ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo) if ds.kind == schema.KindMeasure { ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.shardingKeyRepo) diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 52a7b913..a4e84fa2 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" @@ -47,7 +48,6 @@ type measureService struct { ingestionAccessLog accesslog.Log pipeline queue.Client broadcaster queue.Client - topNService measure.TopNService *discoveryService l *logger.Logger metrics *metrics @@ -193,22 +193,6 @@ func (ms *measureService) processAndPublishRequest(ctx context.Context, writeReq messageID: writeRequest.GetMessageId(), nodes: nodes, }) - stm, ok := 
ms.entityRepo.loadMeasure(writeRequest.GetMetadata()) - if !ok { - ms.l.Error().RawJSON("written", logger.Proto(writeRequest)).Msg("failed to load measure schema") - ms.sendReply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure) - return errors.New("failed to load measure schema") - } - series := &pbv1.Series{ - Subject: writeRequest.Metadata.Name, - EntityValues: iwr.EntityValues, - } - if err := series.Marshal(); err != nil { - ms.l.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to marshal series") - ms.sendReply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure) - return err - } - ms.topNService.InFlow(stm, uint64(series.ID), iwr.ShardId, iwr.EntityValues, iwr.Request.DataPoint) return nil } @@ -393,3 +377,103 @@ type succeedSentMessage struct { nodes []string messageID uint64 } + +type measureRedirectWriteCallback struct { + pipeline queue.Client + nodeRegistry NodeRegistry + topNService measure.TopNService + l *logger.Logger + groupRepo *groupRepo + entityRepo *entityRepo + writeTimeout time.Duration + maxDiskUsagePercent int +} + +func (r *measureRedirectWriteCallback) CheckHealth() *common.Error { + if r.maxDiskUsagePercent < 1 { + return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "measure is readonly because \"measure-max-disk-usage-percent\" is 0") + } + return nil +} + +func (r *measureRedirectWriteCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { + events, ok := message.Data().([]any) + if !ok { + r.l.Warn().Msg("invalid event data type") + return + } + if len(events) < 1 { + r.l.Warn().Msg("empty event") + return + } + + publisher := r.pipeline.NewBatchPublisher(r.writeTimeout) + defer func() { + _, err := publisher.Close() + if err != nil { + r.l.Error().Err(err).Msg("failed to close publisher") + } + }() + + for i := range events { + var writeEvent 
*measurev1.InternalWriteRequest + switch e := events[i].(type) { + case *measurev1.InternalWriteRequest: + writeEvent = e + case []byte: + writeEvent = &measurev1.InternalWriteRequest{} + if err := proto.Unmarshal(e, writeEvent); err != nil { + r.l.Error().Err(err).RawJSON("written", e).Msg("fail to unmarshal event") + continue + } + default: + r.l.Warn().Msg("invalid event data type") + continue + } + + metadata := writeEvent.Request.GetMetadata() + if metadata == nil { + r.l.Warn().Msg("metadata is nil in InternalWriteRequest") + continue + } + + group := metadata.GetGroup() + measureName := metadata.GetName() + shardID := writeEvent.GetShardId() + + copies, ok := r.groupRepo.copies(group) + if !ok { + r.l.Error().Str("group", group).Msg("failed to get group copies") + continue + } + + for copyIdx := range copies { + nodeID, err := r.nodeRegistry.Locate(group, measureName, shardID, copyIdx) + if err != nil { + r.l.Error().Err(err).Str("group", group).Str("measure", measureName).Uint32("shard", shardID).Uint32("copy", copyIdx).Msg("failed to locate node") + continue + } + + msg := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, writeEvent) + if _, err := publisher.Publish(ctx, data.TopicMeasureWrite, msg); err != nil { + r.l.Error().Err(err).Str("node", nodeID).Msg("failed to publish message") + } + } + stm, ok := r.entityRepo.loadMeasure(metadata) + if !ok { + r.l.Error().RawJSON("written", logger.Proto(writeEvent)).Msg("failed to load measure schema") + continue + } + series := &pbv1.Series{ + Subject: metadata.Name, + EntityValues: writeEvent.EntityValues, + } + if err := series.Marshal(); err != nil { + r.l.Error().Err(err).RawJSON("written", logger.Proto(writeEvent)).Msg("failed to marshal series") + continue + } + r.topNService.InFlow(stm, uint64(series.ID), writeEvent.ShardId, writeEvent.EntityValues, writeEvent.Request.DataPoint) + } + + return +} diff --git a/banyand/liaison/grpc/registry_test.go 
b/banyand/liaison/grpc/registry_test.go index c54295b3..3b3f58a6 100644 --- a/banyand/liaison/grpc/registry_test.go +++ b/banyand/liaison/grpc/registry_test.go @@ -182,11 +182,11 @@ func setupForRegistry() func() { metricSvc := observability.NewMetricService(metaSvc, pipeline, "standalone", nil) nr := grpc.NewLocalNodeRegistry() - tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, nil, metaSvc, grpc.NodeRegistries{ - MeasureNodeRegistry: nr, - StreamNodeRegistry: nr, - PropertyNodeRegistry: nr, - }, metricSvc, nil) + tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, pipeline, nil, metaSvc, grpc.NodeRegistries{ + MeasureLiaisonNodeRegistry: nr, + StreamDataNodeRegistry: nr, + PropertyNodeRegistry: nr, + }, metricSvc, nil, pipeline) preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc} var flags []string metaPath, metaDeferFunc, err := test.NewSpace() diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index b12d1839..dadf6ad6 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -47,6 +47,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/partition" "github.com/apache/skywalking-banyandb/pkg/run" pkgtls "github.com/apache/skywalking-banyandb/pkg/tls" ) @@ -71,36 +72,43 @@ type Server interface { // NodeRegistries contains the node registries. 
type NodeRegistries struct { - StreamNodeRegistry NodeRegistry - MeasureNodeRegistry NodeRegistry - PropertyNodeRegistry NodeRegistry + StreamLiaisonNodeRegistry NodeRegistry + StreamDataNodeRegistry NodeRegistry + MeasureLiaisonNodeRegistry NodeRegistry + MeasureDataNodeRegistry NodeRegistry + PropertyNodeRegistry NodeRegistry } type server struct { databasev1.UnimplementedSnapshotServiceServer - topNPipeline queue.Server - omr observability.MetricsRegistry - *indexRuleBindingRegistryServer - metrics *metrics - log *logger.Logger - *propertyServer + topNPipeline queue.Server + omr observability.MetricsRegistry + tire2Server queue.Server + schemaRepo metadata.Repo + measureCallback *measureRedirectWriteCallback + topNHandler *topNHandler *topNAggregationRegistryServer *groupRegistryServer stopCh chan struct{} *indexRuleRegistryServer *measureRegistryServer - streamSVC *streamService + streamSVC *streamService + streamCallback *streamRedirectWriteCallback *streamRegistryServer measureSVC *measureService + log *logger.Logger *propertyRegistryServer - ser *grpclib.Server - tlsReloader *pkgtls.Reloader - topNHandler *topNHandler - accessLogRootPath string - addr string - host string + ser *grpclib.Server + tlsReloader *pkgtls.Reloader + *propertyServer + *indexRuleBindingRegistryServer + groupRepo *groupRepo + metrics *metrics certFile string keyFile string + host string + addr string + accessLogRootPath string accessLogRecorders []accessLogRecorder maxRecvMsgSize run.Bytes port uint32 @@ -109,25 +117,41 @@ type server struct { } // NewServer returns a new gRPC server. 
-func NewServer(_ context.Context, pipeline, broadcaster queue.Client, topNPipeline queue.Server, +func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Client, topNPipeline queue.Server, schemaRegistry metadata.Repo, nr NodeRegistries, omr observability.MetricsRegistry, topNService measure.TopNService, + tire2Server queue.Server, ) Server { + gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)} + er := &entityRepo{entitiesMap: make(map[identity]partition.Locator), measureMap: make(map[identity]*databasev1.Measure)} streamSVC := &streamService{ - discoveryService: newDiscoveryService(schema.KindStream, schemaRegistry, nr.StreamNodeRegistry), - pipeline: pipeline, + discoveryService: newDiscoveryService(schema.KindStream, schemaRegistry, nr.StreamLiaisonNodeRegistry, gr), + pipeline: tir1Client, broadcaster: broadcaster, } measureSVC := &measureService{ - discoveryService: newDiscoveryService(schema.KindMeasure, schemaRegistry, nr.MeasureNodeRegistry), - pipeline: pipeline, + discoveryService: newDiscoveryServiceWithEntityRepo(schema.KindMeasure, schemaRegistry, nr.MeasureLiaisonNodeRegistry, gr, er), + pipeline: tir1Client, broadcaster: broadcaster, - topNService: topNService, } s := &server{ - omr: omr, - streamSVC: streamSVC, - measureSVC: measureSVC, + omr: omr, + streamSVC: streamSVC, + measureSVC: measureSVC, + groupRepo: gr, + tire2Server: tire2Server, + streamCallback: &streamRedirectWriteCallback{ + pipeline: tir2Client, + groupRepo: gr, + nodeRegistry: nr.StreamDataNodeRegistry, + }, + measureCallback: &measureRedirectWriteCallback{ + pipeline: tir2Client, + groupRepo: gr, + entityRepo: er, + nodeRegistry: nr.MeasureDataNodeRegistry, + topNService: topNService, + }, streamRegistryServer: &streamRegistryServer{ schemaRegistry: schemaRegistry, }, @@ -148,35 +172,46 @@ func NewServer(_ context.Context, pipeline, broadcaster queue.Client, topNPipeli }, propertyServer: &propertyServer{ schemaRegistry: schemaRegistry, - 
pipeline: pipeline, + pipeline: tir2Client, nodeRegistry: nr.PropertyNodeRegistry, - discoveryService: newDiscoveryService(schema.KindProperty, schemaRegistry, nr.MeasureNodeRegistry), + discoveryService: newDiscoveryService(schema.KindProperty, schemaRegistry, nr.PropertyNodeRegistry, gr), }, propertyRegistryServer: &propertyRegistryServer{ schemaRegistry: schemaRegistry, }, topNPipeline: topNPipeline, + schemaRepo: schemaRegistry, } s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC} + return s } func (s *server) PreRun(_ context.Context) error { s.log = logger.GetLogger("liaison-grpc") - s.streamSVC.setLogger(s.log) + s.streamSVC.setLogger(s.log.Named("stream-t1")) + s.streamCallback.l = s.log.Named("stream-t2") s.measureSVC.setLogger(s.log) s.propertyServer.SetLogger(s.log) + s.measureCallback.l = s.log.Named("measure-t2") components := []*discoveryService{ s.streamSVC.discoveryService, s.measureSVC.discoveryService, s.propertyServer.discoveryService, } + s.schemaRepo.RegisterHandler("liaison", schema.KindGroup, s.groupRepo) for _, c := range components { c.SetLogger(s.log) if err := c.initialize(); err != nil { return err } } + if err := s.tire2Server.Subscribe(data.TopicStreamWrite, s.streamCallback); err != nil { + return err + } + if err := s.tire2Server.Subscribe(data.TopicMeasureWrite, s.measureCallback); err != nil { + return err + } if s.enableIngestionAccessLog { for _, alr := range s.accessLogRecorders { @@ -209,8 +244,8 @@ func (s *server) PreRun(_ context.Context) error { if s.topNPipeline != nil { topNHandler := &topNHandler{ - nodeRegistry: s.measureSVC.nodeRegistry, - pipeline: s.pipeline, + nodeRegistry: s.measureCallback.nodeRegistry, + pipeline: s.measureCallback.pipeline, l: s.log.Named("topNHandler"), } if err := s.topNPipeline.Subscribe(data.TopicMeasureWrite, topNHandler); err != nil { @@ -245,12 +280,15 @@ func (s *server) FlagSet() *run.FlagSet { fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") 
fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log") fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path") - fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "stream write timeout") - fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "measure write timeout") + fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "timeout for writing stream among liaison nodes") + fs.DurationVar(&s.streamCallback.writeTimeout, "stream-write-data-timeout", 15*time.Second, "timeout for writing stream data to the data nodes") + fs.DurationVar(&s.measureCallback.writeTimeout, "measure-write-data-timeout", 15*time.Second, "timeout for writing measure data to the data nodes") + fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "timeout for writing measure among liaison nodes") fs.DurationVar(&s.measureSVC.maxWaitDuration, "measure-metadata-cache-wait-duration", 0, "the maximum duration to wait for metadata cache to load (for testing purposes)") fs.DurationVar(&s.streamSVC.maxWaitDuration, "stream-metadata-cache-wait-duration", 0, "the maximum duration to wait for metadata cache to load (for testing purposes)") + fs.IntVar(&s.measureCallback.maxDiskUsagePercent, "liaison-measure-max-disk-usage-percent", 95, "the maximum disk usage percentage allowed") return fs } diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 8dc9ff5f..cf9baada 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" @@ -315,3 +316,80 @@ func (s *streamService) Close() error { } return nil } + +type 
streamRedirectWriteCallback struct { + *bus.UnImplementedHealthyListener + l *logger.Logger + pipeline queue.Client + groupRepo *groupRepo + nodeRegistry NodeRegistry + writeTimeout time.Duration +} + +func (r *streamRedirectWriteCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { + events, ok := message.Data().([]any) + if !ok { + r.l.Warn().Msg("invalid event data type") + return + } + if len(events) < 1 { + r.l.Warn().Msg("empty event") + return + } + + publisher := r.pipeline.NewBatchPublisher(r.writeTimeout) + defer func() { + _, err := publisher.Close() + if err != nil { + r.l.Error().Err(err).Msg("failed to close publisher") + } + }() + + for i := range events { + var writeEvent *streamv1.InternalWriteRequest + switch e := events[i].(type) { + case *streamv1.InternalWriteRequest: + writeEvent = e + case []byte: + writeEvent = &streamv1.InternalWriteRequest{} + if err := proto.Unmarshal(e, writeEvent); err != nil { + r.l.Error().Err(err).RawJSON("written", e).Msg("fail to unmarshal event") + continue + } + default: + r.l.Warn().Msg("invalid event data type") + continue + } + + metadata := writeEvent.Request.GetMetadata() + if metadata == nil { + r.l.Warn().Msg("metadata is nil in InternalWriteRequest") + continue + } + + group := metadata.GetGroup() + streamName := metadata.GetName() + shardID := writeEvent.GetShardId() + + copies, ok := r.groupRepo.copies(group) + if !ok { + r.l.Error().Str("group", group).Msg("failed to get group copies") + continue + } + + for copyIdx := range copies { + nodeID, err := r.nodeRegistry.Locate(group, streamName, shardID, copyIdx) + if err != nil { + r.l.Error().Err(err).Str("group", group).Str("stream", streamName).Uint32("shard", shardID).Uint32("copy", copyIdx).Msg("failed to locate node") + continue + } + + msg := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, writeEvent) + if _, err := publisher.Publish(ctx, data.TopicStreamWrite, msg); err != nil { + 
r.l.Error().Err(err).Str("node", nodeID).Msg("failed to publish message") + } + } + } + + return +} diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go index 739d19a5..73f4642c 100644 --- a/banyand/queue/pub/client.go +++ b/banyand/queue/pub/client.go @@ -75,14 +75,19 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { p.log.Warn().Msg("failed to cast node spec") return } - var hasDataRole bool + var okRole bool for _, r := range node.Roles { - if r == databasev1.Role_ROLE_DATA { - hasDataRole = true + for _, allowed := range p.allowedRoles { + if r == allowed { + okRole = true + break + } + } + if okRole { break } } - if !hasDataRole { + if !okRole { return } diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go index 43a43736..59ad104d 100644 --- a/banyand/queue/pub/client_test.go +++ b/banyand/queue/pub/client_test.go @@ -29,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -146,6 +147,23 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { verifyClientsWithGomega(g, p, data.TopicStreamWrite, 1, 0, 2, 1) }, flags.EventuallyTimeout).Should(gomega.Succeed()) }) + + ginkgo.It("should register and unregister liaison nodes", func() { + addr := getAddress() + closeFn := setup(addr, codes.OK, 200*time.Millisecond) + defer func() { + closeFn() + }() + + p := newPub(databasev1.Role_ROLE_LIAISON) + defer p.GracefulStop() + node := getDataNode("liaison-node", addr) + n := node.Spec.(*databasev1.Node) + n.Roles = []databasev1.Role{databasev1.Role_ROLE_LIAISON} + + p.OnAddOrUpdate(node) + verifyClients(p, 1, 0, 1, 0) + }) }) func verifyClients(p *pub, 
active, evict, onAdd, onDelete int) { diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index e4b89744..a89b6104 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "io" + "strings" "sync" "time" @@ -54,22 +55,30 @@ var ( type pub struct { schema.UnimplementedOnInitHandler - metadata metadata.Repo - handlers map[bus.Topic]schema.EventHandler - log *logger.Logger - registered map[string]*databasev1.Node - active map[string]*client - evictable map[string]evictNode - closer *run.Closer - caCertPath string - mu sync.RWMutex - tlsEnabled bool + metadata metadata.Repo + evictable map[string]evictNode + log *logger.Logger + registered map[string]*databasev1.Node + active map[string]*client + handlers map[bus.Topic]schema.EventHandler + closer *run.Closer + caCertPath string + prefix string + allowedRoles []databasev1.Role + mu sync.RWMutex + tlsEnabled bool } func (p *pub) FlagSet() *run.FlagSet { + prefixFlag := func(name string) string { + if p.prefix == "" { + return name + } + return p.prefix + "-" + name + } fs := run.NewFlagSet("queue-client") - fs.BoolVar(&p.tlsEnabled, "internal-tls", false, "enable internal TLS") - fs.StringVar(&p.caCertPath, "internal-ca-cert", "", "CA certificate file to verify the internal data server") + fs.BoolVar(&p.tlsEnabled, prefixFlag("client-tls"), false, fmt.Sprintf("enable client TLS for %s", p.prefix)) + fs.StringVar(&p.caCertPath, prefixFlag("client-ca-cert"), "", fmt.Sprintf("CA certificate file to verify the %s server", p.prefix)) return fs } @@ -243,34 +252,53 @@ func (p *pub) Publish(_ context.Context, topic bus.Topic, messages ...bus.Messag return p.publish(15*time.Second, topic, messages...) } -// New returns a new queue client. 
-func New(metadata metadata.Repo) queue.Client { - return &pub{ - metadata: metadata, - active: make(map[string]*client), - evictable: make(map[string]evictNode), - registered: make(map[string]*databasev1.Node), - handlers: make(map[bus.Topic]schema.EventHandler), - closer: run.NewCloser(1), +// New returns a new queue client targeting the given node roles. +// If no roles are passed, it defaults to databasev1.Role_ROLE_DATA. +func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client { + if len(roles) == 0 { + roles = []databasev1.Role{databasev1.Role_ROLE_DATA} + } + var strBuilder strings.Builder + for _, role := range roles { + switch role { + case databasev1.Role_ROLE_DATA: + strBuilder.WriteString("data") + case databasev1.Role_ROLE_LIAISON: + strBuilder.WriteString("liaison") + default: + logger.Panicf("unknown role %s", role) + } } + p := &pub{ + metadata: metadata, + active: make(map[string]*client), + evictable: make(map[string]evictNode), + registered: make(map[string]*databasev1.Node), + handlers: make(map[bus.Topic]schema.EventHandler), + closer: run.NewCloser(1), + allowedRoles: roles, + prefix: strBuilder.String(), + } + return p } -// NewWithoutMetadata returns a new queue client without metadata. +// NewWithoutMetadata returns a new queue client without metadata, defaulting to data nodes. 
func NewWithoutMetadata() queue.Client { - p := New(nil) + p := New(nil, databasev1.Role_ROLE_DATA) p.(*pub).log = logger.GetLogger("queue-client") return p } -func (*pub) Name() string { - return "queue-client" +func (p *pub) Name() string { + return "queue-client-" + p.prefix } func (p *pub) PreRun(context.Context) error { if p.metadata != nil { p.metadata.RegisterHandler("queue-client", schema.KindNode, p) } - p.log = logger.GetLogger("server-queue-pub") + + p.log = logger.GetLogger("server-queue-pub-" + p.prefix) return nil } diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go index 1405bb34..bb5c0f7d 100644 --- a/banyand/queue/pub/pub_suite_test.go +++ b/banyand/queue/pub/pub_suite_test.go @@ -214,11 +214,12 @@ func (m *mockHandler) OnDelete(_ schema.Metadata) { m.deleteCount++ } -func newPub() *pub { - p := NewWithoutMetadata().(*pub) +func newPub(roles ...databasev1.Role) *pub { + p := New(nil, roles...) + p.(*pub).log = logger.GetLogger("queue-client") p.Register(data.TopicStreamWrite, &mockHandler{}) p.Register(data.TopicMeasureWrite, &mockHandler{}) - return p + return p.(*pub) } func getDataNode(name string, address string) schema.Metadata { diff --git a/banyand/queue/pub/pub_tls_test.go b/banyand/queue/pub/pub_tls_test.go index a68c070f..72dde4fa 100644 --- a/banyand/queue/pub/pub_tls_test.go +++ b/banyand/queue/pub/pub_tls_test.go @@ -76,7 +76,7 @@ func newTLSPub() *pub { return p } -var _ = ginkgo.Describe("Broadcast over one‑way TLS", func() { +var _ = ginkgo.Describe("Broadcast over one-way TLS", func() { var before []gleak.Goroutine ginkgo.BeforeEach(func() { @@ -87,7 +87,7 @@ var _ = ginkgo.Describe("Broadcast over one‑way TLS", func() { ShouldNot(gleak.HaveLeaked(before)) }) - ginkgo.It("establishes TLS and broadcasts a QueryRequest", func() { + ginkgo.It("establishes TLS and broadcasts a QueryRequest", func() { addr := getAddress() stop := tlsServer(addr) defer stop() diff --git a/banyand/queue/sub/server.go 
b/banyand/queue/sub/server.go index d669aea0..6e246531 100644 --- a/banyand/queue/sub/server.go +++ b/banyand/queue/sub/server.go @@ -66,23 +66,24 @@ var ( ) type server struct { - databasev1.UnimplementedSnapshotServiceServer - streamv1.UnimplementedStreamServiceServer - creds credentials.TransportCredentials - omr observability.MetricsRegistry - httpSrv *http.Server - log *logger.Logger - ser *grpclib.Server - listeners map[bus.Topic][]bus.MessageListener - topicMap map[string]bus.Topic clusterv1.UnimplementedServiceServer + streamv1.UnimplementedStreamServiceServer + databasev1.UnimplementedSnapshotServiceServer + creds credentials.TransportCredentials + omr observability.MetricsRegistry metrics *metrics + ser *grpclib.Server + listeners map[bus.Topic][]bus.MessageListener + topicMap map[string]bus.Topic + log *logger.Logger + httpSrv *http.Server clientCloser context.CancelFunc - host string - addr string httpAddr string + addr string + host string certFile string keyFile string + flagNamePrefix string maxRecvMsgSize run.Bytes listenersLock sync.RWMutex port uint32 @@ -92,10 +93,19 @@ type server struct { // NewServer returns a new gRPC server. func NewServer(omr observability.MetricsRegistry) queue.Server { + return NewServerWithPorts(omr, "", 17912, 17913) +} + +// NewServerWithPorts returns a new gRPC server with specified ports. 
+func NewServerWithPorts(omr observability.MetricsRegistry, flagNamePrefix string, port, httpPort uint32) queue.Server { return &server{ - listeners: make(map[bus.Topic][]bus.MessageListener), - topicMap: make(map[string]bus.Topic), - omr: omr, + listeners: make(map[bus.Topic][]bus.MessageListener), + topicMap: make(map[string]bus.Topic), + omr: omr, + maxRecvMsgSize: defaultRecvSize, + flagNamePrefix: flagNamePrefix, + port: port, + httpPort: httpPort, } } @@ -110,7 +120,7 @@ func (s *server) Name() string { } func (s *server) Role() databasev1.Role { - return databasev1.Role_ROLE_DATA + return databasev1.Role_ROLE_UNSPECIFIED } func (s *server) GetPort() *uint32 { @@ -119,18 +129,29 @@ func (s *server) GetPort() *uint32 { func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("grpc") - s.maxRecvMsgSize = defaultRecvSize - fs.VarP(&s.maxRecvMsgSize, "max-recv-msg-size", "", "the size of max receiving message") - fs.BoolVar(&s.tls, "tls", false, "connection uses TLS if true, else plain TCP") - fs.StringVar(&s.certFile, "cert-file", "", "the TLS cert file") - fs.StringVar(&s.keyFile, "key-file", "", "the TLS key file") - fs.StringVar(&s.host, "grpc-host", "", "the host of banyand listens") - fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") - fs.Uint32Var(&s.httpPort, "http-port", 17913, "the port of banyand http api listens") + prefixFlag := func(name string) string { + if s.flagNamePrefix == "" { + return name + } + return s.flagNamePrefix + "-" + name + } + fs.VarP(&s.maxRecvMsgSize, prefixFlag("max-recv-msg-size"), "", "the size of max receiving message") + fs.BoolVar(&s.tls, prefixFlag("tls"), false, "connection uses TLS if true, else plain TCP") + fs.StringVar(&s.certFile, prefixFlag("cert-file"), "", "the TLS cert file") + fs.StringVar(&s.keyFile, prefixFlag("key-file"), "", "the TLS key file") + fs.StringVar(&s.host, prefixFlag("grpc-host"), "", "the host of banyand listens") + fs.Uint32Var(&s.port, prefixFlag("grpc-port"), 
s.port, "the port of banyand listens") + fs.Uint32Var(&s.httpPort, prefixFlag("http-port"), s.httpPort, "the port of banyand http api listens") return fs } func (s *server) Validate() error { + if s.port == 0 { + s.port = 17912 + } + if s.httpPort == 0 { + s.httpPort = 17913 + } s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10)) if s.addr == ":" { return errNoAddr diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go index b4494eef..7f2efcb5 100644 --- a/pkg/cmdsetup/liaison.go +++ b/pkg/cmdsetup/liaison.go @@ -25,6 +25,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/dquery" "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" @@ -32,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/queue/pub" + "github.com/apache/skywalking-banyandb/banyand/queue/sub" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/node" "github.com/apache/skywalking-banyandb/pkg/run" @@ -45,35 +47,45 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } - pipeline := pub.New(metaSvc) + tire1Client := pub.New(metaSvc, databasev1.Role_ROLE_LIAISON) + tire2Client := pub.New(metaSvc, databasev1.Role_ROLE_DATA) localPipeline := queue.Local() - measureNodeSel := node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc) - measureNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, pipeline, measureNodeSel) - metricSvc := observability.NewMetricService(metaSvc, pipeline, "liaison", measureNodeRegistry) - streamNodeSel := 
node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc) + measureLiaisonNodeSel := node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc) + measureLiaisonNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, tire1Client, measureLiaisonNodeSel) + measureDataNodeSel := node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc) + metricSvc := observability.NewMetricService(metaSvc, tire1Client, "liaison", measureLiaisonNodeRegistry) + internalPipeline := sub.NewServerWithPorts(metricSvc, "liaison-server", 18912, 18913) + streamLiaisonNodeSel := node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc) + streamDataNodeSel := node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc) propertyNodeSel := node.NewRoundRobinSelector(data.TopicPropertyUpdate.String(), metaSvc) topNPipeline := queue.Local() - dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline, topNPipeline, metricSvc) + dQuery, err := dquery.NewService(metaSvc, localPipeline, tire2Client, topNPipeline, metricSvc) if err != nil { l.Fatal().Err(err).Msg("failed to initiate distributed query service") } - grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, topNPipeline, metaSvc, grpc.NodeRegistries{ - MeasureNodeRegistry: measureNodeRegistry, - StreamNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicStreamWrite, pipeline, streamNodeSel), - PropertyNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, pipeline, propertyNodeSel), - }, metricSvc, dQuery) + grpcServer := grpc.NewServer(ctx, tire1Client, tire2Client, localPipeline, topNPipeline, metaSvc, grpc.NodeRegistries{ + MeasureLiaisonNodeRegistry: measureLiaisonNodeRegistry, + MeasureDataNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, tire2Client, measureDataNodeSel), + StreamLiaisonNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicStreamWrite, tire1Client, streamLiaisonNodeSel), + StreamDataNodeRegistry: 
grpc.NewClusterNodeRegistry(data.TopicStreamWrite, tire2Client, streamDataNodeSel), + PropertyNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, tire2Client, propertyNodeSel), + }, metricSvc, dQuery, internalPipeline) profSvc := observability.NewProfService() httpServer := http.NewServer() var units []run.Unit units = append(units, runners...) units = append(units, metaSvc, + metricSvc, localPipeline, - pipeline, - measureNodeSel, - streamNodeSel, + internalPipeline, + tire1Client, + tire2Client, + measureLiaisonNodeSel, + measureDataNodeSel, + streamLiaisonNodeSel, + streamDataNodeSel, propertyNodeSel, - metricSvc, dQuery, grpcServer, httpServer, @@ -95,11 +107,11 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { if err != nil { return err } - for _, sel := range []node.Selector{measureNodeSel, streamNodeSel, propertyNodeSel} { + for _, sel := range []node.Selector{measureDataNodeSel, streamDataNodeSel, propertyNodeSel} { sel.SetNodeSelector(ls) } } - node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort()) + node, err := common.GenerateNode(internalPipeline.GetPort(), httpServer.GetPort()) if err != nil { return err } diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go index b454515c..9562e706 100644 --- a/pkg/cmdsetup/standalone.go +++ b/pkg/cmdsetup/standalone.go @@ -43,45 +43,49 @@ import ( func newStandaloneCmd(runners ...run.Unit) *cobra.Command { l := logger.GetLogger("bootstrap") ctx := context.Background() - pipeline := queue.Local() + liaisonPipeline := queue.Local() + dataPipeline := queue.Local() metaSvc, err := embeddedserver.NewService(ctx) if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } - metricSvc := observability.NewMetricService(metaSvc, pipeline, "standalone", nil) + metricSvc := observability.NewMetricService(metaSvc, liaisonPipeline, "standalone", nil) pm := protector.NewMemory(metricSvc) - propertySvc, err := property.NewService(metaSvc, pipeline, 
metricSvc, pm) + propertySvc, err := property.NewService(metaSvc, dataPipeline, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate property service") } - streamSvc, err := stream.NewService(metaSvc, pipeline, metricSvc, pm) + streamSvc, err := stream.NewService(metaSvc, dataPipeline, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate stream service") } - measureSvc, err := measure.NewService(metaSvc, pipeline, nil, metricSvc, pm) + measureSvc, err := measure.NewService(metaSvc, dataPipeline, nil, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } - q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, pipeline) + q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, dataPipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } nr := grpc.NewLocalNodeRegistry() - grpcServer := grpc.NewServer(ctx, pipeline, pipeline, nil, metaSvc, grpc.NodeRegistries{ - MeasureNodeRegistry: nr, - StreamNodeRegistry: nr, - PropertyNodeRegistry: nr, - }, metricSvc, measureSvc) + grpcServer := grpc.NewServer(ctx, liaisonPipeline, dataPipeline, dataPipeline, nil, metaSvc, grpc.NodeRegistries{ + MeasureLiaisonNodeRegistry: nr, + MeasureDataNodeRegistry: nr, + StreamDataNodeRegistry: nr, + StreamLiaisonNodeRegistry: nr, + PropertyNodeRegistry: nr, + }, metricSvc, measureSvc, liaisonPipeline) profSvc := observability.NewProfService() httpServer := http.NewServer() var units []run.Unit units = append(units, runners...) 
units = append(units, - pipeline, + liaisonPipeline, + dataPipeline, metaSvc, metricSvc, pm, diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 6cd6b7ba..d61a40d3 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -272,7 +272,7 @@ func LiaisonNode(etcdEndpoint string, flags ...string) (grpcAddr string, closeFn // LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the gRPC and HTTP addresses. func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (grpcAddr, httpAddr string, closeFn func()) { - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) gomega.Expect(err).NotTo(gomega.HaveOccurred()) grpcAddr = fmt.Sprintf("%s:%d", host, ports[0]) httpAddr = fmt.Sprintf("%s:%d", host, ports[1]) @@ -282,6 +282,8 @@ func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (grpcAddr, httpAd fmt.Sprintf("--grpc-port=%d", ports[0]), "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), "--http-grpc-addr="+grpcAddr, "--etcd-endpoints", etcdEndpoint, "--node-host-provider", "flag", @@ -290,7 +292,7 @@ func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (grpcAddr, httpAd closeFn = CMD(flags...) 
gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), testflags.EventuallyTimeout).Should(gomega.Succeed()) gomega.Eventually(func() (map[string]*databasev1.Node, error) { - return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0])) + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[2])) }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1)) return } diff --git a/test/integration/distributed/setup/node_test.go b/test/integration/distributed/setup/node_test.go index b120d6bb..cb7d4977 100644 --- a/test/integration/distributed/setup/node_test.go +++ b/test/integration/distributed/setup/node_test.go @@ -38,8 +38,8 @@ const host = "127.0.0.1" var _ = Describe("Node registration", func() { It("should register/unregister a liaison node successfully", func() { namespace := "liaison-test" - nodeHost := "liaison-1" - ports, err := test.AllocateFreePorts(2) + nodeHost := "localhost" + ports, err := test.AllocateFreePorts(3) Expect(err).NotTo(HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -50,16 +50,18 @@ var _ = Describe("Node registration", func() { "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr="+addr, + "--liaison-server-grpc-host="+host, + "--liaison-server-grpc-port="+fmt.Sprintf("%d", ports[2]), "--etcd-endpoints", etcdEndpoint, "--node-host-provider", "flag", "--node-host", nodeHost) Eventually(helpers.HTTPHealthCheck(httpAddr, ""), flags.EventuallyTimeout).Should(Succeed()) Eventually(func() (map[string]*databasev1.Node, error) { - return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(HaveLen(1)) closeFn() Eventually(func() (map[string]*databasev1.Node, error) { - 
return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(BeNil()) }) It("should register/unregister a data node successfully", func() { diff --git a/test/integration/etcd/client_test.go b/test/integration/etcd/client_test.go index ae22e81d..d4357c36 100644 --- a/test/integration/etcd/client_test.go +++ b/test/integration/etcd/client_test.go @@ -48,7 +48,7 @@ const host = "127.0.0.1" const namespace = "liaison-test" -const nodeHost = "liaison-1" +const nodeHost = "127.0.0.1" var _ = Describe("Client Test", func() { var ( @@ -108,7 +108,7 @@ var _ = Describe("Client Test", func() { adminClient.UserGrantRole(context.Background(), username, "root") adminClient.AuthEnable(context.Background()) - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) Expect(err).NotTo(HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -119,6 +119,8 @@ var _ = Describe("Client Test", func() { "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr="+addr, + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), "--node-host-provider", "flag", "--node-host", nodeHost, "--etcd-endpoints", etcdEndpoint, @@ -128,7 +130,7 @@ var _ = Describe("Client Test", func() { Eventually(helpers.HTTPHealthCheck(httpAddr, ""), flags.EventuallyTimeout).Should(Succeed()) Eventually(func() (map[string]*databasev1.Node, error) { - return listKeys(etcdEndpoint, username, password, clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return listKeys(etcdEndpoint, username, password, clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(HaveLen(1)) }) @@ -150,7 +152,7 @@ var _ = Describe("Client Test", 
func() { etcdEndpoint := etcdServer.Config().ListenClientUrls[0].String() defer etcdServer.Close() - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) Expect(err).NotTo(HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -161,6 +163,8 @@ var _ = Describe("Client Test", func() { "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr="+addr, + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), "--node-host-provider", "flag", "--node-host", nodeHost, "--etcd-endpoints", etcdEndpoint, @@ -169,7 +173,7 @@ var _ = Describe("Client Test", func() { Eventually(helpers.HTTPHealthCheck(httpAddr, ""), flags.EventuallyTimeout).Should(Succeed()) Eventually(func() (map[string]*databasev1.Node, error) { - return listKeys(etcdEndpoint, "", "", clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return listKeys(etcdEndpoint, "", "", clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(HaveLen(1)) }) @@ -195,7 +199,7 @@ var _ = Describe("Client Test", func() { etcdEndpoint := etcdServer.Config().ListenClientUrls[0].String() defer etcdServer.Close() - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) Expect(err).NotTo(HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -206,6 +210,8 @@ var _ = Describe("Client Test", func() { "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr="+addr, + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), "--node-host-provider", "flag", "--node-host", nodeHost, "--etcd-endpoints", etcdEndpoint, @@ -216,7 +222,7 @@ var _ = Describe("Client Test", func() { Eventually(helpers.HTTPHealthCheck(httpAddr, ""), flags.EventuallyTimeout).Should(Succeed()) 
Eventually(func() (map[string]*databasev1.Node, error) { - return listKeys(etcdEndpoint, "", "", clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return listKeys(etcdEndpoint, "", "", clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(HaveLen(1)) }) }) diff --git a/test/integration/standalone/other/disk_test.go b/test/integration/standalone/other/disk_test.go index d2741447..2252fd0a 100644 --- a/test/integration/standalone/other/disk_test.go +++ b/test/integration/standalone/other/disk_test.go @@ -58,7 +58,7 @@ var _ = g.Describe("Disk", func() { }) g.It(" is a standalone server, blocking writing, with disk full", func() { addr, _, deferFn := setup.Standalone( - "--measure-max-disk-usage-percent", + "--liaison-measure-max-disk-usage-percent", "0", ) defer deferFn() @@ -99,13 +99,13 @@ var _ = g.Describe("Disk", func() { ctx := context.Background() test_measure.PreloadSchema(ctx, schemaRegistry) g.By("Starting data node 0") - closeDataNode0 := setup.DataNode(ep, - "--measure-max-disk-usage-percent", - "0") + closeDataNode0 := setup.DataNode(ep) g.By("Starting data node 1") closeDataNode1 := setup.DataNode(ep) g.By("Starting liaison node") - liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep) + liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep, + "--liaison-measure-max-disk-usage-percent", + "0") defer func() { closerLiaisonNode() closeDataNode0() @@ -128,11 +128,10 @@ var _ = g.Describe("Disk", func() { successNum++ } else { errNum++ - gm.Expect(resp.GetStatus()).To(gm.Equal(modelv1.Status_name[int32(modelv1.Status_STATUS_DISK_FULL)])) } } gm.Expect(errNum).To(gm.BeNumerically(">", 0)) - gm.Expect(successNum).To(gm.BeNumerically(">", 0)) + gm.Expect(successNum).To(gm.BeEquivalentTo(0)) }) })