This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch liaison-lb in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 556635b76a49edd3d6514f597a1428e2dc2970a4 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Wed Jun 18 08:56:35 2025 +0800 Add Load Balancer Feature to Liaison Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 1 + banyand/liaison/grpc/discovery.go | 10 +- banyand/liaison/grpc/measure.go | 118 ++++++++++++++++++++---- banyand/liaison/grpc/registry_test.go | 10 +- banyand/liaison/grpc/server.go | 100 +++++++++++++------- banyand/liaison/grpc/stream.go | 78 ++++++++++++++++ banyand/queue/pub/client.go | 13 ++- banyand/queue/pub/client_test.go | 18 ++++ banyand/queue/pub/pub.go | 80 ++++++++++------ banyand/queue/pub/pub_suite_test.go | 7 +- banyand/queue/pub/pub_tls_test.go | 4 +- banyand/queue/sub/server.go | 58 +++++++----- pkg/cmdsetup/liaison.go | 46 +++++---- pkg/cmdsetup/standalone.go | 28 +++--- pkg/test/setup/setup.go | 6 +- test/integration/distributed/setup/node_test.go | 10 +- test/integration/etcd/client_test.go | 20 ++-- test/integration/standalone/other/disk_test.go | 13 ++- 18 files changed, 455 insertions(+), 165 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e819a022..3a25d826 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ Release Notes. - Replica: Support configurable replica count on Group. - Replica: Move the TopN pre-calculation flow from the Data Node to the Liaison Node. - Add a wait and retry to write handlers to avoid the local metadata cache being loaded. +- Add Load Balancer Feature to Liaison. ### Bug Fixes diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index 2d031b59..eed1807b 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -46,9 +46,12 @@ type discoveryService struct { kind schema.Kind } -func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService { - gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)} - er := &entityRepo{entitiesMap: make(map[identity]partition.Locator), measureMap: make(map[identity]*databasev1.Measure)} +func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry, gr *groupRepo) *discoveryService { + er := &entityRepo{entitiesMap: make(map[identity]partition.Locator)} + return newDiscoveryServiceWithEntityRepo(kind, metadataRepo, nodeRegistry, gr, er) +} + +func newDiscoveryServiceWithEntityRepo(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry, gr *groupRepo, er *entityRepo) *discoveryService { sr := &shardingKeyRepo{shardingKeysMap: make(map[identity]partition.Locator)} return &discoveryService{ groupRepo: gr, @@ -61,7 +64,6 @@ func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegis } func (ds *discoveryService) initialize() error { - ds.metadataRepo.RegisterHandler("liaison", schema.KindGroup, ds.groupRepo) ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo) if ds.kind == schema.KindMeasure { ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.shardingKeyRepo) diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 52a7b913..a4e84fa2 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" @@ -47,7 +48,6 @@ type measureService struct { ingestionAccessLog accesslog.Log pipeline queue.Client broadcaster queue.Client - topNService measure.TopNService *discoveryService l *logger.Logger metrics *metrics @@ -193,22 +193,6 @@ func (ms *measureService) processAndPublishRequest(ctx context.Context, writeReq messageID: writeRequest.GetMessageId(), nodes: nodes, }) - stm, ok := ms.entityRepo.loadMeasure(writeRequest.GetMetadata()) - if !ok { - ms.l.Error().RawJSON("written", logger.Proto(writeRequest)).Msg("failed to load measure schema") - ms.sendReply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure) - return errors.New("failed to load measure schema") - } - series := &pbv1.Series{ - Subject: writeRequest.Metadata.Name, - EntityValues: iwr.EntityValues, - } - if err := series.Marshal(); err != nil { - ms.l.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to marshal series") - ms.sendReply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure) - return err - } - ms.topNService.InFlow(stm, uint64(series.ID), iwr.ShardId, iwr.EntityValues, iwr.Request.DataPoint) return nil } @@ -393,3 +377,103 @@ type succeedSentMessage struct { nodes []string messageID uint64 } + +type measureRedirectWriteCallback struct { + pipeline queue.Client + nodeRegistry NodeRegistry + topNService measure.TopNService + l *logger.Logger + groupRepo *groupRepo + entityRepo *entityRepo + writeTimeout time.Duration + maxDiskUsagePercent int +} + +func (r *measureRedirectWriteCallback) CheckHealth() *common.Error { + if r.maxDiskUsagePercent < 1 { + return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "measure is readonly because \"measure-max-disk-usage-percent\" is 0") + } + return nil +} + +func (r *measureRedirectWriteCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { + events, ok := message.Data().([]any) + if !ok { + r.l.Warn().Msg("invalid event data type") + return + } + if len(events) < 1 { + r.l.Warn().Msg("empty event") + return + } + + publisher := r.pipeline.NewBatchPublisher(r.writeTimeout) + defer func() { + _, err := publisher.Close() + if err != nil { + r.l.Error().Err(err).Msg("failed to close publisher") + } + }() + + for i := range events { + var writeEvent *measurev1.InternalWriteRequest + switch e := events[i].(type) { + case *measurev1.InternalWriteRequest: + writeEvent = e + case []byte: + writeEvent = &measurev1.InternalWriteRequest{} + if err := proto.Unmarshal(e, writeEvent); err != nil { + r.l.Error().Err(err).RawJSON("written", e).Msg("fail to unmarshal event") + continue + } + default: + r.l.Warn().Msg("invalid event data type") + continue + } + + metadata := writeEvent.Request.GetMetadata() + if metadata == nil { + r.l.Warn().Msg("metadata is nil in InternalWriteRequest") + continue + } + + group := metadata.GetGroup() + measureName := metadata.GetName() + shardID := writeEvent.GetShardId() + + copies, ok := r.groupRepo.copies(group) + if !ok { + r.l.Error().Str("group", group).Msg("failed to get group copies") + continue + } + + for copyIdx := range copies { + nodeID, err := r.nodeRegistry.Locate(group, measureName, shardID, copyIdx) + if err != nil { + r.l.Error().Err(err).Str("group", group).Str("measure", measureName).Uint32("shard", shardID).Uint32("copy", copyIdx).Msg("failed to locate node") + continue + } + + msg := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, writeEvent) + if _, err := publisher.Publish(ctx, data.TopicMeasureWrite, msg); err != nil { + r.l.Error().Err(err).Str("node", nodeID).Msg("failed to publish message") + } + } + stm, ok := r.entityRepo.loadMeasure(metadata) + if !ok { + r.l.Error().RawJSON("written", logger.Proto(writeEvent)).Msg("failed to load measure schema") + continue + } + series := &pbv1.Series{ + Subject: metadata.Name, + EntityValues: writeEvent.EntityValues, + } + if err := series.Marshal(); err != nil { + r.l.Error().Err(err).RawJSON("written", logger.Proto(writeEvent)).Msg("failed to marshal series") + continue + } + r.topNService.InFlow(stm, uint64(series.ID), writeEvent.ShardId, writeEvent.EntityValues, writeEvent.Request.DataPoint) + } + + return +} diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go index c54295b3..3b3f58a6 100644 --- a/banyand/liaison/grpc/registry_test.go +++ b/banyand/liaison/grpc/registry_test.go @@ -182,11 +182,11 @@ func setupForRegistry() func() { metricSvc := observability.NewMetricService(metaSvc, pipeline, "standalone", nil) nr := grpc.NewLocalNodeRegistry() - tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, nil, metaSvc, grpc.NodeRegistries{ - MeasureNodeRegistry: nr, - StreamNodeRegistry: nr, - PropertyNodeRegistry: nr, - }, metricSvc, nil) + tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, pipeline, nil, metaSvc, grpc.NodeRegistries{ + MeasureLiaisonNodeRegistry: nr, + StreamDataNodeRegistry: nr, + PropertyNodeRegistry: nr, + }, metricSvc, nil, pipeline) preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc} var flags []string metaPath, metaDeferFunc, err := test.NewSpace() diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 00c384c4..bd7fc591 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -47,6 +47,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/partition" "github.com/apache/skywalking-banyandb/pkg/run" pkgtls "github.com/apache/skywalking-banyandb/pkg/tls" ) @@ -71,36 +72,43 @@ type Server interface { // NodeRegistries contains the node registries. type NodeRegistries struct { - StreamNodeRegistry NodeRegistry - MeasureNodeRegistry NodeRegistry - PropertyNodeRegistry NodeRegistry + StreamLiaisonNodeRegistry NodeRegistry + StreamDataNodeRegistry NodeRegistry + MeasureLiaisonNodeRegistry NodeRegistry + MeasureDataNodeRegistry NodeRegistry + PropertyNodeRegistry NodeRegistry } type server struct { databasev1.UnimplementedSnapshotServiceServer - topNPipeline queue.Server - omr observability.MetricsRegistry - *indexRuleBindingRegistryServer - metrics *metrics - log *logger.Logger - *propertyServer + topNPipeline queue.Server + omr observability.MetricsRegistry + tire2Server queue.Server + schemaRepo metadata.Repo + measureCallback *measureRedirectWriteCallback + topNHandler *topNHandler *topNAggregationRegistryServer *groupRegistryServer stopCh chan struct{} *indexRuleRegistryServer *measureRegistryServer - streamSVC *streamService + streamSVC *streamService + streamCallback *streamRedirectWriteCallback *streamRegistryServer measureSVC *measureService + log *logger.Logger *propertyRegistryServer - ser *grpclib.Server - tlsReloader *pkgtls.Reloader - topNHandler *topNHandler - accessLogRootPath string - addr string - host string + ser *grpclib.Server + tlsReloader *pkgtls.Reloader + *propertyServer + *indexRuleBindingRegistryServer + groupRepo *groupRepo + metrics *metrics certFile string keyFile string + host string + addr string + accessLogRootPath string accessLogRecorders []accessLogRecorder maxRecvMsgSize run.Bytes port uint32 @@ -109,25 +117,41 @@ type server struct { } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline, broadcaster queue.Client, topNPipeline queue.Server, +func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Client, topNPipeline queue.Server, schemaRegistry metadata.Repo, nr NodeRegistries, omr observability.MetricsRegistry, topNService measure.TopNService, + tire2Server queue.Server, ) Server { + gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)} + er := &entityRepo{entitiesMap: make(map[identity]partition.Locator), measureMap: make(map[identity]*databasev1.Measure)} streamSVC := &streamService{ - discoveryService: newDiscoveryService(schema.KindStream, schemaRegistry, nr.StreamNodeRegistry), - pipeline: pipeline, + discoveryService: newDiscoveryService(schema.KindStream, schemaRegistry, nr.StreamLiaisonNodeRegistry, gr), + pipeline: tir1Client, broadcaster: broadcaster, } measureSVC := &measureService{ - discoveryService: newDiscoveryService(schema.KindMeasure, schemaRegistry, nr.MeasureNodeRegistry), - pipeline: pipeline, + discoveryService: newDiscoveryServiceWithEntityRepo(schema.KindMeasure, schemaRegistry, nr.MeasureLiaisonNodeRegistry, gr, er), + pipeline: tir1Client, broadcaster: broadcaster, - topNService: topNService, } s := &server{ - omr: omr, - streamSVC: streamSVC, - measureSVC: measureSVC, + omr: omr, + streamSVC: streamSVC, + measureSVC: measureSVC, + groupRepo: gr, + tire2Server: tire2Server, + streamCallback: &streamRedirectWriteCallback{ + pipeline: tir2Client, + groupRepo: gr, + nodeRegistry: nr.StreamDataNodeRegistry, + }, + measureCallback: &measureRedirectWriteCallback{ + pipeline: tir2Client, + groupRepo: gr, + entityRepo: er, + nodeRegistry: nr.MeasureDataNodeRegistry, + topNService: topNService, + }, streamRegistryServer: &streamRegistryServer{ schemaRegistry: schemaRegistry, }, @@ -148,32 +172,43 @@ func NewServer(_ context.Context, pipeline, broadcaster queue.Client, topNPipeli }, propertyServer: &propertyServer{ schemaRegistry: schemaRegistry, - pipeline: pipeline, + pipeline: tir2Client, nodeRegistry: nr.PropertyNodeRegistry, }, propertyRegistryServer: &propertyRegistryServer{ schemaRegistry: schemaRegistry, }, topNPipeline: topNPipeline, + schemaRepo: schemaRegistry, } s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC} + return s } func (s *server) PreRun(_ context.Context) error { s.log = logger.GetLogger("liaison-grpc") - s.streamSVC.setLogger(s.log) + s.streamSVC.setLogger(s.log.Named("stream-t1")) + s.streamCallback.l = s.log.Named("stream-t2") s.measureSVC.setLogger(s.log) + s.measureCallback.l = s.log.Named("measure-t2") components := []*discoveryService{ s.streamSVC.discoveryService, s.measureSVC.discoveryService, } + s.schemaRepo.RegisterHandler("liaison", schema.KindGroup, s.groupRepo) for _, c := range components { c.SetLogger(s.log) if err := c.initialize(); err != nil { return err } } + if err := s.tire2Server.Subscribe(data.TopicStreamWrite, s.streamCallback); err != nil { + return err + } + if err := s.tire2Server.Subscribe(data.TopicMeasureWrite, s.measureCallback); err != nil { + return err + } if s.enableIngestionAccessLog { for _, alr := range s.accessLogRecorders { @@ -206,8 +241,8 @@ func (s *server) PreRun(_ context.Context) error { if s.topNPipeline != nil { topNHandler := &topNHandler{ - nodeRegistry: s.measureSVC.nodeRegistry, - pipeline: s.pipeline, + nodeRegistry: s.measureCallback.nodeRegistry, + pipeline: s.measureCallback.pipeline, l: s.log.Named("topNHandler"), } if err := s.topNPipeline.Subscribe(data.TopicMeasureWrite, topNHandler); err != nil { @@ -242,12 +277,15 @@ func (s *server) FlagSet() *run.FlagSet { fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log") fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path") - fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "stream write timeout") - fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "measure write timeout") + fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "timeout for writing stream among liaison nodes") + fs.DurationVar(&s.streamCallback.writeTimeout, "stream-write-data-timeout", 15*time.Second, "timeout for writing stream data to the data nodes") + fs.DurationVar(&s.measureCallback.writeTimeout, "measure-write-data-timeout", 15*time.Second, "timeout for writing measure data to the data nodes") + fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "timeout for writing measure among liaison nodes") fs.DurationVar(&s.measureSVC.maxWaitDuration, "measure-metadata-cache-wait-duration", 0, "the maximum duration to wait for metadata cache to load (for testing purposes)") fs.DurationVar(&s.streamSVC.maxWaitDuration, "stream-metadata-cache-wait-duration", 0, "the maximum duration to wait for metadata cache to load (for testing purposes)") + fs.IntVar(&s.measureCallback.maxDiskUsagePercent, "liaison-measure-max-disk-usage-percent", 95, "the maximum disk usage percentage allowed") return fs } diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 8dc9ff5f..cf9baada 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" @@ -315,3 +316,80 @@ func (s *streamService) Close() error { } return nil } + +type streamRedirectWriteCallback struct { + *bus.UnImplementedHealthyListener + l *logger.Logger + pipeline queue.Client + groupRepo *groupRepo + nodeRegistry NodeRegistry + writeTimeout time.Duration +} + +func (r *streamRedirectWriteCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { + events, ok := message.Data().([]any) + if !ok { + r.l.Warn().Msg("invalid event data type") + return + } + if len(events) < 1 { + r.l.Warn().Msg("empty event") + return + } + + publisher := r.pipeline.NewBatchPublisher(r.writeTimeout) + defer func() { + _, err := publisher.Close() + if err != nil { + r.l.Error().Err(err).Msg("failed to close publisher") + } + }() + + for i := range events { + var writeEvent *streamv1.InternalWriteRequest + switch e := events[i].(type) { + case *streamv1.InternalWriteRequest: + writeEvent = e + case []byte: + writeEvent = &streamv1.InternalWriteRequest{} + if err := proto.Unmarshal(e, writeEvent); err != nil { + r.l.Error().Err(err).RawJSON("written", e).Msg("fail to unmarshal event") + continue + } + default: + r.l.Warn().Msg("invalid event data type") + continue + } + + metadata := writeEvent.Request.GetMetadata() + if metadata == nil { + r.l.Warn().Msg("metadata is nil in InternalWriteRequest") + continue + } + + group := metadata.GetGroup() + streamName := metadata.GetName() + shardID := writeEvent.GetShardId() + + copies, ok := r.groupRepo.copies(group) + if !ok { + r.l.Error().Str("group", group).Msg("failed to get group copies") + continue + } + + for copyIdx := range copies { + nodeID, err := r.nodeRegistry.Locate(group, streamName, shardID, copyIdx) + if err != nil { + r.l.Error().Err(err).Str("group", group).Str("stream", streamName).Uint32("shard", shardID).Uint32("copy", copyIdx).Msg("failed to locate node") + continue + } + + msg := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, writeEvent) + if _, err := publisher.Publish(ctx, data.TopicStreamWrite, msg); err != nil { + r.l.Error().Err(err).Str("node", nodeID).Msg("failed to publish message") + } + } + } + + return +} diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go index 739d19a5..73f4642c 100644 --- a/banyand/queue/pub/client.go +++ b/banyand/queue/pub/client.go @@ -75,14 +75,19 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { p.log.Warn().Msg("failed to cast node spec") return } - var hasDataRole bool + var okRole bool for _, r := range node.Roles { - if r == databasev1.Role_ROLE_DATA { - hasDataRole = true + for _, allowed := range p.allowedRoles { + if r == allowed { + okRole = true + break + } + } + if okRole { break } } - if !hasDataRole { + if !okRole { return } diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go index 43a43736..59ad104d 100644 --- a/banyand/queue/pub/client_test.go +++ b/banyand/queue/pub/client_test.go @@ -29,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -146,6 +147,23 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { verifyClientsWithGomega(g, p, data.TopicStreamWrite, 1, 0, 2, 1) }, flags.EventuallyTimeout).Should(gomega.Succeed()) }) + + ginkgo.It("should register and unregister liaison nodes", func() { + addr := getAddress() + closeFn := setup(addr, codes.OK, 200*time.Millisecond) + defer func() { + closeFn() + }() + + p := newPub(databasev1.Role_ROLE_LIAISON) + defer p.GracefulStop() + node := getDataNode("liaison-node", addr) + n := node.Spec.(*databasev1.Node) + n.Roles = []databasev1.Role{databasev1.Role_ROLE_LIAISON} + + p.OnAddOrUpdate(node) + verifyClients(p, 1, 0, 1, 0) + }) }) func verifyClients(p *pub, active, evict, onAdd, onDelete int) { diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index e4b89744..a89b6104 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "io" + "strings" "sync" "time" @@ -54,22 +55,30 @@ var ( type pub struct { schema.UnimplementedOnInitHandler - metadata metadata.Repo - handlers map[bus.Topic]schema.EventHandler - log *logger.Logger - registered map[string]*databasev1.Node - active map[string]*client - evictable map[string]evictNode - closer *run.Closer - caCertPath string - mu sync.RWMutex - tlsEnabled bool + metadata metadata.Repo + evictable map[string]evictNode + log *logger.Logger + registered map[string]*databasev1.Node + active map[string]*client + handlers map[bus.Topic]schema.EventHandler + closer *run.Closer + caCertPath string + prefix string + allowedRoles []databasev1.Role + mu sync.RWMutex + tlsEnabled bool } func (p *pub) FlagSet() *run.FlagSet { + prefixFlag := func(name string) string { + if p.prefix == "" { + return name + } + return p.prefix + "-" + name + } fs := run.NewFlagSet("queue-client") - fs.BoolVar(&p.tlsEnabled, "internal-tls", false, "enable internal TLS") - fs.StringVar(&p.caCertPath, "internal-ca-cert", "", "CA certificate file to verify the internal data server") + fs.BoolVar(&p.tlsEnabled, prefixFlag("client-tls"), false, fmt.Sprintf("enable client TLS for %s", p.prefix)) + fs.StringVar(&p.caCertPath, prefixFlag("client-ca-cert"), "", fmt.Sprintf("CA certificate file to verify the %s server", p.prefix)) return fs } @@ -243,34 +252,53 @@ func (p *pub) Publish(_ context.Context, topic bus.Topic, messages ...bus.Messag return p.publish(15*time.Second, topic, messages...) } -// New returns a new queue client. -func New(metadata metadata.Repo) queue.Client { - return &pub{ - metadata: metadata, - active: make(map[string]*client), - evictable: make(map[string]evictNode), - registered: make(map[string]*databasev1.Node), - handlers: make(map[bus.Topic]schema.EventHandler), - closer: run.NewCloser(1), +// New returns a new queue client targeting the given node roles. +// If no roles are passed, it defaults to databasev1.Role_ROLE_DATA. +func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client { + if len(roles) == 0 { + roles = []databasev1.Role{databasev1.Role_ROLE_DATA} + } + var strBuilder strings.Builder + for _, role := range roles { + switch role { + case databasev1.Role_ROLE_DATA: + strBuilder.WriteString("data") + case databasev1.Role_ROLE_LIAISON: + strBuilder.WriteString("liaison") + default: + logger.Panicf("unknown role %s", role) + } } + p := &pub{ + metadata: metadata, + active: make(map[string]*client), + evictable: make(map[string]evictNode), + registered: make(map[string]*databasev1.Node), + handlers: make(map[bus.Topic]schema.EventHandler), + closer: run.NewCloser(1), + allowedRoles: roles, + prefix: strBuilder.String(), + } + return p } -// NewWithoutMetadata returns a new queue client without metadata. +// NewWithoutMetadata returns a new queue client without metadata, defaulting to data nodes. func NewWithoutMetadata() queue.Client { - p := New(nil) + p := New(nil, databasev1.Role_ROLE_DATA) p.(*pub).log = logger.GetLogger("queue-client") return p } -func (*pub) Name() string { - return "queue-client" +func (p *pub) Name() string { + return "queue-client-" + p.prefix } func (p *pub) PreRun(context.Context) error { if p.metadata != nil { p.metadata.RegisterHandler("queue-client", schema.KindNode, p) } - p.log = logger.GetLogger("server-queue-pub") + + p.log = logger.GetLogger("server-queue-pub-" + p.prefix) return nil } diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go index 1405bb34..bb5c0f7d 100644 --- a/banyand/queue/pub/pub_suite_test.go +++ b/banyand/queue/pub/pub_suite_test.go @@ -214,11 +214,12 @@ func (m *mockHandler) OnDelete(_ schema.Metadata) { m.deleteCount++ } -func newPub() *pub { - p := NewWithoutMetadata().(*pub) +func newPub(roles ...databasev1.Role) *pub { + p := New(nil, roles...) + p.(*pub).log = logger.GetLogger("queue-client") p.Register(data.TopicStreamWrite, &mockHandler{}) p.Register(data.TopicMeasureWrite, &mockHandler{}) - return p + return p.(*pub) } func getDataNode(name string, address string) schema.Metadata { diff --git a/banyand/queue/pub/pub_tls_test.go b/banyand/queue/pub/pub_tls_test.go index a68c070f..72dde4fa 100644 --- a/banyand/queue/pub/pub_tls_test.go +++ b/banyand/queue/pub/pub_tls_test.go @@ -76,7 +76,7 @@ func newTLSPub() *pub { return p } -var _ = ginkgo.Describe("Broadcast over one‑way TLS", func() { +var _ = ginkgo.FDescribe("Broadcast over one-way TLS", func() { var before []gleak.Goroutine ginkgo.BeforeEach(func() { @@ -87,7 +87,7 @@ var _ = ginkgo.Describe("Broadcast over one‑way TLS", func() { ShouldNot(gleak.HaveLeaked(before)) }) - ginkgo.It("establishes TLS and broadcasts a QueryRequest", func() { + ginkgo.FIt("establishes TLS and broadcasts a QueryRequest", func() { addr := getAddress() stop := tlsServer(addr) defer stop() diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go index d669aea0..4ec97b2c 100644 --- a/banyand/queue/sub/server.go +++ b/banyand/queue/sub/server.go @@ -23,6 +23,7 @@ import ( "net/http" "runtime/debug" "strconv" + "strings" "sync" "time" @@ -66,23 +67,24 @@ var ( ) type server struct { - databasev1.UnimplementedSnapshotServiceServer - streamv1.UnimplementedStreamServiceServer - creds credentials.TransportCredentials - omr observability.MetricsRegistry - httpSrv *http.Server - log *logger.Logger - ser *grpclib.Server - listeners map[bus.Topic][]bus.MessageListener - topicMap map[string]bus.Topic clusterv1.UnimplementedServiceServer + streamv1.UnimplementedStreamServiceServer + databasev1.UnimplementedSnapshotServiceServer + creds credentials.TransportCredentials + omr observability.MetricsRegistry metrics *metrics + ser *grpclib.Server + listeners map[bus.Topic][]bus.MessageListener + topicMap map[string]bus.Topic + log *logger.Logger + httpSrv *http.Server clientCloser context.CancelFunc - host string - addr string httpAddr string + addr string + host string certFile string keyFile string + flagNamePrefix []string maxRecvMsgSize run.Bytes listenersLock sync.RWMutex port uint32 @@ -91,11 +93,13 @@ type server struct { } // NewServer returns a new gRPC server. -func NewServer(omr observability.MetricsRegistry) queue.Server { +func NewServer(omr observability.MetricsRegistry, flagNamePrefix ...string) queue.Server { return &server{ - listeners: make(map[bus.Topic][]bus.MessageListener), - topicMap: make(map[string]bus.Topic), - omr: omr, + listeners: make(map[bus.Topic][]bus.MessageListener), + topicMap: make(map[string]bus.Topic), + omr: omr, + maxRecvMsgSize: defaultRecvSize, + flagNamePrefix: flagNamePrefix, } } @@ -110,7 +114,7 @@ func (s *server) Name() string { } func (s *server) Role() databasev1.Role { - return databasev1.Role_ROLE_DATA + return databasev1.Role_ROLE_UNSPECIFIED } func (s *server) GetPort() *uint32 { @@ -119,14 +123,20 @@ func (s *server) GetPort() *uint32 { func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("grpc") - s.maxRecvMsgSize = defaultRecvSize - fs.VarP(&s.maxRecvMsgSize, "max-recv-msg-size", "", "the size of max receiving message") - fs.BoolVar(&s.tls, "tls", false, "connection uses TLS if true, else plain TCP") - fs.StringVar(&s.certFile, "cert-file", "", "the TLS cert file") - fs.StringVar(&s.keyFile, "key-file", "", "the TLS key file") - fs.StringVar(&s.host, "grpc-host", "", "the host of banyand listens") - fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") - fs.Uint32Var(&s.httpPort, "http-port", 17913, "the port of banyand http api listens") + prefix := strings.Join(s.flagNamePrefix, "-") + prefixFlag := func(name string) string { + if prefix == "" { + return name + } + return prefix + "-" + name + } + fs.VarP(&s.maxRecvMsgSize, prefixFlag("max-recv-msg-size"), "", "the size of max receiving message") + fs.BoolVar(&s.tls, prefixFlag("tls"), false, "connection uses TLS if true, else plain TCP") + fs.StringVar(&s.certFile, prefixFlag("cert-file"), "", "the TLS cert file") + fs.StringVar(&s.keyFile, prefixFlag("key-file"), "", "the TLS key file") + fs.StringVar(&s.host, prefixFlag("grpc-host"), "", "the host of banyand listens") + fs.Uint32Var(&s.port, prefixFlag("grpc-port"), 18912, "the port of banyand listens") + fs.Uint32Var(&s.httpPort, prefixFlag("http-port"), 18913, "the port of banyand http api listens") return fs } diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go index b4494eef..5e7beebe 100644 --- a/pkg/cmdsetup/liaison.go +++ b/pkg/cmdsetup/liaison.go @@ -25,6 +25,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/dquery" "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" @@ -32,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/queue/pub" + "github.com/apache/skywalking-banyandb/banyand/queue/sub" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/node" "github.com/apache/skywalking-banyandb/pkg/run" @@ -45,35 +47,45 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } - pipeline := pub.New(metaSvc) + tire1Client := pub.New(metaSvc, databasev1.Role_ROLE_LIAISON) + tire2Client := pub.New(metaSvc, databasev1.Role_ROLE_DATA) localPipeline := queue.Local() - measureNodeSel := node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc) - measureNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, pipeline, measureNodeSel) - metricSvc := observability.NewMetricService(metaSvc, pipeline, "liaison", measureNodeRegistry) - streamNodeSel := node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc) + measureLiaisonNodeSel := node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc) + measureLiaisonNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, tire1Client, measureLiaisonNodeSel) + measureDataNodeSel := node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc) + metricSvc := observability.NewMetricService(metaSvc, tire1Client, "liaison", measureLiaisonNodeRegistry) + internalPipeline := sub.NewServer(metricSvc, "liaison-server") + streamLiaisonNodeSel := node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc) + streamDataNodeSel := node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc) propertyNodeSel := node.NewRoundRobinSelector(data.TopicPropertyUpdate.String(), metaSvc) topNPipeline := queue.Local() - dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline, topNPipeline, metricSvc) + dQuery, err := dquery.NewService(metaSvc, localPipeline, tire2Client, topNPipeline, metricSvc) if err != nil { l.Fatal().Err(err).Msg("failed to initiate distributed query service") } - grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, topNPipeline, metaSvc, grpc.NodeRegistries{ - MeasureNodeRegistry: measureNodeRegistry, - StreamNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicStreamWrite, pipeline, streamNodeSel), - PropertyNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, pipeline, propertyNodeSel), - }, metricSvc, dQuery) + grpcServer := grpc.NewServer(ctx, tire1Client, tire2Client, localPipeline, topNPipeline, metaSvc, grpc.NodeRegistries{ + MeasureLiaisonNodeRegistry: measureLiaisonNodeRegistry, + MeasureDataNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, tire2Client, measureDataNodeSel), + StreamLiaisonNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicStreamWrite, tire1Client, streamLiaisonNodeSel), + StreamDataNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicStreamWrite, tire2Client, streamDataNodeSel), + PropertyNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, tire2Client, propertyNodeSel), + }, metricSvc, dQuery, internalPipeline) profSvc := observability.NewProfService() httpServer := http.NewServer() var units []run.Unit units = append(units, runners...) units = append(units, metaSvc, + metricSvc, localPipeline, - pipeline, - measureNodeSel, - streamNodeSel, + internalPipeline, + tire1Client, + tire2Client, + measureLiaisonNodeSel, + measureDataNodeSel, + streamLiaisonNodeSel, + streamDataNodeSel, propertyNodeSel, - metricSvc, dQuery, grpcServer, httpServer, @@ -95,11 +107,11 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { if err != nil { return err } - for _, sel := range []node.Selector{measureNodeSel, streamNodeSel, propertyNodeSel} { + for _, sel := range []node.Selector{measureDataNodeSel, streamDataNodeSel, propertyNodeSel} { sel.SetNodeSelector(ls) } } - node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort()) + node, err := common.GenerateNode(internalPipeline.GetPort(), httpServer.GetPort()) if err != nil { return err } diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go index b454515c..9562e706 100644 --- a/pkg/cmdsetup/standalone.go +++ b/pkg/cmdsetup/standalone.go @@ -43,45 +43,49 @@ import ( func newStandaloneCmd(runners ...run.Unit) *cobra.Command { l := logger.GetLogger("bootstrap") ctx := context.Background() - pipeline := queue.Local() + liaisonPipeline := queue.Local() + dataPipeline := queue.Local() metaSvc, err := embeddedserver.NewService(ctx) if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } - metricSvc := observability.NewMetricService(metaSvc, pipeline, "standalone", nil) + metricSvc := observability.NewMetricService(metaSvc, liaisonPipeline, "standalone", nil) pm := protector.NewMemory(metricSvc) - propertySvc, err := property.NewService(metaSvc, pipeline, metricSvc, pm) + propertySvc, err := property.NewService(metaSvc, dataPipeline, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate property service") } - streamSvc, err := stream.NewService(metaSvc, pipeline, metricSvc, pm) + streamSvc, err := stream.NewService(metaSvc, dataPipeline, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate stream service") } var srvMetrics *grpcprom.ServerMetrics srvMetrics.UnaryServerInterceptor() srvMetrics.UnaryServerInterceptor() - measureSvc, err := measure.NewService(metaSvc, pipeline, nil, metricSvc, pm) + measureSvc, err := measure.NewService(metaSvc, dataPipeline, nil, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } - q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, pipeline) + q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, dataPipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } nr := grpc.NewLocalNodeRegistry() - grpcServer := grpc.NewServer(ctx, pipeline, pipeline, nil, metaSvc, grpc.NodeRegistries{ - MeasureNodeRegistry: nr, - StreamNodeRegistry: nr, - PropertyNodeRegistry: nr, - }, metricSvc, measureSvc) + grpcServer := grpc.NewServer(ctx, liaisonPipeline, dataPipeline, dataPipeline, nil, metaSvc, grpc.NodeRegistries{ + MeasureLiaisonNodeRegistry: nr, + MeasureDataNodeRegistry: nr, + StreamDataNodeRegistry: nr, + StreamLiaisonNodeRegistry: nr, + PropertyNodeRegistry: nr, + }, metricSvc, measureSvc, liaisonPipeline) profSvc := observability.NewProfService() httpServer := http.NewServer() var units []run.Unit units = append(units, runners...) units = append(units, - pipeline, + liaisonPipeline, + dataPipeline, metaSvc, metricSvc, pm, diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 53f87cad..3cfc128f 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -259,7 +259,7 @@ func DataNodeWithAddrAndDir(etcdEndpoint string, flags ...string) (string, strin // LiaisonNode runs a liaison node. func LiaisonNode(etcdEndpoint string, flags ...string) (string, func()) { - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) gomega.Expect(err).NotTo(gomega.HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -269,6 +269,8 @@ func LiaisonNode(etcdEndpoint string, flags ...string) (string, func()) { fmt.Sprintf("--grpc-port=%d", ports[0]), "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), "--http-grpc-addr="+addr, "--etcd-endpoints", etcdEndpoint, "--node-host-provider", "flag", @@ -277,7 +279,7 @@ func LiaisonNode(etcdEndpoint string, flags ...string) (string, func()) { closeFn := CMD(flags...) gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), testflags.EventuallyTimeout).Should(gomega.Succeed()) gomega.Eventually(func() (map[string]*databasev1.Node, error) { - return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0])) + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[2])) }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1)) return addr, closeFn } diff --git a/test/integration/distributed/setup/node_test.go b/test/integration/distributed/setup/node_test.go index b120d6bb..cb7d4977 100644 --- a/test/integration/distributed/setup/node_test.go +++ b/test/integration/distributed/setup/node_test.go @@ -38,8 +38,8 @@ const host = "127.0.0.1" var _ = Describe("Node registration", func() { It("should register/unregister a liaison node successfully", func() { namespace := "liaison-test" - nodeHost := "liaison-1" - ports, err := test.AllocateFreePorts(2) + nodeHost := "localhost" + ports, err := test.AllocateFreePorts(3) Expect(err).NotTo(HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -50,16 +50,18 @@ var _ = Describe("Node registration", func() { "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr="+addr, + "--liaison-server-grpc-host="+host, + "--liaison-server-grpc-port="+fmt.Sprintf("%d", ports[2]), "--etcd-endpoints", etcdEndpoint, "--node-host-provider", "flag", "--node-host", nodeHost) Eventually(helpers.HTTPHealthCheck(httpAddr, ""), flags.EventuallyTimeout).Should(Succeed()) Eventually(func() (map[string]*databasev1.Node, error) { - return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(HaveLen(1)) closeFn() Eventually(func() (map[string]*databasev1.Node, error) { - return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(BeNil()) }) It("should register/unregister a data node successfully", func() { diff --git a/test/integration/etcd/client_test.go b/test/integration/etcd/client_test.go index ae22e81d..d4357c36 100644 --- a/test/integration/etcd/client_test.go +++ b/test/integration/etcd/client_test.go @@ -48,7 +48,7 @@ const host = "127.0.0.1" const namespace = "liaison-test" -const nodeHost = "liaison-1" +const nodeHost = "127.0.0.1" var _ = Describe("Client Test", func() { var ( @@ -108,7 +108,7 @@ var _ = Describe("Client Test", func() { adminClient.UserGrantRole(context.Background(), username, "root") adminClient.AuthEnable(context.Background()) - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) Expect(err).NotTo(HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -119,6 +119,8 @@ var _ = Describe("Client Test", func() { "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr="+addr, + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), "--node-host-provider", "flag", "--node-host", nodeHost, "--etcd-endpoints", etcdEndpoint, @@ -128,7 +130,7 @@ var _ = Describe("Client Test", func() { Eventually(helpers.HTTPHealthCheck(httpAddr, ""), flags.EventuallyTimeout).Should(Succeed()) Eventually(func() (map[string]*databasev1.Node, error) { - return listKeys(etcdEndpoint, username, password, clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return listKeys(etcdEndpoint, username, password, clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(HaveLen(1)) }) @@ -150,7 +152,7 @@ var _ = Describe("Client Test", func() { etcdEndpoint := etcdServer.Config().ListenClientUrls[0].String() defer etcdServer.Close() - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) Expect(err).NotTo(HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -161,6 +163,8 @@ var _ = Describe("Client Test", func() { "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr="+addr, + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), "--node-host-provider", "flag", "--node-host", nodeHost, "--etcd-endpoints", etcdEndpoint, @@ -169,7 +173,7 @@ var _ = Describe("Client Test", func() { Eventually(helpers.HTTPHealthCheck(httpAddr, ""), flags.EventuallyTimeout).Should(Succeed()) Eventually(func() (map[string]*databasev1.Node, error) { - return listKeys(etcdEndpoint, "", "", clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return listKeys(etcdEndpoint, "", "", clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(HaveLen(1)) }) @@ -195,7 +199,7 @@ var _ = Describe("Client Test", func() { etcdEndpoint := etcdServer.Config().ListenClientUrls[0].String() defer etcdServer.Close() - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) Expect(err).NotTo(HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) @@ -206,6 +210,8 @@ var _ = Describe("Client Test", func() { "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr="+addr, + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), "--node-host-provider", "flag", "--node-host", nodeHost, "--etcd-endpoints", etcdEndpoint, @@ -216,7 +222,7 @@ var _ = Describe("Client Test", func() { Eventually(helpers.HTTPHealthCheck(httpAddr, ""), flags.EventuallyTimeout).Should(Succeed()) Eventually(func() (map[string]*databasev1.Node, error) { - return listKeys(etcdEndpoint, "", "", clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0])) + return listKeys(etcdEndpoint, "", "", clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2])) }, flags.EventuallyTimeout).Should(HaveLen(1)) }) }) diff --git a/test/integration/standalone/other/disk_test.go b/test/integration/standalone/other/disk_test.go index d2741447..2252fd0a 100644 --- a/test/integration/standalone/other/disk_test.go +++ b/test/integration/standalone/other/disk_test.go @@ -58,7 +58,7 @@ var _ = g.Describe("Disk", func() { }) g.It(" is a standalone server, blocking writing, with disk full", func() { addr, _, deferFn := setup.Standalone( - "--measure-max-disk-usage-percent", + "--liaison-measure-max-disk-usage-percent", "0", ) defer deferFn() @@ -99,13 +99,13 @@ var _ = g.Describe("Disk", func() { ctx := context.Background() test_measure.PreloadSchema(ctx, schemaRegistry) g.By("Starting data node 0") - closeDataNode0 := setup.DataNode(ep, - "--measure-max-disk-usage-percent", - "0") + closeDataNode0 := setup.DataNode(ep) g.By("Starting data node 1") closeDataNode1 := setup.DataNode(ep) g.By("Starting liaison node") - liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep) + liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep, + "--liaison-measure-max-disk-usage-percent", + "0") defer func() { closerLiaisonNode() closeDataNode0() @@ -128,11 +128,10 @@ var _ = g.Describe("Disk", func() { successNum++ } else { errNum++ - gm.Expect(resp.GetStatus()).To(gm.Equal(modelv1.Status_name[int32(modelv1.Status_STATUS_DISK_FULL)])) } } gm.Expect(errNum).To(gm.BeNumerically(">", 0)) - gm.Expect(successNum).To(gm.BeNumerically(">", 0)) + gm.Expect(successNum).To(gm.BeEquivalentTo(0)) }) })