This is an automated email from the ASF dual-hosted git repository.

wu-sheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 65df753ed feat(queue): add NewNodeSchemaStatusClient to queue.Client 
(Phase 2 prep) (#1109)
65df753ed is described below

commit 65df753ed08979ec46737afa3ea72f82e318e297
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun May 3 12:34:34 2026 +0800

    feat(queue): add NewNodeSchemaStatusClient to queue.Client (Phase 2 prep) 
(#1109)
---
 CHANGES.md               |  3 +++
 banyand/queue/local.go   |  8 ++++++++
 banyand/queue/pub/pub.go | 14 ++++++++++++++
 banyand/queue/queue.go   | 14 ++++++++++++++
 4 files changed, 39 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index ebbdb96b0..a0c3dc0a9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -31,6 +31,9 @@ Release Notes.
   - Add tombstone retention/GC (default 7 days, configurable via 
`--schema-server-tombstone-retention`) with a per-cache count cap to bound 
memory under bulk deletes.
   - Reject `Create` with `updated_at <= tombstone.delete_time` to prevent 
replayed creates from overwriting newer deletes.
   - Guard `pkg/schema/cache` against out-of-order `EventDelete` events; expose 
monotonic `LatestModRevision` watermark.
+- Schema consistency (Phase 2 in progress): cluster-wide barrier groundwork. 
Internal-only; no client-facing surface impact yet.
+  - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`, 
`GetAbsentKeys`) registered on every cluster member that holds a schema cache, 
so peer liaisons and data nodes can be probed identically by the upcoming 
barrier fan-out (#1108).
+  - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the 
barrier fan-out can borrow the existing tier1/tier2 connection pools instead of 
opening a parallel mesh.
 
 ### Bug Fixes
 
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index e59269785..362e61bbb 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -179,6 +180,13 @@ func (l *local) NewChunkedSyncClient(node string, _ 
uint32) (ChunkedSyncClient,
        return &localChunkedSyncClient{local: l.local, node: node}, nil
 }
 
+// NewNodeSchemaStatusClient ignores node and always returns ErrNotImplemented:
+// the standalone pipeline has no peer to dial. The barrier fan-out (Step 2.2)
+// is expected to treat the sentinel as "degrade to in-process self-probing".
+func (*local) NewNodeSchemaStatusClient(_ string) 
(clusterv1.NodeSchemaStatusServiceClient, error) {
+       return nil, ErrNotImplemented
+}
+
 type localChunkedSyncClient struct {
        local *bus.Bus
        node  string
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index e7b975176..1217009b4 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -564,3 +564,17 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string, 
config *ChunkedSyncCli
 func (p *pub) HealthyNodes() []string {
        return p.connMgr.ActiveNames()
 }
+
+// NewNodeSchemaStatusClient borrows the underlying *grpc.ClientConn from the
+// connection pool and wraps it as a clusterv1.NodeSchemaStatusServiceClient
+// so the cluster-barrier fan-out (Step 2.2) can probe peer caches without
+// opening parallel connections. It returns an error when the pool has no
+// active client for node. RPCs on the returned client share the conn with
+// normal queue traffic; per-RPC contexts carry their own deadlines.
+func (p *pub) NewNodeSchemaStatusClient(node string) 
(clusterv1.NodeSchemaStatusServiceClient, error) {
+       c, ok := p.connMgr.GetClient(node)
+       if !ok {
+               return nil, fmt.Errorf("no active client for node %s", node)
+       }
+       return clusterv1.NewNodeSchemaStatusServiceClient(c.conn), nil
+}
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 76494a0ae..1b3e06936 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -19,6 +19,7 @@ package queue
 
 import (
        "context"
+       "errors"
        "sync"
        "time"
 
@@ -32,6 +33,13 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// ErrNotImplemented is returned by Client implementations that have no peers
+// to dial — notably the standalone local pipeline. Callers (e.g. the cluster
+// barrier fan-out) should match it with errors.Is and treat it as "no peer to
+// probe; skip" rather than as a hard failure, so the standalone path degrades
+// to in-process self-probing without a special-cased branch in the caller.
+var ErrNotImplemented = errors.New("not implemented")
+
 // Queue builds a data transmission tunnel between subscribers and publishers.
 //
 //go:generate mockgen -destination=./queue_mock.go -package=queue 
github.com/apache/skywalking-banyandb/pkg/bus MessageListener
@@ -51,6 +59,12 @@ type Client interface {
        route.TableProvider
        NewBatchPublisher(timeout time.Duration) BatchPublisher
        NewChunkedSyncClient(node string, chunkSize uint32) (ChunkedSyncClient, 
error)
+       // NewNodeSchemaStatusClient returns a client for the cluster.v1
+       // NodeSchemaStatusService against the named node, sharing the 
underlying
+       // *grpc.ClientConn from this queue client's existing connection pool.
+       // The standalone local pipeline returns ErrNotImplemented so callers 
can
+       // treat it as "no peer to probe" without a type-switch.
+       NewNodeSchemaStatusClient(node string) 
(clusterv1.NodeSchemaStatusServiceClient, error)
        Register(bus.Topic, schema.EventHandler)
        OnAddOrUpdate(md schema.Metadata)
        GracefulStop()

Reply via email to