This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-step-2-2-prep-queue-client in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7937245efd86e43c280c9edf1fabd0dc4185157b Author: Hongtao Gao <[email protected]> AuthorDate: Fri May 1 06:44:49 2026 +0000 feat(queue): add NewNodeSchemaStatusClient to queue.Client (Phase 2 prep) The Phase 2 Step 2.2 cluster-barrier fan-out probes peer liaisons and data nodes via clusterv1.NodeSchemaStatusService (the per-node server that landed in #1108). Rather than open a parallel connection mesh, the fan-out borrows the *grpc.ClientConn that pub already maintains for each tier — same conns, same auth, same health-checks, same lifecycle. This commit is the enabling shim: - A new queue.Client interface method, NewNodeSchemaStatusClient(node), mirrors the existing NewChunkedSyncClient pattern: take a node name, look it up in the underlying ConnManager, and return a typed RPC client wrapping the conn. Internal-only API; no proto changes; no client-facing surface. - The pub implementation borrows c.conn from the ConnManager pool and wraps it with clusterv1.NewNodeSchemaStatusServiceClient (~10 LOC, mirroring pub.NewChunkedSyncClientWithConfig at line 543). - The local stub returns ErrNotImplemented (a new sentinel exported from banyand/queue) because the standalone pipeline has no peer to dial. Step 2.2's fan-out will use errors.Is to fall back to in-process self-probing without a type-switch on the queue client. - pipeline_mock.go is gitignored (per .gitignore line 60) and regenerated on each build, so the regenerated output is not committed. CHANGES.md: opened a "Schema consistency (Phase 2 in progress)" sub-group under 0.11.0 — Phase 2 work continues to land in 0.11.0 until the section closes. The sub-group lists the merged Step 2.1 service and this prep shim; subsequent Step 2.2/2.3/2.4 PRs will add their own sub-bullets. 
via [HAPI](https://hapi.run) --- CHANGES.md | 3 +++ banyand/queue/local.go | 8 ++++++++ banyand/queue/pub/pub.go | 14 ++++++++++++++ banyand/queue/queue.go | 14 ++++++++++++++ 4 files changed, 39 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index ebbdb96b0..a0c3dc0a9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -31,6 +31,9 @@ Release Notes. - Add tombstone retention/GC (default 7 days, configurable via `--schema-server-tombstone-retention`) with a per-cache count cap to bound memory under bulk deletes. - Reject `Create` with `updated_at <= tombstone.delete_time` to prevent replayed creates from overwriting newer deletes. - Guard `pkg/schema/cache` against out-of-order `EventDelete` events; expose monotonic `LatestModRevision` watermark. +- Schema consistency (Phase 2 in progress): cluster-wide barrier groundwork. Internal-only; no client-facing surface impact yet. + - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`, `GetAbsentKeys`) registered on every cluster member that holds a schema cache, so peer liaisons and data nodes can be probed identically by the upcoming barrier fan-out (#1108). + - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the barrier fan-out can borrow the existing tier1/tier2 connection pools instead of opening a parallel mesh. 
### Bug Fixes diff --git a/banyand/queue/local.go b/banyand/queue/local.go index e59269785..362e61bbb 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -24,6 +24,7 @@ import ( "time" "github.com/apache/skywalking-banyandb/api/common" + clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route" "github.com/apache/skywalking-banyandb/banyand/metadata" @@ -179,6 +180,13 @@ func (l *local) NewChunkedSyncClient(node string, _ uint32) (ChunkedSyncClient, return &localChunkedSyncClient{local: l.local, node: node}, nil } +// NewNodeSchemaStatusClient returns ErrNotImplemented because the standalone +// pipeline has no peer to dial. The cluster barrier fan-out (Step 2.2) treats +// this sentinel as a signal to degrade to in-process self-probing. +func (*local) NewNodeSchemaStatusClient(_ string) (clusterv1.NodeSchemaStatusServiceClient, error) { + return nil, ErrNotImplemented +} + type localChunkedSyncClient struct { local *bus.Bus node string diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index e7b975176..1217009b4 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -564,3 +564,17 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string, config *ChunkedSyncCli func (p *pub) HealthyNodes() []string { return p.connMgr.ActiveNames() } + +// NewNodeSchemaStatusClient borrows the underlying *grpc.ClientConn from the +// connection pool and wraps it as a clusterv1.NodeSchemaStatusServiceClient +// so the cluster-barrier fan-out (Step 2.2) can probe peer caches without +// opening parallel connections. The returned client shares the conn's HTTP/2 +// streams with normal queue traffic; per-RPC contexts carry their own +// deadlines. 
+func (p *pub) NewNodeSchemaStatusClient(node string) (clusterv1.NodeSchemaStatusServiceClient, error) { + c, ok := p.connMgr.GetClient(node) + if !ok { + return nil, fmt.Errorf("no active client for node %s", node) + } + return clusterv1.NewNodeSchemaStatusServiceClient(c.conn), nil +} diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index 76494a0ae..1b3e06936 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -19,6 +19,7 @@ package queue import ( "context" + "errors" "sync" "time" @@ -32,6 +33,13 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) +// ErrNotImplemented is returned by Client implementations that have no peers +// to dial — notably the standalone local pipeline. Callers (e.g. the cluster +// barrier fan-out) treat this sentinel as "no peer to probe; skip" rather +// than as a hard failure, so the standalone path degrades to in-process +// self-probing without a special-cased branch in the caller. +var ErrNotImplemented = errors.New("not implemented") + // Queue builds a data transmission tunnel between subscribers and publishers. // //go:generate mockgen -destination=./queue_mock.go -package=queue github.com/apache/skywalking-banyandb/pkg/bus MessageListener @@ -51,6 +59,12 @@ type Client interface { route.TableProvider NewBatchPublisher(timeout time.Duration) BatchPublisher NewChunkedSyncClient(node string, chunkSize uint32) (ChunkedSyncClient, error) + // NewNodeSchemaStatusClient returns a client for the cluster.v1 + // NodeSchemaStatusService against the named node, sharing the underlying + // *grpc.ClientConn from this queue client's existing connection pool. + // The standalone local pipeline returns ErrNotImplemented so callers can + // treat it as "no peer to probe" without a type-switch. + NewNodeSchemaStatusClient(node string) (clusterv1.NodeSchemaStatusServiceClient, error) Register(bus.Topic, schema.EventHandler) OnAddOrUpdate(md schema.Metadata) GracefulStop()
