This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 62b418e5 Add new gRPC and HTTP service for fetching all nodes that have 
been discovered in each liaison/data node (#927)
62b418e5 is described below

commit 62b418e564a6146e9a56ad61d4126fe59b398373
Author: mrproliu <[email protected]>
AuthorDate: Sat Jan 10 11:36:04 2026 +0800

    Add new gRPC and HTTP service for fetching all nodes that have been 
discovered in each liaison/data node (#927)
    
    * Add new gRPC and HTTP service for fetching all nodes that have been 
discovered in each liaison/data node
    
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 api/proto/banyandb/database/v1/rpc.proto           |  26 +++
 banyand/backup/lifecycle/cluster_state.go          | 127 +++++++++++
 banyand/backup/lifecycle/service.go                | 251 ++++++++++++++++++++-
 banyand/backup/lifecycle/steps.go                  |   9 +-
 banyand/liaison/grpc/registry_test.go              |   2 +-
 .../node.go => liaison/grpc/route/route_table.go}  |  19 +-
 banyand/liaison/grpc/server.go                     |  35 ++-
 banyand/liaison/grpc/server_test.go                |   6 +-
 banyand/liaison/http/server.go                     |   1 +
 banyand/metadata/dns/dns.go                        |   6 +-
 banyand/property/gossip/message.go                 |   2 +
 banyand/property/gossip/service.go                 |  22 ++
 banyand/property/property.go                       |   2 +
 banyand/property/service.go                        |   8 +
 banyand/queue/local.go                             |  15 ++
 banyand/queue/pub/pub.go                           |  34 +++
 banyand/queue/queue.go                             |   3 +
 banyand/queue/sub/node.go                          |  24 ++
 banyand/queue/sub/server.go                        |  72 +++---
 docs/api-reference.md                              |  74 ++++++
 pkg/cmdsetup/data.go                               |   5 +
 pkg/cmdsetup/liaison.go                            |   8 +-
 pkg/cmdsetup/standalone.go                         |   2 +-
 .../cluster_state/cluster_state_suite_test.go      | 141 ++++++++++++
 24 files changed, 834 insertions(+), 60 deletions(-)

diff --git a/api/proto/banyandb/database/v1/rpc.proto 
b/api/proto/banyandb/database/v1/rpc.proto
index 4c8d106f..6de71e80 100644
--- a/api/proto/banyandb/database/v1/rpc.proto
+++ b/api/proto/banyandb/database/v1/rpc.proto
@@ -684,3 +684,29 @@ message GetCurrentNodeResponse {
 service NodeQueryService {
   rpc GetCurrentNode(GetCurrentNodeRequest) returns (GetCurrentNodeResponse) {}
 }
+
+message GetClusterStateRequest {}
+
+// RouteTable represents a collection of nodes grouped by their health state.
+// It provides a view of nodes that are registered, actively healthy, and 
those being evicted.
+message RouteTable {
+  // registered contains all nodes that have been discovered and registered in 
this route.
+  repeated banyandb.database.v1.Node registered = 1;
+  // active contains node names (Node.Metadata.Name) that are currently 
healthy and can handle requests.
+  repeated string active = 2;
+  // evictable contains node names (Node.Metadata.Name) that are unhealthy and 
being retried before eviction.
+  repeated string evictable = 3;
+}
+
+message GetClusterStateResponse {
+  // Liaison node: map's key could be "tire1" and "tire2". tire1 route traffic 
between liaison nodes, tire2 spread data among data nodes
+  // Data node: map's key could be "property" for gossip.
+  // Lifecycle agent: map's key could be the next stage's name.
+  map<string, RouteTable> route_tables = 1;
+}
+
+service ClusterStateService {
+  rpc GetClusterState(GetClusterStateRequest) returns 
(GetClusterStateResponse) {
+    option (google.api.http) = {get: "/v1/cluster/state"};
+  }
+}
diff --git a/banyand/backup/lifecycle/cluster_state.go 
b/banyand/backup/lifecycle/cluster_state.go
new file mode 100644
index 00000000..17a4ddd2
--- /dev/null
+++ b/banyand/backup/lifecycle/cluster_state.go
@@ -0,0 +1,127 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "context"
+       "sync"
+       "time"
+
+       "google.golang.org/protobuf/proto"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+// clusterStateManager manages the aggregated RouteTable snapshot from all 
lifecycle groups.
+type clusterStateManager struct {
+       lastUpdateTime  time.Time
+       aggregatedTable *databasev1.RouteTable
+       mu              sync.RWMutex
+}
+
+func (m *clusterStateManager) addRouteTable(rt *databasev1.RouteTable) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       if m.aggregatedTable == nil {
+               m.aggregatedTable = &databasev1.RouteTable{
+                       Registered: []*databasev1.Node{},
+                       Active:     []string{},
+                       Evictable:  []string{},
+               }
+       }
+
+       // deduplicate registered nodes using map keyed by node name
+       nodeMap := make(map[string]*databasev1.Node)
+       for _, node := range m.aggregatedTable.Registered {
+               if node != nil && node.Metadata != nil {
+                       nodeMap[node.Metadata.Name] = node
+               }
+       }
+       for _, node := range rt.Registered {
+               if node != nil && node.Metadata != nil {
+                       nodeMap[node.Metadata.Name] = node
+               }
+       }
+
+       // deduplicate active node names
+       activeSet := make(map[string]bool)
+       for _, name := range m.aggregatedTable.Active {
+               activeSet[name] = true
+       }
+       for _, name := range rt.Active {
+               activeSet[name] = true
+       }
+
+       // Deduplicate evictable node names
+       evictableSet := make(map[string]bool)
+       for _, name := range m.aggregatedTable.Evictable {
+               evictableSet[name] = true
+       }
+       for _, name := range rt.Evictable {
+               evictableSet[name] = true
+       }
+
+       // Convert maps back to slices
+       m.aggregatedTable.Registered = make([]*databasev1.Node, 0, len(nodeMap))
+       for _, node := range nodeMap {
+               m.aggregatedTable.Registered = 
append(m.aggregatedTable.Registered, node)
+       }
+
+       m.aggregatedTable.Active = make([]string, 0, len(activeSet))
+       for name := range activeSet {
+               m.aggregatedTable.Active = append(m.aggregatedTable.Active, 
name)
+       }
+
+       m.aggregatedTable.Evictable = make([]string, 0, len(evictableSet))
+       for name := range evictableSet {
+               m.aggregatedTable.Evictable = 
append(m.aggregatedTable.Evictable, name)
+       }
+
+       m.lastUpdateTime = time.Now()
+}
+
+func (m *clusterStateManager) getSnapshot() *databasev1.RouteTable {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       if m.aggregatedTable == nil {
+               return &databasev1.RouteTable{
+                       Registered: []*databasev1.Node{},
+                       Active:     []string{},
+                       Evictable:  []string{},
+               }
+       }
+
+       // deep copy to avoid concurrent modification
+       return proto.Clone(m.aggregatedTable).(*databasev1.RouteTable)
+}
+
+// GetClusterState implements the ClusterStateService.GetClusterState RPC.
+// It returns the aggregated cluster state under the "lifecycle" key.
+func (l *lifecycleService) GetClusterState(_ context.Context, _ 
*databasev1.GetClusterStateRequest) (*databasev1.GetClusterStateResponse, 
error) {
+       routeTable := l.clusterStateMgr.getSnapshot()
+
+       routeTables := map[string]*databasev1.RouteTable{
+               "lifecycle": routeTable,
+       }
+
+       return &databasev1.GetClusterStateResponse{
+               RouteTables: routeTables,
+       }, nil
+}
diff --git a/banyand/backup/lifecycle/service.go 
b/banyand/backup/lifecycle/service.go
index c001d72f..d64193b5 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -20,16 +20,27 @@ package lifecycle
 import (
        "context"
        "encoding/json"
-       "errors"
        "fmt"
+       "net"
+       "net/http"
        "os"
        "path/filepath"
        "sort"
+       "strconv"
+       "sync"
        "time"
 
        "github.com/benbjohnson/clock"
+       "github.com/go-chi/chi/v5"
+       grpc_validator 
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+       "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
+       "github.com/pkg/errors"
        "github.com/robfig/cron/v3"
-       "google.golang.org/grpc"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -42,24 +53,34 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/healthcheck"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
 )
 
 type service interface {
        run.Config
+       run.PreRunner
        run.Service
 }
 
 var _ service = (*lifecycleService)(nil)
 
 type lifecycleService struct {
+       databasev1.UnimplementedClusterStateServiceServer
        metadata          metadata.Repo
        omr               observability.MetricsRegistry
        pm                protector.Memory
+       clusterStateMgr   *clusterStateManager
        l                 *logger.Logger
        sch               *timestamp.Scheduler
+       grpcServer        *grpclib.Server
+       httpSrv           *http.Server
+       tlsReloader       *pkgtls.Reloader
+       clientCloser      context.CancelFunc
+       stopCh            chan struct{}
        measureRoot       string
        streamRoot        string
        traceRoot         string
@@ -68,17 +89,26 @@ type lifecycleService struct {
        schedule          string
        cert              string
        gRPCAddr          string
+       lifecycleHost     string
+       lifecycleGRPCAddr string
+       lifecycleHTTPAddr string
+       lifecycleCertFile string
+       lifecycleKeyFile  string
+       lifecycleGRPCPort uint32
+       lifecycleHTTPPort uint32
        maxExecutionTimes int
        enableTLS         bool
        insecure          bool
+       lifecycleTLS      bool
        chunkSize         run.Bytes
 }
 
 // NewService creates a new lifecycle service.
 func NewService(meta metadata.Repo) run.Unit {
        ls := &lifecycleService{
-               metadata: meta,
-               omr:      observability.BypassRegistry,
+               metadata:        meta,
+               omr:             observability.BypassRegistry,
+               clusterStateMgr: &clusterStateManager{},
        }
        ls.pm = protector.NewMemory(ls.omr)
        return ls
@@ -105,10 +135,52 @@ func (l *lifecycleService) FlagSet() *run.FlagSet {
        flagS.IntVar(&l.maxExecutionTimes, "max-execution-times", 0, "Maximum 
number of times to execute the lifecycle migration. 0 means no limit.")
        l.chunkSize = run.Bytes(1024 * 1024)
        flagS.VarP(&l.chunkSize, "chunk-size", "", "Chunk size in bytes for 
streaming data during migration (default: 1MB)")
+
+       // Lifecycle server flags
+       flagS.BoolVar(&l.lifecycleTLS, "lifecycle-tls", false, "connection uses 
TLS if true, else plain TCP")
+       flagS.StringVar(&l.lifecycleCertFile, "lifecycle-cert-file", "", "the 
TLS cert file")
+       flagS.StringVar(&l.lifecycleKeyFile, "lifecycle-key-file", "", "the TLS 
key file")
+       flagS.StringVar(&l.lifecycleHost, "lifecycle-grpc-host", "", "the host 
of lifecycle server listens")
+       flagS.Uint32Var(&l.lifecycleGRPCPort, "lifecycle-grpc-port", 17912, 
"the port of lifecycle server listens")
+       flagS.Uint32Var(&l.lifecycleHTTPPort, "lifecycle-http-port", 17913, 
"the port of lifecycle http api listens")
+
        return flagS
 }
 
 func (l *lifecycleService) Validate() error {
+       if l.schedule != "" {
+               l.lifecycleGRPCAddr = net.JoinHostPort(l.lifecycleHost, 
strconv.FormatUint(uint64(l.lifecycleGRPCPort), 10))
+               if l.lifecycleGRPCAddr == ":" {
+                       return errors.New("no gRPC address")
+               }
+               l.lifecycleHTTPAddr = net.JoinHostPort(l.lifecycleHost, 
strconv.FormatUint(uint64(l.lifecycleHTTPPort), 10))
+               if l.lifecycleHTTPAddr == ":" {
+                       return errors.New("no HTTP address")
+               }
+               if l.lifecycleTLS {
+                       if l.lifecycleCertFile == "" {
+                               return errors.New("missing cert file when TLS 
is enabled")
+                       }
+                       if l.lifecycleKeyFile == "" {
+                               return errors.New("missing key file when TLS is 
enabled")
+                       }
+               }
+       }
+       return nil
+}
+
+// PreRun initializes the lifecycle service and its embedded server.
+func (l *lifecycleService) PreRun(_ context.Context) error {
+       l.l = logger.GetLogger("lifecycle")
+
+       if l.schedule != "" && l.lifecycleTLS {
+               var err error
+               l.tlsReloader, err = pkgtls.NewReloader(l.lifecycleCertFile, 
l.lifecycleKeyFile, l.l)
+               if err != nil {
+                       return errors.Wrap(err, "failed to initialize TLS 
reloader")
+               }
+       }
+
        return nil
 }
 
@@ -116,6 +188,44 @@ func (l *lifecycleService) GracefulStop() {
        if l.sch != nil {
                l.sch.Close()
        }
+
+       l.l.Info().Msg("Stopping lifecycle server")
+
+       if l.tlsReloader != nil {
+               l.tlsReloader.Stop()
+       }
+
+       if l.clientCloser != nil {
+               l.clientCloser()
+       }
+
+       // Stop HTTP server
+       if l.httpSrv != nil {
+               ctx, cancel := context.WithTimeout(context.Background(), 
5*time.Second)
+               defer cancel()
+               if shutdownErr := l.httpSrv.Shutdown(ctx); shutdownErr != nil {
+                       l.l.Warn().Err(shutdownErr).Msg("HTTP server shutdown 
error")
+               }
+       }
+
+       // Stop gRPC server
+       if l.grpcServer != nil {
+               stopped := make(chan struct{})
+               go func() {
+                       l.grpcServer.GracefulStop()
+                       close(stopped)
+               }()
+
+               t := time.NewTimer(10 * time.Second)
+               select {
+               case <-t.C:
+                       l.grpcServer.Stop()
+                       l.l.Info().Msg("Lifecycle server force stopped")
+               case <-stopped:
+                       t.Stop()
+                       l.l.Info().Msg("Lifecycle server stopped gracefully")
+               }
+       }
 }
 
 func (l *lifecycleService) Name() string {
@@ -124,6 +234,13 @@ func (l *lifecycleService) Name() string {
 
 func (l *lifecycleService) Serve() run.StopNotify {
        l.l = logger.GetLogger("lifecycle")
+       l.stopCh = make(chan struct{})
+
+       // Start gRPC/HTTP servers when schedule is set
+       if l.schedule != "" {
+               l.startServers()
+       }
+
        done := make(chan struct{})
        if l.schedule == "" {
                defer close(done)
@@ -152,11 +269,125 @@ func (l *lifecycleService) Serve() run.StopNotify {
        })
        if err != nil {
                l.l.Error().Err(err).Msg("failed to register lifecycle 
migration schedule")
+               close(done)
                return done
        }
+
+       // Wait for either migration completion or server stop
+       go func() {
+               select {
+               case <-done:
+                       // Migration completed
+               case <-l.stopCh:
+                       // Server stopped
+                       close(done)
+               }
+       }()
+
        return done
 }
 
+func (l *lifecycleService) startServers() {
+       // Setup gRPC server
+       var opts []grpclib.ServerOption
+       if l.lifecycleTLS && l.tlsReloader != nil {
+               if startErr := l.tlsReloader.Start(); startErr != nil {
+                       l.l.Error().Err(startErr).Msg("Failed to start TLS 
reloader")
+                       close(l.stopCh)
+                       return
+               }
+               tlsConfig := l.tlsReloader.GetTLSConfig()
+               creds := credentials.NewTLS(tlsConfig)
+               opts = append(opts, grpclib.Creds(creds))
+       }
+
+       opts = append(opts,
+               grpclib.ChainUnaryInterceptor(
+                       grpc_validator.UnaryServerInterceptor(),
+               ),
+       )
+
+       l.grpcServer = grpclib.NewServer(opts...)
+       databasev1.RegisterClusterStateServiceServer(l.grpcServer, l)
+       grpc_health_v1.RegisterHealthServer(l.grpcServer, health.NewServer())
+
+       // Setup HTTP server
+       var ctx context.Context
+       ctx, l.clientCloser = context.WithCancel(context.Background())
+
+       clientOpts := make([]grpclib.DialOption, 0, 1)
+       if l.lifecycleTLS && l.tlsReloader != nil {
+               tlsConfig := l.tlsReloader.GetTLSConfig()
+               creds := credentials.NewTLS(tlsConfig)
+               clientOpts = append(clientOpts, 
grpclib.WithTransportCredentials(creds))
+       } else {
+               clientOpts = append(clientOpts, 
grpclib.WithTransportCredentials(insecure.NewCredentials()))
+       }
+
+       client, err := healthcheck.NewClient(ctx, l.l, l.lifecycleGRPCAddr, 
clientOpts)
+       if err != nil {
+               l.l.Error().Err(err).Msg("Failed to create health check client")
+               close(l.stopCh)
+               return
+       }
+
+       gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
+       if registerErr := 
databasev1.RegisterClusterStateServiceHandlerFromEndpoint(ctx, gwMux, 
l.lifecycleGRPCAddr, clientOpts); registerErr != nil {
+               l.l.Error().Err(registerErr).Msg("Failed to register cluster 
state service")
+               close(l.stopCh)
+               return
+       }
+
+       mux := chi.NewRouter()
+       mux.Mount("/api", http.StripPrefix("/api", gwMux))
+
+       l.httpSrv = &http.Server{
+               Addr:              l.lifecycleHTTPAddr,
+               Handler:           mux,
+               ReadHeaderTimeout: 3 * time.Second,
+       }
+
+       // Start both servers
+       var wg sync.WaitGroup
+       wg.Add(2)
+
+       // gRPC server goroutine
+       go func() {
+               defer wg.Done()
+               lis, listenErr := net.Listen("tcp", l.lifecycleGRPCAddr)
+               if listenErr != nil {
+                       l.l.Error().Err(listenErr).Msg("Failed to listen on 
gRPC addr")
+                       return
+               }
+               l.l.Info().Str("addr", l.lifecycleGRPCAddr).Msg("Lifecycle gRPC 
server listening")
+               if serveErr := l.grpcServer.Serve(lis); serveErr != nil {
+                       l.l.Error().Err(serveErr).Msg("gRPC server error")
+               }
+       }()
+
+       // HTTP server goroutine
+       go func() {
+               defer wg.Done()
+               l.l.Info().Str("addr", l.lifecycleHTTPAddr).Msg("Lifecycle HTTP 
server listening")
+               var serveErr error
+               if l.lifecycleTLS && l.tlsReloader != nil {
+                       l.httpSrv.TLSConfig = l.tlsReloader.GetTLSConfig()
+                       serveErr = l.httpSrv.ListenAndServeTLS("", "")
+               } else {
+                       serveErr = l.httpSrv.ListenAndServe()
+               }
+               if serveErr != nil && serveErr != http.ErrServerClosed {
+                       l.l.Error().Err(serveErr).Msg("HTTP server error")
+               }
+       }()
+
+       // Wait for both servers to stop
+       go func() {
+               wg.Wait()
+               close(l.stopCh)
+       }()
+}
+
 func (l *lifecycleService) action() error {
        ctx := context.Background()
        progress := LoadProgress(l.progressFilePath, l.l)
@@ -564,7 +795,7 @@ func (l *lifecycleService) getGroupsToProcess(ctx 
context.Context, progress *Pro
 func (l *lifecycleService) processStreamGroup(ctx context.Context, g 
*commonv1.Group,
        streamDir string, nodes []*databasev1.Node, labels map[string]string, 
progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr)
        if err != nil {
                l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
                return
@@ -663,7 +894,7 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx 
context.Context, g *c
                return
        }
 
-       resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpc.ClientConn) (*streamv1.DeleteExpiredSegmentsResponse, error) {
+       resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpclib.ClientConn) (*streamv1.DeleteExpiredSegmentsResponse, error) 
{
                client := streamv1.NewStreamServiceClient(conn)
                return client.DeleteExpiredSegments(ctx, 
&streamv1.DeleteExpiredSegmentsRequest{
                        Group:           g.Metadata.Name,
@@ -683,7 +914,7 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx 
context.Context, g *c
 func (l *lifecycleService) processMeasureGroup(ctx context.Context, g 
*commonv1.Group, measureDir string,
        nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr)
        if err != nil {
                l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
                return
@@ -746,7 +977,7 @@ func (l *lifecycleService) deleteExpiredMeasureSegments(ctx 
context.Context, g *
                return
        }
 
-       resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpc.ClientConn) (*measurev1.DeleteExpiredSegmentsResponse, error) {
+       resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpclib.ClientConn) (*measurev1.DeleteExpiredSegmentsResponse, 
error) {
                client := measurev1.NewMeasureServiceClient(conn)
                return client.DeleteExpiredSegments(ctx, 
&measurev1.DeleteExpiredSegmentsRequest{
                        Group:           g.Metadata.Name,
@@ -769,7 +1000,7 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx 
context.Context, g *co
                return
        }
 
-       resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpc.ClientConn) (*tracev1.DeleteExpiredSegmentsResponse, error) {
+       resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpclib.ClientConn) (*tracev1.DeleteExpiredSegmentsResponse, error) {
                client := tracev1.NewTraceServiceClient(conn)
                return client.DeleteExpiredSegments(ctx, 
&tracev1.DeleteExpiredSegmentsRequest{
                        Group:           g.Metadata.Name,
@@ -789,7 +1020,7 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx 
context.Context, g *co
 func (l *lifecycleService) processTraceGroup(ctx context.Context, g 
*commonv1.Group, traceDir string,
        nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr)
        if err != nil {
                l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
                return
diff --git a/banyand/backup/lifecycle/steps.go 
b/banyand/backup/lifecycle/steps.go
index 6e405e87..1ca5c333 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -101,8 +101,9 @@ func (gc *GroupConfig) Close() {
        }
 }
 
-func parseGroup(g *commonv1.Group, nodeLabels map[string]string, nodes 
[]*databasev1.Node,
-       l *logger.Logger, metadata metadata.Repo,
+func parseGroup(
+       g *commonv1.Group, nodeLabels map[string]string, nodes 
[]*databasev1.Node,
+       l *logger.Logger, metadata metadata.Repo, clusterStateMgr 
*clusterStateManager,
 ) (*GroupConfig, error) {
        ro := g.ResourceOpts
        if ro == nil {
@@ -172,6 +173,10 @@ func parseGroup(g *commonv1.Group, nodeLabels 
map[string]string, nodes []*databa
        if !existed {
                return nil, errors.New("no nodes matched")
        }
+
+       if t := client.GetRouteTable(); t != nil {
+               clusterStateMgr.addRouteTable(t)
+       }
        return &GroupConfig{
                Group:           g,
                TargetShardNum:  nst.ShardNum,
diff --git a/banyand/liaison/grpc/registry_test.go 
b/banyand/liaison/grpc/registry_test.go
index 95415a08..4bf50291 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -189,7 +189,7 @@ func setupForRegistry() func() {
        tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, pipeline, 
metaSvc, grpc.NodeRegistries{
                MeasureLiaisonNodeRegistry: nr,
                PropertyNodeRegistry:       nr,
-       }, metricSvc, nil)
+       }, metricSvc, nil, nil)
        preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/queue/sub/node.go 
b/banyand/liaison/grpc/route/route_table.go
similarity index 55%
copy from banyand/queue/sub/node.go
copy to banyand/liaison/grpc/route/route_table.go
index 3ddd5d2a..0fe568fd 100644
--- a/banyand/queue/sub/node.go
+++ b/banyand/liaison/grpc/route/route_table.go
@@ -15,16 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package sub
+// Package route defines interfaces and types for providing route table 
information.
+package route
 
 import (
-       "context"
-
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 )
 
-func (s *server) GetCurrentNode(context.Context, 
*databasev1.GetCurrentNodeRequest) (*databasev1.GetCurrentNodeResponse, error) {
-       return &databasev1.GetCurrentNodeResponse{
-               Node: s.curNode,
-       }, nil
+// TableProvider provides route table information for the cluster state API.
+// Implementations should return a RouteTable with all registered nodes and 
their health states.
+type TableProvider interface {
+       // GetRouteTable returns the current route table state.
+       // The returned RouteTable contains:
+       // - Registered: all nodes known to this provider (full Node 
information)
+       // - Active: node names (Node.Metadata.Name) that are currently healthy
+       // - Evictable: node names (Node.Metadata.Name) that are unhealthy and 
being retried
+       // The returned RouteTable is a copy and safe for concurrent access.
+       GetRouteTable() *databasev1.RouteTable
 }
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 07460bd7..c0e02ec4 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -43,6 +43,7 @@ import (
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -88,6 +89,7 @@ type NodeRegistries struct {
 
 type server struct {
        databasev1.UnimplementedSnapshotServiceServer
+       databasev1.UnimplementedClusterStateServiceServer
        omr        observability.MetricsRegistry
        schemaRepo metadata.Repo
        protector  protector.Memory
@@ -111,6 +113,7 @@ type server struct {
        groupRepo    *groupRepo
        *indexRuleBindingRegistryServer
        metrics                  *metrics
+       routeTableProviders      map[string]route.TableProvider
        keyFile                  string
        authConfigFile           string
        addr                     string
@@ -132,7 +135,7 @@ type server struct {
 // NewServer returns a new gRPC server.
 func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster 
queue.Client,
        schemaRegistry metadata.Repo, nr NodeRegistries, omr 
observability.MetricsRegistry,
-       protectorService protector.Memory,
+       protectorService protector.Memory, routeProviders 
map[string]route.TableProvider,
 ) Server {
        gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)}
        er := &entityRepo{entitiesMap: make(map[identity]partition.Locator), 
measureMap: make(map[identity]*databasev1.Measure)}
@@ -198,9 +201,10 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                traceRegistryServer: &traceRegistryServer{
                        schemaRegistry: schemaRegistry,
                },
-               schemaRepo:   schemaRegistry,
-               authReloader: auth.InitAuthReloader(),
-               protector:    protectorService,
+               schemaRepo:          schemaRegistry,
+               authReloader:        auth.InitAuthReloader(),
+               protector:           protectorService,
+               routeTableProviders: routeProviders,
        }
        s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC, 
traceSVC, s.propertyServer}
        s.queryAccessLogRecorders = []queryAccessLogRecorder{streamSVC, 
measureSVC, traceSVC, s.propertyServer}
@@ -422,6 +426,7 @@ func (s *server) Serve() run.StopNotify {
        databasev1.RegisterSnapshotServiceServer(s.ser, s)
        databasev1.RegisterPropertyRegistryServiceServer(s.ser, 
s.propertyRegistryServer)
        databasev1.RegisterTraceRegistryServiceServer(s.ser, 
s.traceRegistryServer)
+       databasev1.RegisterClusterStateServiceServer(s.ser, s)
        grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
 
        s.stopCh = make(chan struct{})
@@ -538,6 +543,28 @@ func (s *server) calculateGrpcBufferSizes() (int32, int32) 
{
        return connWindowSize, streamWindowSize
 }
 
+// GetClusterState returns the current state of all nodes in the cluster 
organized by route tables.
+func (s *server) GetClusterState(_ context.Context, _ 
*databasev1.GetClusterStateRequest) (*databasev1.GetClusterStateResponse, 
error) {
+       if s.routeTableProviders == nil {
+               return &databasev1.GetClusterStateResponse{RouteTables: 
map[string]*databasev1.RouteTable{}}, nil
+       }
+
+       routeTables := make(map[string]*databasev1.RouteTable, 
len(s.routeTableProviders))
+       for routeKey, provider := range s.routeTableProviders {
+               if provider == nil {
+                       s.log.Warn().Str("routeKey", routeKey).Msg("route table 
provider is nil")
+                       continue
+               }
+
+               routeTable := provider.GetRouteTable()
+               if routeTable != nil {
+                       routeTables[routeKey] = routeTable
+               }
+       }
+
+       return &databasev1.GetClusterStateResponse{RouteTables: routeTables}, 
nil
+}
+
 func (s *server) GracefulStop() {
        s.log.Info().Msg("stopping")
        if s.tls && s.tlsReloader != nil {
diff --git a/banyand/liaison/grpc/server_test.go 
b/banyand/liaison/grpc/server_test.go
index 2b7fb119..4c879bae 100644
--- a/banyand/liaison/grpc/server_test.go
+++ b/banyand/liaison/grpc/server_test.go
@@ -105,14 +105,14 @@ func TestNewServerWithProtector(t *testing.T) {
        protectorService := &mockProtector{state: protector.StateLow}
 
        // Create server with protector - should not panic
-       server := NewServer(context.Background(), nil, nil, nil, nil, 
NodeRegistries{}, nil, protectorService)
+       server := NewServer(context.Background(), nil, nil, nil, nil, 
NodeRegistries{}, nil, protectorService, nil)
        assert.NotNil(t, server)
 }
 
 // TestNewServerWithoutProtector verifies nil protector handling.
 func TestNewServerWithoutProtector(t *testing.T) {
        // Server creation should not fail with nil protector (fail open)
-       server := NewServer(context.Background(), nil, nil, nil, nil, 
NodeRegistries{}, nil, nil)
+       server := NewServer(context.Background(), nil, nil, nil, nil, 
NodeRegistries{}, nil, nil, nil)
        assert.NotNil(t, server)
 }
 
@@ -397,7 +397,7 @@ func setupTestServer(t *testing.T, protectorService 
protector.Memory) (string, f
                StreamLiaisonNodeRegistry:  nr,
                PropertyNodeRegistry:       nr,
                TraceLiaisonNodeRegistry:   nr,
-       }, metricSvc, protectorService)
+       }, metricSvc, protectorService, nil)
 
        // Configure server - use a fixed port for testing
        grpcServer.(*server).host = "localhost"
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 8f6282fd..3a6e5b28 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -357,6 +357,7 @@ func (p *server) initGRPCClient() error {
                
databasev1.RegisterTopNAggregationRegistryServiceHandlerFromEndpoint(p.grpcCtx, 
p.gwMux, p.grpcAddr, opts),
                
databasev1.RegisterSnapshotServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, 
p.grpcAddr, opts),
                
databasev1.RegisterPropertyRegistryServiceHandlerFromEndpoint(p.grpcCtx, 
p.gwMux, p.grpcAddr, opts),
+               
databasev1.RegisterClusterStateServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, 
p.grpcAddr, opts),
                streamv1.RegisterStreamServiceHandlerFromEndpoint(p.grpcCtx, 
p.gwMux, p.grpcAddr, opts),
                measurev1.RegisterMeasureServiceHandlerFromEndpoint(p.grpcCtx, 
p.gwMux, p.grpcAddr, opts),
                
propertyv1.RegisterPropertyServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, 
p.grpcAddr, opts),
diff --git a/banyand/metadata/dns/dns.go b/banyand/metadata/dns/dns.go
index f3af3d52..2664f5df 100644
--- a/banyand/metadata/dns/dns.go
+++ b/banyand/metadata/dns/dns.go
@@ -49,7 +49,7 @@ type Service struct {
        closer            *run.Closer
        log               *logger.Logger
        metrics           *metrics
-       handlers          map[string]schema.EventHandler
+       handlers          []schema.EventHandler
        lastSuccessfulDNS map[string][]string
        srvAddresses      []string
        caCertPaths       []string
@@ -110,7 +110,7 @@ func NewService(cfg Config) (*Service, error) {
                tlsEnabled:        cfg.TLSEnabled,
                caCertPaths:       cfg.CACertPaths,
                nodeCache:         make(map[string]*databasev1.Node),
-               handlers:          make(map[string]schema.EventHandler),
+               handlers:          make([]schema.EventHandler, 0),
                lastSuccessfulDNS: make(map[string][]string),
                pathToReloader:    make(map[string]*pkgtls.Reloader),
                srvAddrToPath:     make(map[string]string),
@@ -532,7 +532,7 @@ func (s *Service) RegisterHandler(name string, handler 
schema.EventHandler) {
        s.handlersMutex.Lock()
        defer s.handlersMutex.Unlock()
 
-       s.handlers[name] = handler
+       s.handlers = append(s.handlers, handler)
        s.log.Debug().Str("handler", name).Msg("Registered DNS node discovery 
handler")
 }
 
diff --git a/banyand/property/gossip/message.go 
b/banyand/property/gossip/message.go
index b6b3c9a5..30a5969e 100644
--- a/banyand/property/gossip/message.go
+++ b/banyand/property/gossip/message.go
@@ -24,6 +24,7 @@ import (
        "google.golang.org/grpc"
 
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -60,6 +61,7 @@ type MessageClient interface {
 // MessageServer is an interface that defines methods for subscribing to 
topics and receiving messages in a gossip protocol.
 type MessageServer interface {
        run.Unit
+       route.TableProvider
        // Subscribe allows subscribing to a topic to receive messages.
        Subscribe(listener MessageListener)
        // RegisterServices registers the gRPC services with the provided 
server.
diff --git a/banyand/property/gossip/service.go 
b/banyand/property/gossip/service.go
index db44a65d..be377d53 100644
--- a/banyand/property/gossip/service.go
+++ b/banyand/property/gossip/service.go
@@ -254,6 +254,28 @@ func (s *service) Serve(stopCh chan struct{}) {
        }()
 }
 
+// GetRouteTable implements the route.TableProvider interface.
+// The gossip protocol doesn't track separate health states, so only the
+// registered node list is reported; the active and evictable lists are
+// left empty.
+func (s *service) GetRouteTable() *databasev1.RouteTable {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+
+       registered := make([]*databasev1.Node, 0, len(s.registered))
+
+       for _, node := range s.registered {
+               if node != nil {
+                       registered = append(registered, node)
+               }
+       }
+
+       return &databasev1.RouteTable{
+               Registered: registered,
+               Active:     []string{},
+               Evictable:  []string{},
+       }
+}
+
 func (s *service) GracefulStop() {
        if s.ser == nil {
                return
diff --git a/banyand/property/property.go b/banyand/property/property.go
index b58931d9..38948aaa 100644
--- a/banyand/property/property.go
+++ b/banyand/property/property.go
@@ -23,6 +23,7 @@ import (
        "strings"
 
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -32,6 +33,7 @@ type Service interface {
        run.PreRunner
        run.Config
        run.Service
+       route.TableProvider
 
        GetGossIPGrpcPort() *uint32
 }
diff --git a/banyand/property/service.go b/banyand/property/service.go
index bb82829d..275053e6 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -199,6 +199,14 @@ func (s *service) GetGossIPGrpcPort() *uint32 {
        return s.gossipMessenger.GetServerPort()
 }
 
+// GetRouteTable implements the route.TableProvider interface by delegating to
+// gossipMessenger.
+func (s *service) GetRouteTable() *databasev1.RouteTable {
+       if s.gossipMessenger == nil {
+               return nil
+       }
+       return s.gossipMessenger.GetRouteTable()
+}
+
 // NewService returns a new service.
 func NewService(metadata metadata.Repo, pipeline queue.Server, pipelineClient 
queue.Client, omr observability.MetricsRegistry, pm protector.Memory) (Service, 
error) {
        return &service{
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 455b91af..a83343b8 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -24,6 +24,8 @@ import (
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -96,6 +98,9 @@ func (*local) GetPort() *uint32 {
        return nil
 }
 
+func (*local) SetRouteProviders(_ map[string]route.TableProvider) {
+}
+
 func (*local) Register(bus.Topic, schema.EventHandler) {
 }
 
@@ -103,6 +108,16 @@ func (*local) HealthyNodes() []string {
        return nil
 }
 
+// GetRouteTable returns an empty route table for local queue.
+// Local queue doesn't have distributed nodes, so all fields are empty.
+func (*local) GetRouteTable() *databasev1.RouteTable {
+       return &databasev1.RouteTable{
+               Registered: []*databasev1.Node{},
+               Active:     []string{},
+               Evictable:  []string{},
+       }
+}
+
 type localBatchPublisher struct {
        ctx      context.Context
        local    *bus.Bus
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 26d5320f..2ce624ab 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -319,6 +319,40 @@ func (p *pub) Publish(_ context.Context, topic bus.Topic, 
messages ...bus.Messag
        return p.publish(15*time.Second, topic, messages...)
 }
 
+// GetRouteTable implements the route.TableProvider interface.
+// It returns a RouteTable with all registered nodes and their health states.
+func (p *pub) GetRouteTable() *databasev1.RouteTable {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+
+       registered := make([]*databasev1.Node, 0, len(p.registered))
+       for _, node := range p.registered {
+               if node != nil {
+                       registered = append(registered, node)
+               }
+       }
+
+       active := make([]string, 0, len(p.active))
+       for nodeID := range p.active {
+               if node := p.registered[nodeID]; node != nil && node.Metadata 
!= nil {
+                       active = append(active, node.Metadata.Name)
+               }
+       }
+
+       evictable := make([]string, 0, len(p.evictable))
+       for nodeID := range p.evictable {
+               if node := p.registered[nodeID]; node != nil && node.Metadata 
!= nil {
+                       evictable = append(evictable, node.Metadata.Name)
+               }
+       }
+
+       return &databasev1.RouteTable{
+               Registered: registered,
+               Active:     active,
+               Evictable:  evictable,
+       }
+}
+
 // New returns a new queue client targeting the given node roles.
 // If no roles are passed, it defaults to databasev1.Role_ROLE_DATA.
 func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client {
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index defb57e5..c51dc294 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -24,6 +24,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -46,6 +47,7 @@ type Client interface {
        run.Unit
        bus.Publisher
        bus.Broadcaster
+       route.TableProvider
        NewBatchPublisher(timeout time.Duration) BatchPublisher
        NewChunkedSyncClient(node string, chunkSize uint32) (ChunkedSyncClient, 
error)
        Register(bus.Topic, schema.EventHandler)
@@ -60,6 +62,7 @@ type Server interface {
        bus.Subscriber
        RegisterChunkedSyncHandler(topic bus.Topic, handler ChunkedSyncHandler)
        GetPort() *uint32
+       SetRouteProviders(providers map[string]route.TableProvider)
 }
 
 // BatchPublisher is the interface for publishing data in batch.
diff --git a/banyand/queue/sub/node.go b/banyand/queue/sub/node.go
index 3ddd5d2a..b1e0b697 100644
--- a/banyand/queue/sub/node.go
+++ b/banyand/queue/sub/node.go
@@ -28,3 +28,27 @@ func (s *server) GetCurrentNode(context.Context, 
*databasev1.GetCurrentNodeReque
                Node: s.curNode,
        }, nil
 }
+
+func (s *server) GetClusterState(context.Context, 
*databasev1.GetClusterStateRequest) (*databasev1.GetClusterStateResponse, 
error) {
+       s.routeTableProviderMu.RLock()
+       defer s.routeTableProviderMu.RUnlock()
+
+       if s.routeTableProvider == nil {
+               return &databasev1.GetClusterStateResponse{RouteTables: 
map[string]*databasev1.RouteTable{}}, nil
+       }
+
+       routeTables := make(map[string]*databasev1.RouteTable, 
len(s.routeTableProvider))
+       for routeKey, provider := range s.routeTableProvider {
+               if provider == nil {
+                       s.log.Warn().Str("routeKey", routeKey).Msg("route table 
provider is nil")
+                       continue
+               }
+
+               routeTable := provider.GetRouteTable()
+               if routeTable != nil {
+                       routeTables[routeKey] = routeTable
+               }
+       }
+
+       return &databasev1.GetClusterStateResponse{RouteTables: routeTables}, 
nil
+}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index c2d45922..25ebc76d 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -47,6 +47,7 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -71,39 +72,41 @@ var (
 )
 
 type server struct {
-       clusterv1.UnimplementedServiceServer
        clusterv1.UnimplementedChunkedSyncServiceServer
        streamv1.UnimplementedStreamServiceServer
        databasev1.UnimplementedSnapshotServiceServer
        databasev1.UnimplementedNodeQueryServiceServer
-       creds               credentials.TransportCredentials
-       tlsReloader         *pkgtls.Reloader
-       omr                 observability.MetricsRegistry
-       metrics             *metrics
-       ser                 *grpclib.Server
-       listeners           map[bus.Topic][]bus.MessageListener
-       topicMap            map[string]bus.Topic
-       chunkedSyncHandlers map[bus.Topic]queue.ChunkedSyncHandler
-       log                 *logger.Logger
-       httpSrv             *http.Server
-       curNode             *databasev1.Node
-       clientCloser        context.CancelFunc
-       httpAddr            string
-       addr                string
-       host                string
-       certFile            string
-       keyFile             string
-       flagNamePrefix      string
-       maxRecvMsgSize      run.Bytes
-       listenersLock       sync.RWMutex
-       port                uint32
-       httpPort            uint32
-       tls                 bool
-       // Chunk ordering configuration
-       enableChunkReordering bool
-       maxChunkBufferSize    uint32
+       databasev1.UnimplementedClusterStateServiceServer
+       clusterv1.UnimplementedServiceServer
+       omr                   observability.MetricsRegistry
+       creds                 credentials.TransportCredentials
+       curNode               *databasev1.Node
+       metrics               *metrics
+       ser                   *grpclib.Server
+       listeners             map[bus.Topic][]bus.MessageListener
+       topicMap              map[string]bus.Topic
+       chunkedSyncHandlers   map[bus.Topic]queue.ChunkedSyncHandler
+       log                   *logger.Logger
+       httpSrv               *http.Server
+       tlsReloader           *pkgtls.Reloader
+       clientCloser          context.CancelFunc
+       routeTableProvider    map[string]route.TableProvider
+       certFile              string
+       addr                  string
+       keyFile               string
+       flagNamePrefix        string
+       httpAddr              string
+       host                  string
        chunkBufferTimeout    time.Duration
+       maxRecvMsgSize        run.Bytes
+       listenersLock         sync.RWMutex
+       routeTableProviderMu  sync.RWMutex
+       port                  uint32
+       httpPort              uint32
+       maxChunkBufferSize    uint32
        maxChunkGapSize       uint32
+       tls                   bool
+       enableChunkReordering bool
 }
 
 // NewServer returns a new gRPC server.
@@ -113,7 +116,7 @@ func NewServer(omr observability.MetricsRegistry) 
queue.Server {
 
 // NewServerWithPorts returns a new gRPC server with specified ports.
 func NewServerWithPorts(omr observability.MetricsRegistry, flagNamePrefix 
string, port, httpPort uint32) queue.Server {
-       return &server{
+       srv := &server{
                listeners:           make(map[bus.Topic][]bus.MessageListener),
                topicMap:            make(map[string]bus.Topic),
                chunkedSyncHandlers: 
make(map[bus.Topic]queue.ChunkedSyncHandler),
@@ -128,6 +131,7 @@ func NewServerWithPorts(omr observability.MetricsRegistry, 
flagNamePrefix string
                chunkBufferTimeout:    5 * time.Second,
                maxChunkGapSize:       5,
        }
+       return srv
 }
 
 func (s *server) PreRun(ctx context.Context) error {
@@ -283,6 +287,7 @@ func (s *server) Serve() run.StopNotify {
        grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
        databasev1.RegisterSnapshotServiceServer(s.ser, s)
        databasev1.RegisterNodeQueryServiceServer(s.ser, s)
+       databasev1.RegisterClusterStateServiceServer(s.ser, s)
        streamv1.RegisterStreamServiceServer(s.ser, &streamService{ser: s})
        measurev1.RegisterMeasureServiceServer(s.ser, &measureService{ser: s})
        tracev1.RegisterTraceServiceServer(s.ser, &traceService{ser: s})
@@ -308,6 +313,11 @@ func (s *server) Serve() run.StopNotify {
                close(stopCh)
                return stopCh
        }
+       if err := 
databasev1.RegisterClusterStateServiceHandlerFromEndpoint(ctx, gwMux, s.addr, 
clientOpts); err != nil {
+               s.log.Error().Err(err).Msg("Failed to register cluster state 
service")
+               close(stopCh)
+               return stopCh
+       }
        mux := chi.NewRouter()
        mux.Mount("/api", http.StripPrefix("/api", gwMux))
        s.httpSrv = &http.Server{
@@ -381,6 +391,12 @@ func (s *server) RegisterChunkedSyncHandler(topic 
bus.Topic, handler queue.Chunk
        s.chunkedSyncHandlers[topic] = handler
 }
 
+func (s *server) SetRouteProviders(providers map[string]route.TableProvider) {
+       s.routeTableProviderMu.Lock()
+       s.routeTableProvider = providers
+       s.routeTableProviderMu.Unlock()
+}
+
 type metrics struct {
        totalStarted  meter.Counter
        totalFinished meter.Counter
diff --git a/docs/api-reference.md b/docs/api-reference.md
index adbacc1d..64ed1353 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -165,6 +165,9 @@
     - [TagType](#banyandb-database-v1-TagType)
   
 - [banyandb/database/v1/rpc.proto](#banyandb_database_v1_rpc-proto)
+    - [GetClusterStateRequest](#banyandb-database-v1-GetClusterStateRequest)
+    - [GetClusterStateResponse](#banyandb-database-v1-GetClusterStateResponse)
+    - 
[GetClusterStateResponse.RouteTablesEntry](#banyandb-database-v1-GetClusterStateResponse-RouteTablesEntry)
     - [GetCurrentNodeRequest](#banyandb-database-v1-GetCurrentNodeRequest)
     - [GetCurrentNodeResponse](#banyandb-database-v1-GetCurrentNodeResponse)
     - 
[GroupRegistryServiceCreateRequest](#banyandb-database-v1-GroupRegistryServiceCreateRequest)
@@ -227,6 +230,7 @@
     - 
[PropertyRegistryServiceListResponse](#banyandb-database-v1-PropertyRegistryServiceListResponse)
     - 
[PropertyRegistryServiceUpdateRequest](#banyandb-database-v1-PropertyRegistryServiceUpdateRequest)
     - 
[PropertyRegistryServiceUpdateResponse](#banyandb-database-v1-PropertyRegistryServiceUpdateResponse)
+    - [RouteTable](#banyandb-database-v1-RouteTable)
     - [Snapshot](#banyandb-database-v1-Snapshot)
     - [SnapshotRequest](#banyandb-database-v1-SnapshotRequest)
     - [SnapshotRequest.Group](#banyandb-database-v1-SnapshotRequest-Group)
@@ -268,6 +272,7 @@
     - 
[TraceRegistryServiceUpdateRequest](#banyandb-database-v1-TraceRegistryServiceUpdateRequest)
     - 
[TraceRegistryServiceUpdateResponse](#banyandb-database-v1-TraceRegistryServiceUpdateResponse)
   
+    - [ClusterStateService](#banyandb-database-v1-ClusterStateService)
     - [GroupRegistryService](#banyandb-database-v1-GroupRegistryService)
     - 
[IndexRuleBindingRegistryService](#banyandb-database-v1-IndexRuleBindingRegistryService)
     - 
[IndexRuleRegistryService](#banyandb-database-v1-IndexRuleRegistryService)
@@ -2636,6 +2641,47 @@ Type determine the index structure under the hood
 
 
 
+<a name="banyandb-database-v1-GetClusterStateRequest"></a>
+
+### GetClusterStateRequest
+
+
+
+
+
+
+
+<a name="banyandb-database-v1-GetClusterStateResponse"></a>
+
+### GetClusterStateResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| route_tables | 
[GetClusterStateResponse.RouteTablesEntry](#banyandb-database-v1-GetClusterStateResponse-RouteTablesEntry)
 | repeated | Liaison node: the map&#39;s keys can be &#34;tire1&#34; and 
&#34;tire2&#34;. tire1 routes traffic between liaison nodes, tire2 spreads data 
among data nodes. Data node: the map&#39;s key can be &#34;property&#34; for 
gossip. Lifecycle agent: the map&#39;s key can be the next stage&#39;s name. |
+
+
+
+
+
+
+<a name="banyandb-database-v1-GetClusterStateResponse-RouteTablesEntry"></a>
+
+### GetClusterStateResponse.RouteTablesEntry
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| key | [string](#string) |  |  |
+| value | [RouteTable](#banyandb-database-v1-RouteTable) |  |  |
+
+
+
+
+
+
 <a name="banyandb-database-v1-GetCurrentNodeRequest"></a>
 
 ### GetCurrentNodeRequest
@@ -3530,6 +3576,24 @@ Type determine the index structure under the hood
 
 
 
+<a name="banyandb-database-v1-RouteTable"></a>
+
+### RouteTable
+RouteTable represents a collection of nodes grouped by their health state.
+It provides a view of nodes that are registered, actively healthy, and those 
being evicted.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| registered | [Node](#banyandb-database-v1-Node) | repeated | registered 
contains all nodes that have been discovered and registered in this route. |
+| active | [string](#string) | repeated | active contains node names 
(Node.Metadata.Name) that are currently healthy and can handle requests. |
+| evictable | [string](#string) | repeated | evictable contains node names 
(Node.Metadata.Name) that are unhealthy and being retried before eviction. |
+
+
+
+
+
+
 <a name="banyandb-database-v1-Snapshot"></a>
 
 ### Snapshot
@@ -4132,6 +4196,16 @@ Type determine the index structure under the hood
  
 
 
+<a name="banyandb-database-v1-ClusterStateService"></a>
+
+### ClusterStateService
+
+
+| Method Name | Request Type | Response Type | Description |
+| ----------- | ------------ | ------------- | ------------|
+| GetClusterState | 
[GetClusterStateRequest](#banyandb-database-v1-GetClusterStateRequest) | 
[GetClusterStateResponse](#banyandb-database-v1-GetClusterStateResponse) |  |
+
+
 <a name="banyandb-database-v1-GroupRegistryService"></a>
 
 ### GroupRegistryService
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index ff90cb2e..46d8a579 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -24,6 +24,7 @@ import (
        "github.com/spf13/cobra"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -57,6 +58,10 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate property service")
        }
+       pipeline.SetRouteProviders(map[string]route.TableProvider{
+               "property": propertySvc,
+       })
+
        streamSvc, err := stream.NewService(metaSvc, pipeline, metricSvc, pm, 
propertyStreamPipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index d613115e..a3c19c2c 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -28,6 +28,7 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/dquery"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/banyand/liaison/http"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -89,12 +90,17 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                l.Fatal().Err(err).Msg("failed to initiate distributed query 
service")
        }
 
+       routeProviders := map[string]route.TableProvider{
+               "tire1": tire1Client,
+               "tire2": tire2Client,
+       }
+
        grpcServer := grpc.NewServer(ctx, tire1Client, tire2Client, 
localPipeline, metaSvc, grpc.NodeRegistries{
                MeasureLiaisonNodeRegistry: measureLiaisonNodeRegistry,
                StreamLiaisonNodeRegistry:  
grpc.NewClusterNodeRegistry(data.TopicStreamWrite, tire1Client, 
streamLiaisonNodeSel),
                PropertyNodeRegistry:       
grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, tire2Client, 
propertyNodeSel),
                TraceLiaisonNodeRegistry:   
grpc.NewClusterNodeRegistry(data.TopicTraceWrite, tire1Client, 
traceLiaisonNodeSel),
-       }, metricSvc, pm)
+       }, metricSvc, pm, routeProviders)
        profSvc := observability.NewProfService()
        httpServer := http.NewServer(grpcServer.GetAuthReloader())
        var units []run.Unit
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 14d80fbb..5fb8c42e 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -82,7 +82,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
                StreamLiaisonNodeRegistry:  nr,
                PropertyNodeRegistry:       nr,
                TraceLiaisonNodeRegistry:   nr,
-       }, metricSvc, pm)
+       }, metricSvc, pm, nil)
        profSvc := observability.NewProfService()
        httpServer := http.NewServer(grpcServer.GetAuthReloader())
 
diff --git 
a/test/integration/distributed/cluster_state/cluster_state_suite_test.go 
b/test/integration/distributed/cluster_state/cluster_state_suite_test.go
new file mode 100644
index 00000000..0449e5a5
--- /dev/null
+++ b/test/integration/distributed/cluster_state/cluster_state_suite_test.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cluster_state_test
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+func TestClusterState(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Distributed Get Cluster State Suite")
+}
+
+var (
+       dataConnection    *grpc.ClientConn
+       liaisonConnection *grpc.ClientConn
+       srcDir            string
+       deferFunc         func()
+       goods             []gleak.Goroutine
+       dataAddr          string
+       liaisonAddr       string
+       ep                string
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       goods = gleak.Goroutines()
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       var spaceDef func()
+       srcDir, spaceDef, err = test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep = fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(srcDir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{ep}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer schemaRegistry.Close()
+       By("Starting data node")
+       var closeDataNode0 func()
+       dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep, 
"--property-repair-enabled=true")
+       By("Starting liaison node")
+       var closerLiaisonNode func()
+       liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep)
+       time.Sleep(flags.ConsistentlyTimeout)
+       deferFunc = func() {
+               closerLiaisonNode()
+               closeDataNode0()
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+       liaisonConnection, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred())
+       dataConnection, err = grpchelper.Conn(dataAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred())
+       return nil
+}, func(_ []byte) {
+})
+
+var _ = Describe("ClusterState API", func() {
+       It("Check cluster state", func() {
+               client := 
databasev1.NewClusterStateServiceClient(dataConnection)
+               state, err := client.GetClusterState(context.Background(), 
&databasev1.GetClusterStateRequest{})
+               Expect(err).NotTo(HaveOccurred())
+               Expect(state.GetRouteTables()).To(HaveKey("property"))
+               client = 
databasev1.NewClusterStateServiceClient(liaisonConnection)
+               state, err = client.GetClusterState(context.Background(), 
&databasev1.GetClusterStateRequest{})
+               Expect(err).NotTo(HaveOccurred())
+               Expect(state.GetRouteTables()).To(HaveKey("tire1"))
+               Expect(state.GetRouteTables()).To(HaveKey("tire2"))
+       })
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if dataConnection != nil {
+               Expect(dataConnection.Close()).To(Succeed())
+       }
+       if liaisonConnection != nil {
+               Expect(liaisonConnection.Close()).To(Succeed())
+       }
+}, func() {})
+
+var _ = ReportAfterSuite("Distributed Lifecycle Suite", func(report Report) {
+       if report.SuiteSucceeded {
+               if deferFunc != nil {
+                       deferFunc()
+               }
+               Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       }
+})

Reply via email to