This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch liaison-lb
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 556635b76a49edd3d6514f597a1428e2dc2970a4
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jun 18 08:56:35 2025 +0800

    Add Load Balancer Feature to Liaison
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                      |   1 +
 banyand/liaison/grpc/discovery.go               |  10 +-
 banyand/liaison/grpc/measure.go                 | 118 ++++++++++++++++++++----
 banyand/liaison/grpc/registry_test.go           |  10 +-
 banyand/liaison/grpc/server.go                  | 100 +++++++++++++-------
 banyand/liaison/grpc/stream.go                  |  78 ++++++++++++++++
 banyand/queue/pub/client.go                     |  13 ++-
 banyand/queue/pub/client_test.go                |  18 ++++
 banyand/queue/pub/pub.go                        |  80 ++++++++++------
 banyand/queue/pub/pub_suite_test.go             |   7 +-
 banyand/queue/pub/pub_tls_test.go               |   4 +-
 banyand/queue/sub/server.go                     |  58 +++++++-----
 pkg/cmdsetup/liaison.go                         |  46 +++++----
 pkg/cmdsetup/standalone.go                      |  28 +++---
 pkg/test/setup/setup.go                         |   6 +-
 test/integration/distributed/setup/node_test.go |  10 +-
 test/integration/etcd/client_test.go            |  20 ++--
 test/integration/standalone/other/disk_test.go  |  13 ++-
 18 files changed, 455 insertions(+), 165 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e819a022..3a25d826 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
 - Replica: Support configurable replica count on Group.
 - Replica: Move the TopN pre-calculation flow from the Data Node to the 
Liaison Node.
 - Add a wait and retry to write handlers to avoid the local metadata cache 
being loaded.
+- Add Load Balancer Feature to Liaison.
 
 ### Bug Fixes
 
diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index 2d031b59..eed1807b 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -46,9 +46,12 @@ type discoveryService struct {
        kind            schema.Kind
 }
 
-func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, 
nodeRegistry NodeRegistry) *discoveryService {
-       gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)}
-       er := &entityRepo{entitiesMap: make(map[identity]partition.Locator), 
measureMap: make(map[identity]*databasev1.Measure)}
+func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, 
nodeRegistry NodeRegistry, gr *groupRepo) *discoveryService {
+       er := &entityRepo{entitiesMap: make(map[identity]partition.Locator)}
+       return newDiscoveryServiceWithEntityRepo(kind, metadataRepo, 
nodeRegistry, gr, er)
+}
+
+func newDiscoveryServiceWithEntityRepo(kind schema.Kind, metadataRepo 
metadata.Repo, nodeRegistry NodeRegistry, gr *groupRepo, er *entityRepo) 
*discoveryService {
        sr := &shardingKeyRepo{shardingKeysMap: 
make(map[identity]partition.Locator)}
        return &discoveryService{
                groupRepo:       gr,
@@ -61,7 +64,6 @@ func newDiscoveryService(kind schema.Kind, metadataRepo 
metadata.Repo, nodeRegis
 }
 
 func (ds *discoveryService) initialize() error {
-       ds.metadataRepo.RegisterHandler("liaison", schema.KindGroup, 
ds.groupRepo)
        ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo)
        if ds.kind == schema.KindMeasure {
                ds.metadataRepo.RegisterHandler("liaison", ds.kind, 
ds.shardingKeyRepo)
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 52a7b913..a4e84fa2 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -25,6 +25,7 @@ import (
        "github.com/pkg/errors"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
@@ -47,7 +48,6 @@ type measureService struct {
        ingestionAccessLog accesslog.Log
        pipeline           queue.Client
        broadcaster        queue.Client
-       topNService        measure.TopNService
        *discoveryService
        l               *logger.Logger
        metrics         *metrics
@@ -193,22 +193,6 @@ func (ms *measureService) processAndPublishRequest(ctx 
context.Context, writeReq
                messageID: writeRequest.GetMessageId(),
                nodes:     nodes,
        })
-       stm, ok := ms.entityRepo.loadMeasure(writeRequest.GetMetadata())
-       if !ok {
-               ms.l.Error().RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to load measure schema")
-               ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
-               return errors.New("failed to load measure schema")
-       }
-       series := &pbv1.Series{
-               Subject:      writeRequest.Metadata.Name,
-               EntityValues: iwr.EntityValues,
-       }
-       if err := series.Marshal(); err != nil {
-               ms.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to marshal series")
-               ms.sendReply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
-               return err
-       }
-       ms.topNService.InFlow(stm, uint64(series.ID), iwr.ShardId, 
iwr.EntityValues, iwr.Request.DataPoint)
        return nil
 }
 
@@ -393,3 +377,103 @@ type succeedSentMessage struct {
        nodes     []string
        messageID uint64
 }
+
+type measureRedirectWriteCallback struct {
+       pipeline            queue.Client
+       nodeRegistry        NodeRegistry
+       topNService         measure.TopNService
+       l                   *logger.Logger
+       groupRepo           *groupRepo
+       entityRepo          *entityRepo
+       writeTimeout        time.Duration
+       maxDiskUsagePercent int
+}
+
+func (r *measureRedirectWriteCallback) CheckHealth() *common.Error {
+       if r.maxDiskUsagePercent < 1 {
+               return 
common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "measure is readonly 
because \"measure-max-disk-usage-percent\" is 0")
+       }
+       return nil
+}
+
+func (r *measureRedirectWriteCallback) Rev(ctx context.Context, message 
bus.Message) (resp bus.Message) {
+       events, ok := message.Data().([]any)
+       if !ok {
+               r.l.Warn().Msg("invalid event data type")
+               return
+       }
+       if len(events) < 1 {
+               r.l.Warn().Msg("empty event")
+               return
+       }
+
+       publisher := r.pipeline.NewBatchPublisher(r.writeTimeout)
+       defer func() {
+               _, err := publisher.Close()
+               if err != nil {
+                       r.l.Error().Err(err).Msg("failed to close publisher")
+               }
+       }()
+
+       for i := range events {
+               var writeEvent *measurev1.InternalWriteRequest
+               switch e := events[i].(type) {
+               case *measurev1.InternalWriteRequest:
+                       writeEvent = e
+               case []byte:
+                       writeEvent = &measurev1.InternalWriteRequest{}
+                       if err := proto.Unmarshal(e, writeEvent); err != nil {
+                               r.l.Error().Err(err).RawJSON("written", 
e).Msg("fail to unmarshal event")
+                               continue
+                       }
+               default:
+                       r.l.Warn().Msg("invalid event data type")
+                       continue
+               }
+
+               metadata := writeEvent.Request.GetMetadata()
+               if metadata == nil {
+                       r.l.Warn().Msg("metadata is nil in 
InternalWriteRequest")
+                       continue
+               }
+
+               group := metadata.GetGroup()
+               measureName := metadata.GetName()
+               shardID := writeEvent.GetShardId()
+
+               copies, ok := r.groupRepo.copies(group)
+               if !ok {
+                       r.l.Error().Str("group", group).Msg("failed to get 
group copies")
+                       continue
+               }
+
+               for copyIdx := range copies {
+                       nodeID, err := r.nodeRegistry.Locate(group, 
measureName, shardID, copyIdx)
+                       if err != nil {
+                               r.l.Error().Err(err).Str("group", 
group).Str("measure", measureName).Uint32("shard", shardID).Uint32("copy", 
copyIdx).Msg("failed to locate node")
+                               continue
+                       }
+
+                       msg := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, 
writeEvent)
+                       if _, err := publisher.Publish(ctx, 
data.TopicMeasureWrite, msg); err != nil {
+                               r.l.Error().Err(err).Str("node", 
nodeID).Msg("failed to publish message")
+                       }
+               }
+               stm, ok := r.entityRepo.loadMeasure(metadata)
+               if !ok {
+                       r.l.Error().RawJSON("written", 
logger.Proto(writeEvent)).Msg("failed to load measure schema")
+                       continue
+               }
+               series := &pbv1.Series{
+                       Subject:      metadata.Name,
+                       EntityValues: writeEvent.EntityValues,
+               }
+               if err := series.Marshal(); err != nil {
+                       r.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEvent)).Msg("failed to marshal series")
+                       continue
+               }
+               r.topNService.InFlow(stm, uint64(series.ID), 
writeEvent.ShardId, writeEvent.EntityValues, writeEvent.Request.DataPoint)
+       }
+
+       return
+}
diff --git a/banyand/liaison/grpc/registry_test.go 
b/banyand/liaison/grpc/registry_test.go
index c54295b3..3b3f58a6 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -182,11 +182,11 @@ func setupForRegistry() func() {
        metricSvc := observability.NewMetricService(metaSvc, pipeline, 
"standalone", nil)
 
        nr := grpc.NewLocalNodeRegistry()
-       tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, nil, metaSvc, 
grpc.NodeRegistries{
-               MeasureNodeRegistry:  nr,
-               StreamNodeRegistry:   nr,
-               PropertyNodeRegistry: nr,
-       }, metricSvc, nil)
+       tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, pipeline, 
nil, metaSvc, grpc.NodeRegistries{
+               MeasureLiaisonNodeRegistry: nr,
+               StreamDataNodeRegistry:     nr,
+               PropertyNodeRegistry:       nr,
+       }, metricSvc, nil, pipeline)
        preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 00c384c4..bd7fc591 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -47,6 +47,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/partition"
        "github.com/apache/skywalking-banyandb/pkg/run"
        pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
 )
@@ -71,36 +72,43 @@ type Server interface {
 
 // NodeRegistries contains the node registries.
 type NodeRegistries struct {
-       StreamNodeRegistry   NodeRegistry
-       MeasureNodeRegistry  NodeRegistry
-       PropertyNodeRegistry NodeRegistry
+       StreamLiaisonNodeRegistry  NodeRegistry
+       StreamDataNodeRegistry     NodeRegistry
+       MeasureLiaisonNodeRegistry NodeRegistry
+       MeasureDataNodeRegistry    NodeRegistry
+       PropertyNodeRegistry       NodeRegistry
 }
 
 type server struct {
        databasev1.UnimplementedSnapshotServiceServer
-       topNPipeline queue.Server
-       omr          observability.MetricsRegistry
-       *indexRuleBindingRegistryServer
-       metrics *metrics
-       log     *logger.Logger
-       *propertyServer
+       topNPipeline    queue.Server
+       omr             observability.MetricsRegistry
+       tire2Server     queue.Server
+       schemaRepo      metadata.Repo
+       measureCallback *measureRedirectWriteCallback
+       topNHandler     *topNHandler
        *topNAggregationRegistryServer
        *groupRegistryServer
        stopCh chan struct{}
        *indexRuleRegistryServer
        *measureRegistryServer
-       streamSVC *streamService
+       streamSVC      *streamService
+       streamCallback *streamRedirectWriteCallback
        *streamRegistryServer
        measureSVC *measureService
+       log        *logger.Logger
        *propertyRegistryServer
-       ser                      *grpclib.Server
-       tlsReloader              *pkgtls.Reloader
-       topNHandler              *topNHandler
-       accessLogRootPath        string
-       addr                     string
-       host                     string
+       ser         *grpclib.Server
+       tlsReloader *pkgtls.Reloader
+       *propertyServer
+       *indexRuleBindingRegistryServer
+       groupRepo                *groupRepo
+       metrics                  *metrics
        certFile                 string
        keyFile                  string
+       host                     string
+       addr                     string
+       accessLogRootPath        string
        accessLogRecorders       []accessLogRecorder
        maxRecvMsgSize           run.Bytes
        port                     uint32
@@ -109,25 +117,41 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline, broadcaster queue.Client, 
topNPipeline queue.Server,
+func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster 
queue.Client, topNPipeline queue.Server,
        schemaRegistry metadata.Repo, nr NodeRegistries, omr 
observability.MetricsRegistry, topNService measure.TopNService,
+       tire2Server queue.Server,
 ) Server {
+       gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)}
+       er := &entityRepo{entitiesMap: make(map[identity]partition.Locator), 
measureMap: make(map[identity]*databasev1.Measure)}
        streamSVC := &streamService{
-               discoveryService: newDiscoveryService(schema.KindStream, 
schemaRegistry, nr.StreamNodeRegistry),
-               pipeline:         pipeline,
+               discoveryService: newDiscoveryService(schema.KindStream, 
schemaRegistry, nr.StreamLiaisonNodeRegistry, gr),
+               pipeline:         tir1Client,
                broadcaster:      broadcaster,
        }
        measureSVC := &measureService{
-               discoveryService: newDiscoveryService(schema.KindMeasure, 
schemaRegistry, nr.MeasureNodeRegistry),
-               pipeline:         pipeline,
+               discoveryService: 
newDiscoveryServiceWithEntityRepo(schema.KindMeasure, schemaRegistry, 
nr.MeasureLiaisonNodeRegistry, gr, er),
+               pipeline:         tir1Client,
                broadcaster:      broadcaster,
-               topNService:      topNService,
        }
 
        s := &server{
-               omr:        omr,
-               streamSVC:  streamSVC,
-               measureSVC: measureSVC,
+               omr:         omr,
+               streamSVC:   streamSVC,
+               measureSVC:  measureSVC,
+               groupRepo:   gr,
+               tire2Server: tire2Server,
+               streamCallback: &streamRedirectWriteCallback{
+                       pipeline:     tir2Client,
+                       groupRepo:    gr,
+                       nodeRegistry: nr.StreamDataNodeRegistry,
+               },
+               measureCallback: &measureRedirectWriteCallback{
+                       pipeline:     tir2Client,
+                       groupRepo:    gr,
+                       entityRepo:   er,
+                       nodeRegistry: nr.MeasureDataNodeRegistry,
+                       topNService:  topNService,
+               },
                streamRegistryServer: &streamRegistryServer{
                        schemaRegistry: schemaRegistry,
                },
@@ -148,32 +172,43 @@ func NewServer(_ context.Context, pipeline, broadcaster 
queue.Client, topNPipeli
                },
                propertyServer: &propertyServer{
                        schemaRegistry: schemaRegistry,
-                       pipeline:       pipeline,
+                       pipeline:       tir2Client,
                        nodeRegistry:   nr.PropertyNodeRegistry,
                },
                propertyRegistryServer: &propertyRegistryServer{
                        schemaRegistry: schemaRegistry,
                },
                topNPipeline: topNPipeline,
+               schemaRepo:   schemaRegistry,
        }
        s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC}
+
        return s
 }
 
 func (s *server) PreRun(_ context.Context) error {
        s.log = logger.GetLogger("liaison-grpc")
-       s.streamSVC.setLogger(s.log)
+       s.streamSVC.setLogger(s.log.Named("stream-t1"))
+       s.streamCallback.l = s.log.Named("stream-t2")
        s.measureSVC.setLogger(s.log)
+       s.measureCallback.l = s.log.Named("measure-t2")
        components := []*discoveryService{
                s.streamSVC.discoveryService,
                s.measureSVC.discoveryService,
        }
+       s.schemaRepo.RegisterHandler("liaison", schema.KindGroup, s.groupRepo)
        for _, c := range components {
                c.SetLogger(s.log)
                if err := c.initialize(); err != nil {
                        return err
                }
        }
+       if err := s.tire2Server.Subscribe(data.TopicStreamWrite, 
s.streamCallback); err != nil {
+               return err
+       }
+       if err := s.tire2Server.Subscribe(data.TopicMeasureWrite, 
s.measureCallback); err != nil {
+               return err
+       }
 
        if s.enableIngestionAccessLog {
                for _, alr := range s.accessLogRecorders {
@@ -206,8 +241,8 @@ func (s *server) PreRun(_ context.Context) error {
 
        if s.topNPipeline != nil {
                topNHandler := &topNHandler{
-                       nodeRegistry: s.measureSVC.nodeRegistry,
-                       pipeline:     s.pipeline,
+                       nodeRegistry: s.measureCallback.nodeRegistry,
+                       pipeline:     s.measureCallback.pipeline,
                        l:            s.log.Named("topNHandler"),
                }
                if err := s.topNPipeline.Subscribe(data.TopicMeasureWrite, 
topNHandler); err != nil {
@@ -242,12 +277,15 @@ func (s *server) FlagSet() *run.FlagSet {
        fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens")
        fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", 
false, "enable ingestion access log")
        fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access 
log root path")
-       fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 
15*time.Second, "stream write timeout")
-       fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 
15*time.Second, "measure write timeout")
+       fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 
15*time.Second, "timeout for writing stream among liaison nodes")
+       fs.DurationVar(&s.streamCallback.writeTimeout, 
"stream-write-data-timeout", 15*time.Second, "timeout for writing stream data 
to the data nodes")
+       fs.DurationVar(&s.measureCallback.writeTimeout, 
"measure-write-data-timeout", 15*time.Second, "timeout for writing measure data 
to the data nodes")
+       fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 
15*time.Second, "timeout for writing measure among liaison nodes")
        fs.DurationVar(&s.measureSVC.maxWaitDuration, 
"measure-metadata-cache-wait-duration", 0,
                "the maximum duration to wait for metadata cache to load (for 
testing purposes)")
        fs.DurationVar(&s.streamSVC.maxWaitDuration, 
"stream-metadata-cache-wait-duration", 0,
                "the maximum duration to wait for metadata cache to load (for 
testing purposes)")
+       fs.IntVar(&s.measureCallback.maxDiskUsagePercent, 
"liaison-measure-max-disk-usage-percent", 95, "the maximum disk usage 
percentage allowed")
        return fs
 }
 
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 8dc9ff5f..cf9baada 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -25,6 +25,7 @@ import (
        "github.com/pkg/errors"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
@@ -315,3 +316,80 @@ func (s *streamService) Close() error {
        }
        return nil
 }
+
+type streamRedirectWriteCallback struct {
+       *bus.UnImplementedHealthyListener
+       l            *logger.Logger
+       pipeline     queue.Client
+       groupRepo    *groupRepo
+       nodeRegistry NodeRegistry
+       writeTimeout time.Duration
+}
+
+func (r *streamRedirectWriteCallback) Rev(ctx context.Context, message 
bus.Message) (resp bus.Message) {
+       events, ok := message.Data().([]any)
+       if !ok {
+               r.l.Warn().Msg("invalid event data type")
+               return
+       }
+       if len(events) < 1 {
+               r.l.Warn().Msg("empty event")
+               return
+       }
+
+       publisher := r.pipeline.NewBatchPublisher(r.writeTimeout)
+       defer func() {
+               _, err := publisher.Close()
+               if err != nil {
+                       r.l.Error().Err(err).Msg("failed to close publisher")
+               }
+       }()
+
+       for i := range events {
+               var writeEvent *streamv1.InternalWriteRequest
+               switch e := events[i].(type) {
+               case *streamv1.InternalWriteRequest:
+                       writeEvent = e
+               case []byte:
+                       writeEvent = &streamv1.InternalWriteRequest{}
+                       if err := proto.Unmarshal(e, writeEvent); err != nil {
+                               r.l.Error().Err(err).RawJSON("written", 
e).Msg("fail to unmarshal event")
+                               continue
+                       }
+               default:
+                       r.l.Warn().Msg("invalid event data type")
+                       continue
+               }
+
+               metadata := writeEvent.Request.GetMetadata()
+               if metadata == nil {
+                       r.l.Warn().Msg("metadata is nil in 
InternalWriteRequest")
+                       continue
+               }
+
+               group := metadata.GetGroup()
+               streamName := metadata.GetName()
+               shardID := writeEvent.GetShardId()
+
+               copies, ok := r.groupRepo.copies(group)
+               if !ok {
+                       r.l.Error().Str("group", group).Msg("failed to get 
group copies")
+                       continue
+               }
+
+               for copyIdx := range copies {
+                       nodeID, err := r.nodeRegistry.Locate(group, streamName, 
shardID, copyIdx)
+                       if err != nil {
+                               r.l.Error().Err(err).Str("group", 
group).Str("stream", streamName).Uint32("shard", shardID).Uint32("copy", 
copyIdx).Msg("failed to locate node")
+                               continue
+                       }
+
+                       msg := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, 
writeEvent)
+                       if _, err := publisher.Publish(ctx, 
data.TopicStreamWrite, msg); err != nil {
+                               r.l.Error().Err(err).Str("node", 
nodeID).Msg("failed to publish message")
+                       }
+               }
+       }
+
+       return
+}
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 739d19a5..73f4642c 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -75,14 +75,19 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
                p.log.Warn().Msg("failed to cast node spec")
                return
        }
-       var hasDataRole bool
+       var okRole bool
        for _, r := range node.Roles {
-               if r == databasev1.Role_ROLE_DATA {
-                       hasDataRole = true
+               for _, allowed := range p.allowedRoles {
+                       if r == allowed {
+                               okRole = true
+                               break
+                       }
+               }
+               if okRole {
                        break
                }
        }
-       if !hasDataRole {
+       if !okRole {
                return
        }
 
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
index 43a43736..59ad104d 100644
--- a/banyand/queue/pub/client_test.go
+++ b/banyand/queue/pub/client_test.go
@@ -29,6 +29,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -146,6 +147,23 @@ var _ = ginkgo.Describe("publish clients 
register/unregister", func() {
                        verifyClientsWithGomega(g, p, data.TopicStreamWrite, 1, 
0, 2, 1)
                }, flags.EventuallyTimeout).Should(gomega.Succeed())
        })
+
+       ginkgo.It("should register and unregister liaison nodes", func() {
+               addr := getAddress()
+               closeFn := setup(addr, codes.OK, 200*time.Millisecond)
+               defer func() {
+                       closeFn()
+               }()
+
+               p := newPub(databasev1.Role_ROLE_LIAISON)
+               defer p.GracefulStop()
+               node := getDataNode("liaison-node", addr)
+               n := node.Spec.(*databasev1.Node)
+               n.Roles = []databasev1.Role{databasev1.Role_ROLE_LIAISON}
+
+               p.OnAddOrUpdate(node)
+               verifyClients(p, 1, 0, 1, 0)
+       })
 })
 
 func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index e4b89744..a89b6104 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -22,6 +22,7 @@ import (
        "context"
        "fmt"
        "io"
+       "strings"
        "sync"
        "time"
 
@@ -54,22 +55,30 @@ var (
 
 type pub struct {
        schema.UnimplementedOnInitHandler
-       metadata   metadata.Repo
-       handlers   map[bus.Topic]schema.EventHandler
-       log        *logger.Logger
-       registered map[string]*databasev1.Node
-       active     map[string]*client
-       evictable  map[string]evictNode
-       closer     *run.Closer
-       caCertPath string
-       mu         sync.RWMutex
-       tlsEnabled bool
+       metadata     metadata.Repo
+       evictable    map[string]evictNode
+       log          *logger.Logger
+       registered   map[string]*databasev1.Node
+       active       map[string]*client
+       handlers     map[bus.Topic]schema.EventHandler
+       closer       *run.Closer
+       caCertPath   string
+       prefix       string
+       allowedRoles []databasev1.Role
+       mu           sync.RWMutex
+       tlsEnabled   bool
 }
 
 func (p *pub) FlagSet() *run.FlagSet {
+       prefixFlag := func(name string) string {
+               if p.prefix == "" {
+                       return name
+               }
+               return p.prefix + "-" + name
+       }
        fs := run.NewFlagSet("queue-client")
-       fs.BoolVar(&p.tlsEnabled, "internal-tls", false, "enable internal TLS")
-       fs.StringVar(&p.caCertPath, "internal-ca-cert", "", "CA certificate 
file to verify the internal data server")
+       fs.BoolVar(&p.tlsEnabled, prefixFlag("client-tls"), false, 
fmt.Sprintf("enable client TLS for %s", p.prefix))
+       fs.StringVar(&p.caCertPath, prefixFlag("client-ca-cert"), "", 
fmt.Sprintf("CA certificate file to verify the %s server", p.prefix))
        return fs
 }
 
@@ -243,34 +252,53 @@ func (p *pub) Publish(_ context.Context, topic bus.Topic, 
messages ...bus.Messag
        return p.publish(15*time.Second, topic, messages...)
 }
 
-// New returns a new queue client.
-func New(metadata metadata.Repo) queue.Client {
-       return &pub{
-               metadata:   metadata,
-               active:     make(map[string]*client),
-               evictable:  make(map[string]evictNode),
-               registered: make(map[string]*databasev1.Node),
-               handlers:   make(map[bus.Topic]schema.EventHandler),
-               closer:     run.NewCloser(1),
+// New returns a new queue client targeting the given node roles.
+// If no roles are passed, it defaults to databasev1.Role_ROLE_DATA.
+func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client {
+       if len(roles) == 0 {
+               roles = []databasev1.Role{databasev1.Role_ROLE_DATA}
+       }
+       var strBuilder strings.Builder
+       for _, role := range roles {
+               switch role {
+               case databasev1.Role_ROLE_DATA:
+                       strBuilder.WriteString("data")
+               case databasev1.Role_ROLE_LIAISON:
+                       strBuilder.WriteString("liaison")
+               default:
+                       logger.Panicf("unknown role %s", role)
+               }
        }
+       p := &pub{
+               metadata:     metadata,
+               active:       make(map[string]*client),
+               evictable:    make(map[string]evictNode),
+               registered:   make(map[string]*databasev1.Node),
+               handlers:     make(map[bus.Topic]schema.EventHandler),
+               closer:       run.NewCloser(1),
+               allowedRoles: roles,
+               prefix:       strBuilder.String(),
+       }
+       return p
 }
 
-// NewWithoutMetadata returns a new queue client without metadata.
+// NewWithoutMetadata returns a new queue client without metadata, defaulting 
to data nodes.
 func NewWithoutMetadata() queue.Client {
-       p := New(nil)
+       p := New(nil, databasev1.Role_ROLE_DATA)
        p.(*pub).log = logger.GetLogger("queue-client")
        return p
 }
 
-func (*pub) Name() string {
-       return "queue-client"
+func (p *pub) Name() string {
+       return "queue-client-" + p.prefix
 }
 
 func (p *pub) PreRun(context.Context) error {
        if p.metadata != nil {
                p.metadata.RegisterHandler("queue-client", schema.KindNode, p)
        }
-       p.log = logger.GetLogger("server-queue-pub")
+
+       p.log = logger.GetLogger("server-queue-pub-" + p.prefix)
        return nil
 }
 
diff --git a/banyand/queue/pub/pub_suite_test.go 
b/banyand/queue/pub/pub_suite_test.go
index 1405bb34..bb5c0f7d 100644
--- a/banyand/queue/pub/pub_suite_test.go
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -214,11 +214,12 @@ func (m *mockHandler) OnDelete(_ schema.Metadata) {
        m.deleteCount++
 }
 
-func newPub() *pub {
-       p := NewWithoutMetadata().(*pub)
+func newPub(roles ...databasev1.Role) *pub {
+       p := New(nil, roles...)
+       p.(*pub).log = logger.GetLogger("queue-client")
        p.Register(data.TopicStreamWrite, &mockHandler{})
        p.Register(data.TopicMeasureWrite, &mockHandler{})
-       return p
+       return p.(*pub)
 }
 
 func getDataNode(name string, address string) schema.Metadata {
diff --git a/banyand/queue/pub/pub_tls_test.go 
b/banyand/queue/pub/pub_tls_test.go
index a68c070f..72dde4fa 100644
--- a/banyand/queue/pub/pub_tls_test.go
+++ b/banyand/queue/pub/pub_tls_test.go
@@ -76,7 +76,7 @@ func newTLSPub() *pub {
        return p
 }
 
-var _ = ginkgo.Describe("Broadcast over one‑way TLS", func() {
+var _ = ginkgo.Describe("Broadcast over one-way TLS", func() {
        var before []gleak.Goroutine
 
        ginkgo.BeforeEach(func() {
@@ -87,7 +87,7 @@ var _ = ginkgo.Describe("Broadcast over one‑way TLS", func() {
                        ShouldNot(gleak.HaveLeaked(before))
        })
 
-       ginkgo.It("establishes TLS and broadcasts a QueryRequest", func() {
+       ginkgo.It("establishes TLS and broadcasts a QueryRequest", func() {
                addr := getAddress()
                stop := tlsServer(addr)
                defer stop()
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index d669aea0..4ec97b2c 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -23,6 +23,7 @@ import (
        "net/http"
        "runtime/debug"
        "strconv"
+       "strings"
        "sync"
        "time"
 
@@ -66,23 +67,24 @@ var (
 )
 
 type server struct {
-       databasev1.UnimplementedSnapshotServiceServer
-       streamv1.UnimplementedStreamServiceServer
-       creds     credentials.TransportCredentials
-       omr       observability.MetricsRegistry
-       httpSrv   *http.Server
-       log       *logger.Logger
-       ser       *grpclib.Server
-       listeners map[bus.Topic][]bus.MessageListener
-       topicMap  map[string]bus.Topic
        clusterv1.UnimplementedServiceServer
+       streamv1.UnimplementedStreamServiceServer
+       databasev1.UnimplementedSnapshotServiceServer
+       creds          credentials.TransportCredentials
+       omr            observability.MetricsRegistry
        metrics        *metrics
+       ser            *grpclib.Server
+       listeners      map[bus.Topic][]bus.MessageListener
+       topicMap       map[string]bus.Topic
+       log            *logger.Logger
+       httpSrv        *http.Server
        clientCloser   context.CancelFunc
-       host           string
-       addr           string
        httpAddr       string
+       addr           string
+       host           string
        certFile       string
        keyFile        string
+       flagNamePrefix []string
        maxRecvMsgSize run.Bytes
        listenersLock  sync.RWMutex
        port           uint32
@@ -91,11 +93,13 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer(omr observability.MetricsRegistry) queue.Server {
+func NewServer(omr observability.MetricsRegistry, flagNamePrefix ...string) 
queue.Server {
        return &server{
-               listeners: make(map[bus.Topic][]bus.MessageListener),
-               topicMap:  make(map[string]bus.Topic),
-               omr:       omr,
+               listeners:      make(map[bus.Topic][]bus.MessageListener),
+               topicMap:       make(map[string]bus.Topic),
+               omr:            omr,
+               maxRecvMsgSize: defaultRecvSize,
+               flagNamePrefix: flagNamePrefix,
        }
 }
 
@@ -110,7 +114,7 @@ func (s *server) Name() string {
 }
 
 func (s *server) Role() databasev1.Role {
-       return databasev1.Role_ROLE_DATA
+       return databasev1.Role_ROLE_UNSPECIFIED
 }
 
 func (s *server) GetPort() *uint32 {
@@ -119,14 +123,20 @@ func (s *server) GetPort() *uint32 {
 
 func (s *server) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("grpc")
-       s.maxRecvMsgSize = defaultRecvSize
-       fs.VarP(&s.maxRecvMsgSize, "max-recv-msg-size", "", "the size of max 
receiving message")
-       fs.BoolVar(&s.tls, "tls", false, "connection uses TLS if true, else 
plain TCP")
-       fs.StringVar(&s.certFile, "cert-file", "", "the TLS cert file")
-       fs.StringVar(&s.keyFile, "key-file", "", "the TLS key file")
-       fs.StringVar(&s.host, "grpc-host", "", "the host of banyand listens")
-       fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens")
-       fs.Uint32Var(&s.httpPort, "http-port", 17913, "the port of banyand http 
api listens")
+       prefix := strings.Join(s.flagNamePrefix, "-")
+       prefixFlag := func(name string) string {
+               if prefix == "" {
+                       return name
+               }
+               return prefix + "-" + name
+       }
+       fs.VarP(&s.maxRecvMsgSize, prefixFlag("max-recv-msg-size"), "", "the 
size of max receiving message")
+       fs.BoolVar(&s.tls, prefixFlag("tls"), false, "connection uses TLS if 
true, else plain TCP")
+       fs.StringVar(&s.certFile, prefixFlag("cert-file"), "", "the TLS cert 
file")
+       fs.StringVar(&s.keyFile, prefixFlag("key-file"), "", "the TLS key file")
+       fs.StringVar(&s.host, prefixFlag("grpc-host"), "", "the host of banyand 
listens")
+       fs.Uint32Var(&s.port, prefixFlag("grpc-port"), 18912, "the port of 
banyand listens")
+       fs.Uint32Var(&s.httpPort, prefixFlag("http-port"), 18913, "the port of 
banyand http api listens")
        return fs
 }
 
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index b4494eef..5e7beebe 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/dquery"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/liaison/http"
@@ -32,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/queue/pub"
+       "github.com/apache/skywalking-banyandb/banyand/queue/sub"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/node"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -45,35 +47,45 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
-       pipeline := pub.New(metaSvc)
+       tier1Client := pub.New(metaSvc, databasev1.Role_ROLE_LIAISON)
+       tier2Client := pub.New(metaSvc, databasev1.Role_ROLE_DATA)
        localPipeline := queue.Local()
-       measureNodeSel := 
node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc)
-       measureNodeRegistry := 
grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, pipeline, measureNodeSel)
-       metricSvc := observability.NewMetricService(metaSvc, pipeline, 
"liaison", measureNodeRegistry)
-       streamNodeSel := 
node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc)
+       measureLiaisonNodeSel := 
node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc)
+       measureLiaisonNodeRegistry := 
grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, tier1Client, 
measureLiaisonNodeSel)
+       measureDataNodeSel := 
node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc)
+       metricSvc := observability.NewMetricService(metaSvc, tier1Client, 
"liaison", measureLiaisonNodeRegistry)
+       internalPipeline := sub.NewServer(metricSvc, "liaison-server")
+       streamLiaisonNodeSel := 
node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc)
+       streamDataNodeSel := 
node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc)
        propertyNodeSel := 
node.NewRoundRobinSelector(data.TopicPropertyUpdate.String(), metaSvc)
        topNPipeline := queue.Local()
-       dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline, 
topNPipeline, metricSvc)
+       dQuery, err := dquery.NewService(metaSvc, localPipeline, tier2Client, 
topNPipeline, metricSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate distributed query 
service")
        }
-       grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, 
topNPipeline, metaSvc, grpc.NodeRegistries{
-               MeasureNodeRegistry:  measureNodeRegistry,
-               StreamNodeRegistry:   
grpc.NewClusterNodeRegistry(data.TopicStreamWrite, pipeline, streamNodeSel),
-               PropertyNodeRegistry: 
grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, pipeline, 
propertyNodeSel),
-       }, metricSvc, dQuery)
+       grpcServer := grpc.NewServer(ctx, tier1Client, tier2Client, 
localPipeline, topNPipeline, metaSvc, grpc.NodeRegistries{
+               MeasureLiaisonNodeRegistry: measureLiaisonNodeRegistry,
+               MeasureDataNodeRegistry:    
grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, tier2Client, 
measureDataNodeSel),
+               StreamLiaisonNodeRegistry:  
grpc.NewClusterNodeRegistry(data.TopicStreamWrite, tier1Client, 
streamLiaisonNodeSel),
+               StreamDataNodeRegistry:     
grpc.NewClusterNodeRegistry(data.TopicStreamWrite, tier2Client, 
streamDataNodeSel),
+               PropertyNodeRegistry:       
grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, tier2Client, 
propertyNodeSel),
+       }, metricSvc, dQuery, internalPipeline)
        profSvc := observability.NewProfService()
        httpServer := http.NewServer()
        var units []run.Unit
        units = append(units, runners...)
        units = append(units,
                metaSvc,
+               metricSvc,
                localPipeline,
-               pipeline,
-               measureNodeSel,
-               streamNodeSel,
+               internalPipeline,
+               tier1Client,
+               tier2Client,
+               measureLiaisonNodeSel,
+               measureDataNodeSel,
+               streamLiaisonNodeSel,
+               streamDataNodeSel,
+               measureLiaisonNodeSel,
+               measureDataNodeSel,
+               streamLiaisonNodeSel,
+               streamDataNodeSel,
                propertyNodeSel,
-               metricSvc,
                dQuery,
                grpcServer,
                httpServer,
@@ -95,11 +107,11 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                                if err != nil {
                                        return err
                                }
-                               for _, sel := range 
[]node.Selector{measureNodeSel, streamNodeSel, propertyNodeSel} {
+                               for _, sel := range 
[]node.Selector{measureDataNodeSel, streamDataNodeSel, propertyNodeSel} {
                                        sel.SetNodeSelector(ls)
                                }
                        }
-                       node, err := common.GenerateNode(grpcServer.GetPort(), 
httpServer.GetPort())
+                       node, err := 
common.GenerateNode(internalPipeline.GetPort(), httpServer.GetPort())
                        if err != nil {
                                return err
                        }
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index b454515c..9562e706 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -43,45 +43,49 @@ import (
 func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
-       pipeline := queue.Local()
+       liaisonPipeline := queue.Local()
+       dataPipeline := queue.Local()
        metaSvc, err := embeddedserver.NewService(ctx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
-       metricSvc := observability.NewMetricService(metaSvc, pipeline, 
"standalone", nil)
+       metricSvc := observability.NewMetricService(metaSvc, liaisonPipeline, 
"standalone", nil)
        pm := protector.NewMemory(metricSvc)
-       propertySvc, err := property.NewService(metaSvc, pipeline, metricSvc, 
pm)
+       propertySvc, err := property.NewService(metaSvc, dataPipeline, 
metricSvc, pm)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate property service")
        }
-       streamSvc, err := stream.NewService(metaSvc, pipeline, metricSvc, pm)
+       streamSvc, err := stream.NewService(metaSvc, dataPipeline, metricSvc, 
pm)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
        }
-       measureSvc, err := measure.NewService(metaSvc, pipeline, nil, 
metricSvc, pm)
+       measureSvc, err := measure.NewService(metaSvc, dataPipeline, nil, 
metricSvc, pm)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
-       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, 
pipeline)
+       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, 
dataPipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
        nr := grpc.NewLocalNodeRegistry()
-       grpcServer := grpc.NewServer(ctx, pipeline, pipeline, nil, metaSvc, 
grpc.NodeRegistries{
-               MeasureNodeRegistry:  nr,
-               StreamNodeRegistry:   nr,
-               PropertyNodeRegistry: nr,
-       }, metricSvc, measureSvc)
+       grpcServer := grpc.NewServer(ctx, liaisonPipeline, dataPipeline, 
dataPipeline, nil, metaSvc, grpc.NodeRegistries{
+               MeasureLiaisonNodeRegistry: nr,
+               MeasureDataNodeRegistry:    nr,
+               StreamDataNodeRegistry:     nr,
+               StreamLiaisonNodeRegistry:  nr,
+               PropertyNodeRegistry:       nr,
+       }, metricSvc, measureSvc, liaisonPipeline)
        profSvc := observability.NewProfService()
        httpServer := http.NewServer()
 
        var units []run.Unit
        units = append(units, runners...)
        units = append(units,
-               pipeline,
+               liaisonPipeline,
+               dataPipeline,
                metaSvc,
                metricSvc,
                pm,
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 53f87cad..3cfc128f 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -259,7 +259,7 @@ func DataNodeWithAddrAndDir(etcdEndpoint string, flags 
...string) (string, strin
 
 // LiaisonNode runs a liaison node.
 func LiaisonNode(etcdEndpoint string, flags ...string) (string, func()) {
-       ports, err := test.AllocateFreePorts(2)
+       ports, err := test.AllocateFreePorts(3)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        addr := fmt.Sprintf("%s:%d", host, ports[0])
        httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
@@ -269,6 +269,8 @@ func LiaisonNode(etcdEndpoint string, flags ...string) 
(string, func()) {
                fmt.Sprintf("--grpc-port=%d", ports[0]),
                "--http-host="+host,
                fmt.Sprintf("--http-port=%d", ports[1]),
+               "--liaison-server-grpc-host="+host,
+               fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]),
                "--http-grpc-addr="+addr,
                "--etcd-endpoints", etcdEndpoint,
                "--node-host-provider", "flag",
@@ -277,7 +279,7 @@ func LiaisonNode(etcdEndpoint string, flags ...string) 
(string, func()) {
        closeFn := CMD(flags...)
        gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), 
testflags.EventuallyTimeout).Should(gomega.Succeed())
        gomega.Eventually(func() (map[string]*databasev1.Node, error) {
-               return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0]))
+               return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[2]))
        }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1))
        return addr, closeFn
 }
diff --git a/test/integration/distributed/setup/node_test.go 
b/test/integration/distributed/setup/node_test.go
index b120d6bb..cb7d4977 100644
--- a/test/integration/distributed/setup/node_test.go
+++ b/test/integration/distributed/setup/node_test.go
@@ -38,8 +38,8 @@ const host = "127.0.0.1"
 var _ = Describe("Node registration", func() {
        It("should register/unregister a liaison node successfully", func() {
                namespace := "liaison-test"
-               nodeHost := "liaison-1"
-               ports, err := test.AllocateFreePorts(2)
+               nodeHost := "localhost"
+               ports, err := test.AllocateFreePorts(3)
                Expect(err).NotTo(HaveOccurred())
                addr := fmt.Sprintf("%s:%d", host, ports[0])
                httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
@@ -50,16 +50,18 @@ var _ = Describe("Node registration", func() {
                        "--http-host="+host,
                        fmt.Sprintf("--http-port=%d", ports[1]),
                        "--http-grpc-addr="+addr,
+                       "--liaison-server-grpc-host="+host,
+                       "--liaison-server-grpc-port="+fmt.Sprintf("%d", 
ports[2]),
                        "--etcd-endpoints", etcdEndpoint,
                        "--node-host-provider", "flag",
                        "--node-host", nodeHost)
                Eventually(helpers.HTTPHealthCheck(httpAddr, ""), 
flags.EventuallyTimeout).Should(Succeed())
                Eventually(func() (map[string]*databasev1.Node, error) {
-                       return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+                       return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2]))
                }, flags.EventuallyTimeout).Should(HaveLen(1))
                closeFn()
                Eventually(func() (map[string]*databasev1.Node, error) {
-                       return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+                       return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2]))
                }, flags.EventuallyTimeout).Should(BeNil())
        })
        It("should register/unregister a data node successfully", func() {
diff --git a/test/integration/etcd/client_test.go 
b/test/integration/etcd/client_test.go
index ae22e81d..d4357c36 100644
--- a/test/integration/etcd/client_test.go
+++ b/test/integration/etcd/client_test.go
@@ -48,7 +48,7 @@ const host = "127.0.0.1"
 
 const namespace = "liaison-test"
 
-const nodeHost = "liaison-1"
+const nodeHost = "127.0.0.1"
 
 var _ = Describe("Client Test", func() {
        var (
@@ -108,7 +108,7 @@ var _ = Describe("Client Test", func() {
                adminClient.UserGrantRole(context.Background(), username, 
"root")
                adminClient.AuthEnable(context.Background())
 
-               ports, err := test.AllocateFreePorts(2)
+               ports, err := test.AllocateFreePorts(3)
                Expect(err).NotTo(HaveOccurred())
                addr := fmt.Sprintf("%s:%d", host, ports[0])
                httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
@@ -119,6 +119,8 @@ var _ = Describe("Client Test", func() {
                        "--http-host="+host,
                        fmt.Sprintf("--http-port=%d", ports[1]),
                        "--http-grpc-addr="+addr,
+                       "--liaison-server-grpc-host="+host,
+                       fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]),
                        "--node-host-provider", "flag",
                        "--node-host", nodeHost,
                        "--etcd-endpoints", etcdEndpoint,
@@ -128,7 +130,7 @@ var _ = Describe("Client Test", func() {
 
                Eventually(helpers.HTTPHealthCheck(httpAddr, ""), 
flags.EventuallyTimeout).Should(Succeed())
                Eventually(func() (map[string]*databasev1.Node, error) {
-                       return listKeys(etcdEndpoint, username, password, 
clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+                       return listKeys(etcdEndpoint, username, password, 
clientConfig, fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2]))
                }, flags.EventuallyTimeout).Should(HaveLen(1))
        })
 
@@ -150,7 +152,7 @@ var _ = Describe("Client Test", func() {
                etcdEndpoint := etcdServer.Config().ListenClientUrls[0].String()
                defer etcdServer.Close()
 
-               ports, err := test.AllocateFreePorts(2)
+               ports, err := test.AllocateFreePorts(3)
                Expect(err).NotTo(HaveOccurred())
                addr := fmt.Sprintf("%s:%d", host, ports[0])
                httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
@@ -161,6 +163,8 @@ var _ = Describe("Client Test", func() {
                        "--http-host="+host,
                        fmt.Sprintf("--http-port=%d", ports[1]),
                        "--http-grpc-addr="+addr,
+                       "--liaison-server-grpc-host="+host,
+                       fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]),
                        "--node-host-provider", "flag",
                        "--node-host", nodeHost,
                        "--etcd-endpoints", etcdEndpoint,
@@ -169,7 +173,7 @@ var _ = Describe("Client Test", func() {
 
                Eventually(helpers.HTTPHealthCheck(httpAddr, ""), 
flags.EventuallyTimeout).Should(Succeed())
                Eventually(func() (map[string]*databasev1.Node, error) {
-                       return listKeys(etcdEndpoint, "", "", clientConfig, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+                       return listKeys(etcdEndpoint, "", "", clientConfig, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2]))
                }, flags.EventuallyTimeout).Should(HaveLen(1))
        })
 
@@ -195,7 +199,7 @@ var _ = Describe("Client Test", func() {
                etcdEndpoint := etcdServer.Config().ListenClientUrls[0].String()
                defer etcdServer.Close()
 
-               ports, err := test.AllocateFreePorts(2)
+               ports, err := test.AllocateFreePorts(3)
                Expect(err).NotTo(HaveOccurred())
                addr := fmt.Sprintf("%s:%d", host, ports[0])
                httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
@@ -206,6 +210,8 @@ var _ = Describe("Client Test", func() {
                        "--http-host="+host,
                        fmt.Sprintf("--http-port=%d", ports[1]),
                        "--http-grpc-addr="+addr,
+                       "--liaison-server-grpc-host="+host,
+                       fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]),
                        "--node-host-provider", "flag",
                        "--node-host", nodeHost,
                        "--etcd-endpoints", etcdEndpoint,
@@ -216,7 +222,7 @@ var _ = Describe("Client Test", func() {
 
                Eventually(helpers.HTTPHealthCheck(httpAddr, ""), 
flags.EventuallyTimeout).Should(Succeed())
                Eventually(func() (map[string]*databasev1.Node, error) {
-                       return listKeys(etcdEndpoint, "", "", clientConfig, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+                       return listKeys(etcdEndpoint, "", "", clientConfig, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[2]))
                }, flags.EventuallyTimeout).Should(HaveLen(1))
        })
 })
diff --git a/test/integration/standalone/other/disk_test.go 
b/test/integration/standalone/other/disk_test.go
index d2741447..2252fd0a 100644
--- a/test/integration/standalone/other/disk_test.go
+++ b/test/integration/standalone/other/disk_test.go
@@ -58,7 +58,7 @@ var _ = g.Describe("Disk", func() {
        })
        g.It(" is a standalone server, blocking writing, with disk full", 
func() {
                addr, _, deferFn := setup.Standalone(
-                       "--measure-max-disk-usage-percent",
+                       "--liaison-measure-max-disk-usage-percent",
                        "0",
                )
                defer deferFn()
@@ -99,13 +99,13 @@ var _ = g.Describe("Disk", func() {
                ctx := context.Background()
                test_measure.PreloadSchema(ctx, schemaRegistry)
                g.By("Starting data node 0")
-               closeDataNode0 := setup.DataNode(ep,
-                       "--measure-max-disk-usage-percent",
-                       "0")
+               closeDataNode0 := setup.DataNode(ep)
                g.By("Starting data node 1")
                closeDataNode1 := setup.DataNode(ep)
                g.By("Starting liaison node")
-               liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+               liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep,
+                       "--liaison-measure-max-disk-usage-percent",
+                       "0")
                defer func() {
                        closerLiaisonNode()
                        closeDataNode0()
@@ -128,11 +128,10 @@ var _ = g.Describe("Disk", func() {
                                successNum++
                        } else {
                                errNum++
-                               
gm.Expect(resp.GetStatus()).To(gm.Equal(modelv1.Status_name[int32(modelv1.Status_STATUS_DISK_FULL)]))
                        }
                }
                gm.Expect(errNum).To(gm.BeNumerically(">", 0))
-               gm.Expect(successNum).To(gm.BeNumerically(">", 0))
+               gm.Expect(successNum).To(gm.BeEquivalentTo(0))
        })
 })
 


Reply via email to