This is an automated email from the ASF dual-hosted git repository.
wu-sheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 65df753ed feat(queue): add NewNodeSchemaStatusClient to queue.Client
(Phase 2 prep) (#1109)
65df753ed is described below
commit 65df753ed08979ec46737afa3ea72f82e318e297
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun May 3 12:34:34 2026 +0800
feat(queue): add NewNodeSchemaStatusClient to queue.Client (Phase 2 prep)
(#1109)
---
CHANGES.md | 3 +++
banyand/queue/local.go | 8 ++++++++
banyand/queue/pub/pub.go | 14 ++++++++++++++
banyand/queue/queue.go | 14 ++++++++++++++
4 files changed, 39 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index ebbdb96b0..a0c3dc0a9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -31,6 +31,9 @@ Release Notes.
- Add tombstone retention/GC (default 7 days, configurable via
`--schema-server-tombstone-retention`) with a per-cache count cap to bound
memory under bulk deletes.
- Reject `Create` with `updated_at <= tombstone.delete_time` to prevent
replayed creates from overwriting newer deletes.
- Guard `pkg/schema/cache` against out-of-order `EventDelete` events; expose
monotonic `LatestModRevision` watermark.
+- Schema consistency (Phase 2 in progress): cluster-wide barrier groundwork.
Internal-only; no client-facing surface impact yet.
+ - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`,
`GetAbsentKeys`) registered on every cluster member that holds a schema cache,
so peer liaisons and data nodes can be probed identically by the upcoming
barrier fan-out (#1108).
+ - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the
barrier fan-out can borrow the existing tier1/tier2 connection pools instead of
opening a parallel mesh.
### Bug Fixes
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index e59269785..362e61bbb 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/apache/skywalking-banyandb/api/common"
+ clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -179,6 +180,13 @@ func (l *local) NewChunkedSyncClient(node string, _
uint32) (ChunkedSyncClient,
return &localChunkedSyncClient{local: l.local, node: node}, nil
}
+// NewNodeSchemaStatusClient returns ErrNotImplemented because the standalone
+// pipeline has no peer to dial. The cluster barrier fan-out (Step 2.2) treats
+// this sentinel as a signal to degrade to in-process self-probing.
+func (*local) NewNodeSchemaStatusClient(_ string)
(clusterv1.NodeSchemaStatusServiceClient, error) {
+ return nil, ErrNotImplemented
+}
+
type localChunkedSyncClient struct {
local *bus.Bus
node string
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index e7b975176..1217009b4 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -564,3 +564,17 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string,
config *ChunkedSyncCli
func (p *pub) HealthyNodes() []string {
return p.connMgr.ActiveNames()
}
+
+// NewNodeSchemaStatusClient borrows the underlying *grpc.ClientConn from the
+// connection pool and wraps it as a clusterv1.NodeSchemaStatusServiceClient
+// so the cluster-barrier fan-out (Step 2.2) can probe peer caches without
+// opening parallel connections. The returned client shares the conn's HTTP/2
+// streams with normal queue traffic; per-RPC contexts carry their own
+// deadlines.
+func (p *pub) NewNodeSchemaStatusClient(node string)
(clusterv1.NodeSchemaStatusServiceClient, error) {
+ c, ok := p.connMgr.GetClient(node)
+ if !ok {
+ return nil, fmt.Errorf("no active client for node %s", node)
+ }
+ return clusterv1.NewNodeSchemaStatusServiceClient(c.conn), nil
+}
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 76494a0ae..1b3e06936 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -19,6 +19,7 @@ package queue
import (
"context"
+ "errors"
"sync"
"time"
@@ -32,6 +33,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/run"
)
+// ErrNotImplemented is returned by Client implementations that have no peers
+// to dial — notably the standalone local pipeline. Callers (e.g. the cluster
+// barrier fan-out) treat this sentinel as "no peer to probe; skip" rather
+// than as a hard failure, so the standalone path degrades to in-process
+// self-probing without a special-cased branch in the caller.
+var ErrNotImplemented = errors.New("not implemented")
+
// Queue builds a data transmission tunnel between subscribers and publishers.
//
//go:generate mockgen -destination=./queue_mock.go -package=queue
github.com/apache/skywalking-banyandb/pkg/bus MessageListener
@@ -51,6 +59,12 @@ type Client interface {
route.TableProvider
NewBatchPublisher(timeout time.Duration) BatchPublisher
NewChunkedSyncClient(node string, chunkSize uint32) (ChunkedSyncClient,
error)
+ // NewNodeSchemaStatusClient returns a client for the cluster.v1
+ // NodeSchemaStatusService against the named node, sharing the
underlying
+ // *grpc.ClientConn from this queue client's existing connection pool.
+ // The standalone local pipeline returns ErrNotImplemented so callers
can
+ // treat it as "no peer to probe" without a type-switch.
+ NewNodeSchemaStatusClient(node string)
(clusterv1.NodeSchemaStatusServiceClient, error)
Register(bus.Topic, schema.EventHandler)
OnAddOrUpdate(md schema.Metadata)
GracefulStop()