This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ff1c63e33 Support gossip repair protocol in schema property server
(#982)
ff1c63e33 is described below
commit ff1c63e3311f90a8c67c9421362a1fed8b464d02
Author: mrproliu <[email protected]>
AuthorDate: Mon Mar 2 15:17:42 2026 +0800
Support gossip repair protocol in schema property server (#982)
* Support gossip repair protocol in schema property server
---
api/common/id.go | 10 +-
api/proto/banyandb/database/v1/database.proto | 1 +
banyand/metadata/client.go | 1 +
.../metadata/schema/schemaserver/repair_service.go | 196 +++++++++
.../schema/schemaserver/repair_service_test.go | 452 +++++++++++++++++++++
banyand/metadata/schema/schemaserver/service.go | 7 +
banyand/metadata/service/server.go | 44 +-
banyand/property/db/repair.go | 5 +
banyand/property/db/repair_gossip_test.go | 4 +-
banyand/property/gossip/client.go | 2 +-
banyand/property/gossip/server.go | 17 +-
banyand/property/gossip/service.go | 56 +--
banyand/property/gossip/service_test.go | 4 +-
banyand/property/gossip/trace.go | 6 +-
banyand/property/service.go | 4 +-
banyand/queue/sub/server.go | 1 +
bydbctl/internal/cmd/property_test.go | 12 +-
docs/api-reference.md | 1 +
pkg/cmdsetup/data.go | 3 +-
pkg/cmdsetup/liaison.go | 2 +-
pkg/cmdsetup/standalone.go | 2 +-
21 files changed, 778 insertions(+), 52 deletions(-)
diff --git a/api/common/id.go b/api/common/id.go
index b5437f758..0df3ef2b1 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -154,8 +154,9 @@ type Node struct {
GrpcAddress string
HTTPAddress string
- PropertyGossipGrpcAddress string
- PropertySchemaGrpcAddress string
+ PropertyGossipGrpcAddress string
+ PropertySchemaGrpcAddress string
+ PropertySchemaGossipGrpcAddress string
}
var (
@@ -197,7 +198,7 @@ func ParseNodeHostProvider(s string) (NodeHostProvider,
error) {
}
// GenerateNode generates a node id.
-func GenerateNode(grpcPort, httpPort, propertyGrpcPort, propertySchemaGrpcPort
*uint32) (node Node, err error) {
+func GenerateNode(grpcPort, httpPort, propertyGrpcPort,
propertySchemaGrpcPort, propertySchemaGossipPort *uint32) (node Node, err
error) {
port := grpcPort
if port == nil {
port = httpPort
@@ -237,6 +238,9 @@ func GenerateNode(grpcPort, httpPort, propertyGrpcPort,
propertySchemaGrpcPort *
if propertySchemaGrpcPort != nil {
node.PropertySchemaGrpcAddress = net.JoinHostPort(nodeHost,
strconv.FormatUint(uint64(*propertySchemaGrpcPort), 10))
}
+ if propertySchemaGossipPort != nil {
+ node.PropertySchemaGossipGrpcAddress =
net.JoinHostPort(nodeHost,
strconv.FormatUint(uint64(*propertySchemaGossipPort), 10))
+ }
node.Labels = ParseNodeFlags()
return node, nil
}
diff --git a/api/proto/banyandb/database/v1/database.proto
b/api/proto/banyandb/database/v1/database.proto
index 7db7783c0..a564eb157 100644
--- a/api/proto/banyandb/database/v1/database.proto
+++ b/api/proto/banyandb/database/v1/database.proto
@@ -42,6 +42,7 @@ message Node {
map<string, string> labels = 6;
string property_repair_gossip_grpc_address = 7;
string property_schema_grpc_address = 8;
+ string property_schema_gossip_grpc_address = 9;
}
message Shard {
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 096c9e4bf..ba1c5ea8f 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -436,6 +436,7 @@ func (s *clientService) registerNodeIfNeeded(ctx
context.Context, l *logger.Logg
PropertyRepairGossipGrpcAddress: node.PropertyGossipGrpcAddress,
PropertySchemaGrpcAddress: node.PropertySchemaGrpcAddress,
+ PropertySchemaGossipGrpcAddress:
node.PropertySchemaGossipGrpcAddress,
}
for {
ctxCancelable, cancel := context.WithTimeout(ctx,
time.Second*10)
diff --git a/banyand/metadata/schema/schemaserver/repair_service.go
b/banyand/metadata/schema/schemaserver/repair_service.go
new file mode 100644
index 000000000..56532fffe
--- /dev/null
+++ b/banyand/metadata/schema/schemaserver/repair_service.go
@@ -0,0 +1,196 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/robfig/cron/v3"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// RepairService is the independent schema repair service.
+// It combines the run.PreRunner, run.Config and run.Service lifecycle
+// contracts and additionally exposes the gossip gRPC port.
+type RepairService interface {
+ run.PreRunner
+ run.Config
+ run.Service
+ // GetGossipPort returns a pointer to the gossip gRPC server port.
+ GetGossipPort() *uint32
+}
+
+// repairService is the RepairService implementation. It tracks node names
+// reported through schema events and periodically asks its gossip messenger
+// to propagate a repair round among them.
+type repairService struct {
+ schema.UnimplementedOnInitHandler
+ // registerGossip is invoked with the messenger during PreRun.
+ registerGossip func(gossip.Messenger)
+ metadata metadata.Repo
+ messenger gossip.Messenger
+ // scheduler runs the cron-triggered repair task; it is created in PreRun.
+ scheduler *timestamp.Scheduler
+ // closer is handed to the messenger in Serve and closed in GracefulStop.
+ closer *run.Closer
+ l *logger.Logger
+ // metaNodes is the set of tracked node names; guarded by mu.
+ metaNodes map[string]struct{}
+ // repairCron is the cron expression bound to the
+ // schema-property-repair-trigger-cron flag.
+ repairCron string
+ mu sync.RWMutex
+}
+
+// NewGossipService builds the schema repair service. The gossip messenger it
+// owns is named "schema-property" and resolves peer addresses from
+// Node.PropertySchemaGossipGrpcAddress.
+func NewGossipService(registerGossip func(gossip.Messenger), metadata metadata.Repo,
+	pipelineClient queue.Client, omr observability.MetricsRegistry,
+) RepairService {
+	svc := &repairService{
+		registerGossip: registerGossip,
+		metadata:       metadata,
+		closer:         run.NewCloser(1),
+		metaNodes:      make(map[string]struct{}),
+	}
+	// Peers are addressed through their dedicated schema gossip endpoint.
+	schemaGossipAddress := func(n *databasev1.Node) string {
+		return n.PropertySchemaGossipGrpcAddress
+	}
+	// NOTE(review): 17933 is presumably the default gossip port — confirm against gossip.NewMessenger.
+	svc.messenger = gossip.NewMessenger("schema-property", schemaGossipAddress, omr, metadata, pipelineClient, 17933)
+	return svc
+}
+
+// Name returns the fixed unit name of this service, "schema-repair".
+func (r *repairService) Name() string {
+ return "schema-repair"
+}
+
+// Role reports that this service belongs to the META role.
+func (r *repairService) Role() databasev1.Role {
+ return databasev1.Role_ROLE_META
+}
+
+// FlagSet exposes the repair trigger cron flag (default "@every 10m")
+// together with every flag declared by the gossip messenger.
+func (r *repairService) FlagSet() *run.FlagSet {
+	fs := run.NewFlagSet("schema-property-repair")
+	fs.StringVar(
+		&r.repairCron,
+		"schema-property-repair-trigger-cron",
+		"@every 10m",
+		"the cron expression for schema repair gossip",
+	)
+	messengerFlags := r.messenger.FlagSet()
+	fs.AddFlagSet(messengerFlags.FlagSet)
+	return fs
+}
+
+// Validate rejects a repair cron expression that cron.ParseStandard cannot
+// parse, then delegates to the messenger's own validation.
+func (r *repairService) Validate() error {
+	if _, cronErr := cron.ParseStandard(r.repairCron); cronErr != nil {
+		return fmt.Errorf("schema-property-repair-trigger-cron is not a valid cron expression: %w", cronErr)
+	}
+	return r.messenger.Validate()
+}
+
+// PreRun prepares the service: it runs the messenger's PreRun, hands the
+// messenger to the registerGossip callback, subscribes to node events and
+// registers the cron-driven repair task. It returns the messenger's PreRun
+// error or a wrapped scheduler registration error.
+func (r *repairService) PreRun(ctx context.Context) error {
+ r.l = logger.GetLogger("schema-repair")
+ if preRunErr := r.messenger.PreRun(ctx); preRunErr != nil {
+ return preRunErr
+ }
+ // Registration happens only after the messenger's PreRun has succeeded.
+ r.registerGossip(r.messenger)
+ r.metadata.RegisterHandler("schema-repair-nodes", schema.KindNode, r)
+ r.scheduler = timestamp.NewScheduler(r.l, timestamp.NewClock())
+ registerErr := r.scheduler.Register("schema-property-repair-trigger",
+
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
+ r.repairCron, func(_ time.Time, l *logger.Logger) bool {
+ l.Debug().Msg("starting schema repair gossip")
+ // A failed round is only logged; the callback still returns true.
+ // NOTE(review): true presumably keeps the task scheduled — confirm against timestamp.Scheduler.
+ if repairErr := r.doRepairGossip(); repairErr != nil {
+ l.Err(repairErr).Msg("schema repair gossip
failed")
+ }
+ return true
+ })
+ if registerErr != nil {
+ return fmt.Errorf("failed to register schema repair cron task:
%w", registerErr)
+ }
+ return nil
+}
+
+// Serve starts the gossip messenger with the service closer and returns the
+// closer's notification channel.
+func (r *repairService) Serve() run.StopNotify {
+ r.messenger.Serve(r.closer)
+ return r.closer.CloseNotify()
+}
+
+// GracefulStop shuts the service down in order: the scheduler (if PreRun
+// created one), then the messenger, and finally the closer.
+func (r *repairService) GracefulStop() {
+ // scheduler is nil when PreRun never reached the scheduler setup.
+ if r.scheduler != nil {
+ r.scheduler.Close()
+ }
+ r.messenger.GracefulStop()
+ r.closer.CloseThenWait()
+}
+
+// GetGossipPort returns the gossip gRPC port as reported by the messenger's
+// GetServerPort.
+func (r *repairService) GetGossipPort() *uint32 {
+ return r.messenger.GetServerPort()
+}
+
+// OnAddOrUpdate keeps the tracked node set in sync with node metadata.
+//
+// A node is tracked only while it carries the META role and advertises a
+// non-empty schema gossip address. An update that drops either condition
+// removes the node, so a node that loses the META role no longer lingers as a
+// stale gossip target. Events of other kinds or with an unexpected spec type
+// are ignored.
+func (r *repairService) OnAddOrUpdate(md schema.Metadata) {
+	if md.Kind != schema.KindNode {
+		return
+	}
+	node, ok := md.Spec.(*databasev1.Node)
+	if !ok {
+		return
+	}
+	// Generated getters are nil-safe, so a typed-nil spec cannot panic here.
+	hasMetaRole := false
+	for _, role := range node.GetRoles() {
+		if role == databasev1.Role_ROLE_META {
+			hasMetaRole = true
+			break
+		}
+	}
+	name := node.GetMetadata().GetName()
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if !hasMetaRole || node.GetPropertySchemaGossipGrpcAddress() == "" {
+		// Deleting an untracked name is a no-op.
+		delete(r.metaNodes, name)
+		return
+	}
+	r.metaNodes[name] = struct{}{}
+}
+
+// OnDelete stops tracking the node described by md. Events that are not of
+// the node kind, or whose spec is not a *databasev1.Node, are ignored.
+func (r *repairService) OnDelete(md schema.Metadata) {
+	node, isNode := md.Spec.(*databasev1.Node)
+	if md.Kind != schema.KindNode || !isNode {
+		return
+	}
+	nodeName := node.Metadata.GetName()
+	r.mu.Lock()
+	delete(r.metaNodes, nodeName)
+	r.mu.Unlock()
+}
+
+// doRepairGossip snapshots the tracked node names under the read lock and,
+// when at least two are known, asks the messenger to propagate a repair
+// round for schema.SchemaGroup on shard 0. With fewer than two nodes it logs
+// at debug level and returns nil.
+func (r *repairService) doRepairGossip() error {
+	r.mu.RLock()
+	targets := make([]string, 0, len(r.metaNodes))
+	for nodeName := range r.metaNodes {
+		targets = append(targets, nodeName)
+	}
+	r.mu.RUnlock()
+	if len(targets) >= 2 {
+		return r.messenger.Propagation(targets, schema.SchemaGroup, 0)
+	}
+	r.l.Debug().Msg("schema repair gossip is skipped because there are " +
+		"less than 2 meta nodes with schema repair gossip address")
+	return nil
+}
diff --git a/banyand/metadata/schema/schemaserver/repair_service_test.go
b/banyand/metadata/schema/schemaserver/repair_service_test.go
new file mode 100644
index 000000000..68d671aff
--- /dev/null
+++ b/banyand/metadata/schema/schemaserver/repair_service_test.go
@@ -0,0 +1,452 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// mockPropagationMessenger records Propagation calls for unit testing.
+// It embeds gossip.Messenger so that only Propagation has to be overridden;
+// the embedded value is left nil by the tests in this file.
+type mockPropagationMessenger struct {
+ gossip.Messenger
+ // propagationCall holds the most recent Propagation arguments; guarded by mu.
+ propagationCall *propagationCall
+ mu sync.Mutex
+}
+
+// propagationCall captures the arguments of a single Propagation call.
+type propagationCall struct {
+ group string
+ nodes []string
+ shardID uint32
+}
+
+// Propagation stores the received arguments, overwriting any earlier call,
+// and always returns nil.
+func (m *mockPropagationMessenger) Propagation(nodes []string, group string,
shardID uint32) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.propagationCall = &propagationCall{nodes: nodes, group: group,
shardID: shardID}
+ return nil
+}
+
+// getPropagationCall returns the last recorded call, or nil when Propagation
+// has not been invoked.
+func (m *mockPropagationMessenger) getPropagationCall() *propagationCall {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.propagationCall
+}
+
+// TestRepairServiceNodeTracking covers how OnAddOrUpdate and OnDelete
+// maintain the tracked node set.
+func TestRepairServiceNodeTracking(t *testing.T) {
+	newService := func() *repairService {
+		return &repairService{metaNodes: make(map[string]struct{}), l: logger.GetLogger("test-repair")}
+	}
+
+	t.Run("add node with address", func(t *testing.T) {
+		rs := newService()
+		rs.OnAddOrUpdate(schema.Metadata{
+			TypeMeta: schema.TypeMeta{Name: "node-1", Kind: schema.KindNode},
+			Spec: &databasev1.Node{
+				Metadata:                        &commonv1.Metadata{Name: "node-1"},
+				Roles:                           []databasev1.Role{databasev1.Role_ROLE_META},
+				PropertySchemaGossipGrpcAddress: "127.0.0.1:9999",
+			},
+		})
+		rs.mu.RLock()
+		defer rs.mu.RUnlock()
+		_, exists := rs.metaNodes["node-1"]
+		assert.True(t, exists, "node should be tracked")
+	})
+
+	t.Run("add node without address", func(t *testing.T) {
+		rs := newService()
+		// Pre-seed the entry and give the node the META role so the
+		// empty-address branch is the one that decides the outcome;
+		// without the role the handler returns before looking at the address.
+		rs.metaNodes["node-1"] = struct{}{}
+		rs.OnAddOrUpdate(schema.Metadata{
+			TypeMeta: schema.TypeMeta{Name: "node-1", Kind: schema.KindNode},
+			Spec: &databasev1.Node{
+				Metadata:                        &commonv1.Metadata{Name: "node-1"},
+				Roles:                           []databasev1.Role{databasev1.Role_ROLE_META},
+				PropertySchemaGossipGrpcAddress: "",
+			},
+		})
+		rs.mu.RLock()
+		defer rs.mu.RUnlock()
+		assert.Empty(t, rs.metaNodes, "node without address should not be tracked")
+	})
+
+	t.Run("add node without meta role", func(t *testing.T) {
+		rs := newService()
+		rs.OnAddOrUpdate(schema.Metadata{
+			TypeMeta: schema.TypeMeta{Name: "node-1", Kind: schema.KindNode},
+			Spec: &databasev1.Node{
+				Metadata:                        &commonv1.Metadata{Name: "node-1"},
+				Roles:                           []databasev1.Role{databasev1.Role_ROLE_DATA},
+				PropertySchemaGossipGrpcAddress: "127.0.0.1:9999",
+			},
+		})
+		rs.mu.RLock()
+		defer rs.mu.RUnlock()
+		assert.Empty(t, rs.metaNodes, "node without meta role should not be tracked")
+	})
+
+	t.Run("add non-node kind", func(t *testing.T) {
+		rs := newService()
+		rs.OnAddOrUpdate(schema.Metadata{
+			TypeMeta: schema.TypeMeta{Name: "group-1", Kind: schema.KindGroup},
+			Spec:     &commonv1.Group{Metadata: &commonv1.Metadata{Name: "group-1"}},
+		})
+		rs.mu.RLock()
+		defer rs.mu.RUnlock()
+		assert.Empty(t, rs.metaNodes, "non-node kind should not be tracked")
+	})
+
+	t.Run("delete node", func(t *testing.T) {
+		rs := newService()
+		rs.metaNodes["node-1"] = struct{}{}
+		rs.OnDelete(schema.Metadata{
+			TypeMeta: schema.TypeMeta{Name: "node-1", Kind: schema.KindNode},
+			Spec: &databasev1.Node{
+				Metadata: &commonv1.Metadata{Name: "node-1"},
+			},
+		})
+		rs.mu.RLock()
+		defer rs.mu.RUnlock()
+		_, exists := rs.metaNodes["node-1"]
+		assert.False(t, exists, "deleted node should be removed")
+	})
+
+	t.Run("delete non-existing node", func(t *testing.T) {
+		rs := newService()
+		rs.OnDelete(schema.Metadata{
+			TypeMeta: schema.TypeMeta{Name: "unknown", Kind: schema.KindNode},
+			Spec: &databasev1.Node{
+				Metadata: &commonv1.Metadata{Name: "unknown"},
+			},
+		})
+		rs.mu.RLock()
+		defer rs.mu.RUnlock()
+		assert.Empty(t, rs.metaNodes, "deleting non-existing node should not panic")
+	})
+}
+
+// TestRepairServiceDoRepairGossip checks that a repair round is propagated
+// only when at least two nodes are tracked, and that the propagation targets
+// schema.SchemaGroup on shard 0 with every tracked node.
+func TestRepairServiceDoRepairGossip(t *testing.T) {
+	cases := []struct {
+		name      string
+		message   string
+		nodes     []string
+		propagate bool
+	}{
+		{name: "zero nodes", message: "should not propagate with zero nodes"},
+		{name: "one node", message: "should not propagate with one node", nodes: []string{"node-1"}},
+		{name: "two nodes", message: "should propagate with two nodes", nodes: []string{"node-1", "node-2"}, propagate: true},
+		{
+			name: "three nodes", message: "should propagate with three nodes",
+			nodes: []string{"node-1", "node-2", "node-3"}, propagate: true,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			tracked := make(map[string]struct{}, len(tc.nodes))
+			for _, nodeName := range tc.nodes {
+				tracked[nodeName] = struct{}{}
+			}
+			mock := &mockPropagationMessenger{}
+			rs := &repairService{metaNodes: tracked, messenger: mock, l: logger.GetLogger("test-repair")}
+			repairErr := rs.doRepairGossip()
+			require.NoError(t, repairErr)
+			call := mock.getPropagationCall()
+			if !tc.propagate {
+				assert.Nil(t, call, tc.message)
+				return
+			}
+			require.NotNil(t, call, tc.message)
+			assert.Len(t, call.nodes, len(tc.nodes))
+			assert.Equal(t, schema.SchemaGroup, call.group)
+			assert.Equal(t, uint32(0), call.shardID)
+		})
+	}
+}
+
+// testNode holds resources for one gossip-enabled node in integration tests.
+type testNode struct {
+ // srv is the schema server created by setupTestNode.
+ srv *server
+ messenger gossip.Messenger
+ // nodeID is the "127.0.0.1:<port>" gossip address, also used as node name.
+ nodeID string
+}
+
+// setupTestNode creates a schema server (which owns the DB) and a gossip
+// messenger, wired together. It starts both, registers their shutdown with
+// t.Cleanup, and waits until the gossip port accepts TCP connections before
+// returning.
+func setupTestNode(t *testing.T) *testNode {
+ t.Helper()
+
+ // Use NewServer from service.go — it handles DB creation, snapshot config, repair config, etc.
+ srv := NewServer(observability.BypassRegistry).(*server)
+ flagSet := srv.FlagSet()
+ require.NoError(t, flagSet.Parse([]string{
+ "--schema-server-root-path", t.TempDir(),
+ "--schema-server-grpc-host", "127.0.0.1",
+ "--schema-server-grpc-port", fmt.Sprintf("%d", getFreePort(t)),
+ "--schema-server-repair-build-tree-cron", "@every 10m",
+ "--schema-server-repair-quick-build-tree-time", "6s",
+ }))
+ require.NoError(t, srv.Validate())
+ require.NoError(t, srv.PreRun(context.Background()))
+ srv.Serve()
+ t.Cleanup(func() { srv.GracefulStop() })
+
+ // Create gossip messenger for the repair protocol.
+ gossipPort := getFreePort(t)
+ addr := fmt.Sprintf("127.0.0.1:%d", gossipPort)
+ messenger := gossip.NewMessengerWithoutMetadata("schema-repair",
+ func(n *databasev1.Node) string { return
n.PropertySchemaGossipGrpcAddress },
+ observability.NewBypassRegistry(), int(gossipPort))
+ require.NoError(t, messenger.Validate())
+ // The node identity is passed to the messenger through the context.
+ ctx := context.WithValue(context.Background(), common.ContextNodeKey,
common.Node{
+ NodeID: addr,
+ PropertySchemaGossipGrpcAddress: addr,
+ })
+ require.NoError(t, messenger.PreRun(ctx))
+ // RegisterGossip must happen after PreRun because PreRun resets listeners.
+ srv.RegisterGossip(messenger)
+ messenger.Serve(run.NewCloser(0))
+ t.Cleanup(messenger.GracefulStop)
+
+ // Poll the gossip address until a TCP dial succeeds (10s budget).
+ require.Eventually(t, func() bool {
+ conn, dialErr := net.DialTimeout("tcp", addr, time.Second)
+ if dialErr != nil {
+ return false
+ }
+ _ = conn.Close()
+ return true
+ }, 10*time.Second, 100*time.Millisecond, "gossip server did not start")
+
+ return &testNode{srv: srv, messenger: messenger, nodeID: addr}
+}
+
+// registerNodes announces every node (itself included) to every messenger by
+// feeding node add events into the messenger's schema.EventHandler.
+func registerNodes(nodes []*testNode) {
+	for _, receiver := range nodes {
+		handler := receiver.messenger.(schema.EventHandler)
+		for _, peer := range nodes {
+			peerSpec := &databasev1.Node{
+				Metadata:                        &commonv1.Metadata{Name: peer.nodeID},
+				Roles:                           []databasev1.Role{databasev1.Role_ROLE_DATA},
+				PropertySchemaGossipGrpcAddress: peer.nodeID,
+			}
+			handler.OnAddOrUpdate(schema.Metadata{
+				TypeMeta: schema.TypeMeta{Name: peer.nodeID, Kind: schema.KindNode},
+				Spec:     peerSpec,
+			})
+		}
+	}
+}
+
+// writeProperty stores a property named name with the given id in
+// schema.SchemaGroup on shard 0. The version is written both as ModRevision
+// and as a "version" string tag; the test fails if the update errors.
+func writeProperty(t *testing.T, d db.Database, name, id string, version int64) {
+	t.Helper()
+	versionTag := &modelv1.Tag{
+		Key: "version",
+		Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{
+			Str: &modelv1.Str{Value: fmt.Sprintf("%d", version)},
+		}},
+	}
+	prop := &propertyv1.Property{
+		Metadata: &commonv1.Metadata{Group: schema.SchemaGroup, Name: name, ModRevision: version},
+		Id:       id,
+		Tags:     []*modelv1.Tag{versionTag},
+	}
+	updateErr := d.Update(context.Background(), common.ShardID(0), db.GetPropertyID(prop), prop)
+	require.NoError(t, updateErr)
+}
+
+// ensureShard writes a "seed" property (id "seed-init", version 0) into every
+// node's DB.
+// NOTE(review): presumably this forces shard 0 to exist before gossip runs — confirm.
+func ensureShard(t *testing.T, nodes []*testNode) {
+ t.Helper()
+ for _, n := range nodes {
+ writeProperty(t, n.srv.db, "seed", "seed-init", 0)
+ }
+}
+
+// propagateAndVerify repeatedly triggers a gossip propagation for
+// schema.SchemaGroup on shard 0 and evaluates verifyFn, once per second for
+// up to 30 seconds, until verifyFn returns true.
+func propagateAndVerify(t *testing.T, propagator gossip.Messenger, nodeIDs
[]string, verifyFn func() bool) {
+ t.Helper()
+ require.Eventually(t, func() bool {
+ // The propagation error is deliberately ignored: the loop retries and
+ // only verifyFn decides success.
+ _ = propagator.Propagation(nodeIDs, schema.SchemaGroup, 0)
+ return verifyFn()
+ }, 30*time.Second, time.Second)
+}
+
+// queryHasAllIDs reports whether every expected ID appears among the
+// non-deleted results of querying the "test" property in schema.SchemaGroup.
+// A query error counts as "not found".
+func queryHasAllIDs(d db.Database, expectedIDs []string) bool {
+	req := &propertyv1.QueryRequest{
+		Groups: []string{schema.SchemaGroup},
+		Name:   "test",
+	}
+	results, queryErr := d.Query(context.Background(), req)
+	if queryErr != nil {
+		return false
+	}
+	live := make(map[string]struct{})
+	for _, r := range results {
+		if r.DeleteTime() != 0 {
+			continue
+		}
+		live[string(r.ID())] = struct{}{}
+	}
+	for _, want := range expectedIDs {
+		if _, present := live[want]; !present {
+			return false
+		}
+	}
+	return true
+}
+
+// queryLatestVersion returns the largest Timestamp() among the non-deleted
+// results for the given group/name/id, 0 when there is none, or -1 when the
+// query fails.
+// NOTE(review): the value compared is the result timestamp, not a version
+// field; callers treat it as the version — confirm the two coincide.
+func queryLatestVersion(d db.Database, group, name, id string) int64 {
+ results, queryErr := d.Query(context.Background(),
&propertyv1.QueryRequest{
+ Groups: []string{group},
+ Name: name,
+ Ids: []string{id},
+ })
+ if queryErr != nil {
+ return -1
+ }
+ var latest int64
+ for _, r := range results {
+ if r.DeleteTime() == 0 && r.Timestamp() > latest {
+ latest = r.Timestamp()
+ }
+ }
+ return latest
+}
+
+// verifyPropertyData queries the "test" property with the given id in
+// schema.SchemaGroup and asserts that a non-deleted result exists whose
+// decoded group, name, ModRevision and first tag ("version") match
+// expectedVersion.
+func verifyPropertyData(t *testing.T, d db.Database, id string,
expectedVersion int64) {
+ t.Helper()
+ results, queryErr := d.Query(context.Background(),
&propertyv1.QueryRequest{
+ Groups: []string{schema.SchemaGroup},
+ Name: "test",
+ Ids: []string{id},
+ })
+ require.NoError(t, queryErr)
+ var found bool
+ for _, r := range results {
+ // Skip results that carry a delete time.
+ if r.DeleteTime() != 0 {
+ continue
+ }
+ // The stored source is decoded as a protojson-encoded Property.
+ var prop propertyv1.Property
+ require.NoError(t, protojson.Unmarshal(r.Source(), &prop))
+ if prop.Id != id {
+ continue
+ }
+ found = true
+ assert.Equal(t, schema.SchemaGroup, prop.Metadata.Group, "group
mismatch for %s", id)
+ assert.Equal(t, "test", prop.Metadata.Name, "name mismatch for
%s", id)
+ assert.Equal(t, expectedVersion, prop.Metadata.ModRevision,
"version mismatch for %s", id)
+ require.NotEmpty(t, prop.Tags, "tags should not be empty for
%s", id)
+ assert.Equal(t, "version", prop.Tags[0].Key, "tag key mismatch
for %s", id)
+ assert.Equal(t, fmt.Sprintf("%d", expectedVersion),
prop.Tags[0].Value.GetStr().Value,
+ "tag value mismatch for %s", id)
+ }
+ assert.True(t, found, "property %s/test/%s not found",
schema.SchemaGroup, id)
+}
+
+// TestRepairServiceDataSyncTwoNodes writes one property on node0 only and
+// expects gossip started from node0 to make it appear on node1.
+func TestRepairServiceDataSyncTwoNodes(t *testing.T) {
+ node0 := setupTestNode(t)
+ node1 := setupTestNode(t)
+ nodes := []*testNode{node0, node1}
+ registerNodes(nodes)
+ ensureShard(t, nodes)
+
+ writeProperty(t, node0.srv.db, "test", "entity-0", 1)
+
+ nodeIDs := []string{node0.nodeID, node1.nodeID}
+ expectedIDs := []string{schema.SchemaGroup + "/test/entity-0/1"}
+ propagateAndVerify(t, node0.messenger, nodeIDs, func() bool {
+ return queryHasAllIDs(node1.srv.db, expectedIDs)
+ })
+ verifyPropertyData(t, node1.srv.db, "entity-0", 1)
+}
+
+// TestRepairServiceDataSyncThreeNodes writes different properties on node0
+// and node1 and expects all three nodes to end up holding both.
+func TestRepairServiceDataSyncThreeNodes(t *testing.T) {
+ node0 := setupTestNode(t)
+ node1 := setupTestNode(t)
+ node2 := setupTestNode(t)
+ nodes := []*testNode{node0, node1, node2}
+ registerNodes(nodes)
+ ensureShard(t, nodes)
+
+ writeProperty(t, node0.srv.db, "test", "entity-0", 1)
+ writeProperty(t, node1.srv.db, "test", "entity-1", 2)
+
+ allNodeIDs := []string{node0.nodeID, node1.nodeID, node2.nodeID}
+ expectedIDs := []string{
+ schema.SchemaGroup + "/test/entity-0/1",
+ schema.SchemaGroup + "/test/entity-1/2",
+ }
+ propagateAndVerify(t, node0.messenger, allNodeIDs, func() bool {
+ return queryHasAllIDs(node0.srv.db, expectedIDs) &&
+ queryHasAllIDs(node1.srv.db, expectedIDs) &&
+ queryHasAllIDs(node2.srv.db, expectedIDs)
+ })
+ for _, n := range nodes {
+ verifyPropertyData(t, n.srv.db, "entity-0", 1)
+ verifyPropertyData(t, n.srv.db, "entity-1", 2)
+ }
+}
+
+// TestRepairServiceDataSyncVersionConflict writes the same property with
+// version 1 on node0 and version 2 on node1, then expects node0 to converge
+// on version 2 after gossip.
+func TestRepairServiceDataSyncVersionConflict(t *testing.T) {
+ node0 := setupTestNode(t)
+ node1 := setupTestNode(t)
+ nodes := []*testNode{node0, node1}
+ registerNodes(nodes)
+ ensureShard(t, nodes)
+
+ writeProperty(t, node0.srv.db, "test", "entity-0", 1)
+ writeProperty(t, node1.srv.db, "test", "entity-0", 2)
+
+ nodeIDs := []string{node0.nodeID, node1.nodeID}
+ propagateAndVerify(t, node0.messenger, nodeIDs, func() bool {
+ return queryLatestVersion(node0.srv.db, schema.SchemaGroup,
"test", "entity-0") == 2
+ })
+ verifyPropertyData(t, node0.srv.db, "entity-0", 2)
+}
+
+// TestRepairServiceDataSyncMissingToFull writes three properties on node0
+// only and expects node1, which starts with none of them, to receive all
+// three through gossip.
+func TestRepairServiceDataSyncMissingToFull(t *testing.T) {
+ node0 := setupTestNode(t)
+ node1 := setupTestNode(t)
+ nodes := []*testNode{node0, node1}
+ registerNodes(nodes)
+ ensureShard(t, nodes)
+
+ writeProperty(t, node0.srv.db, "test", "entity-0", 1)
+ writeProperty(t, node0.srv.db, "test", "entity-1", 2)
+ writeProperty(t, node0.srv.db, "test", "entity-2", 3)
+
+ nodeIDs := []string{node0.nodeID, node1.nodeID}
+ expectedIDs := []string{
+ schema.SchemaGroup + "/test/entity-0/1",
+ schema.SchemaGroup + "/test/entity-1/2",
+ schema.SchemaGroup + "/test/entity-2/3",
+ }
+ propagateAndVerify(t, node0.messenger, nodeIDs, func() bool {
+ return queryHasAllIDs(node1.srv.db, expectedIDs)
+ })
+ verifyPropertyData(t, node1.srv.db, "entity-0", 1)
+ verifyPropertyData(t, node1.srv.db, "entity-1", 2)
+ verifyPropertyData(t, node1.srv.db, "entity-2", 3)
+}
diff --git a/banyand/metadata/schema/schemaserver/service.go
b/banyand/metadata/schema/schemaserver/service.go
index 32477beee..4f613faf2 100644
--- a/banyand/metadata/schema/schemaserver/service.go
+++ b/banyand/metadata/schema/schemaserver/service.go
@@ -44,6 +44,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/banyand/property/gossip"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -68,6 +69,7 @@ type Server interface {
run.Config
run.Service
GetPort() *uint32
+ RegisterGossip(messenger gossip.Messenger)
}
type server struct {
@@ -112,6 +114,11 @@ func (s *server) GetPort() *uint32 {
return &s.port
}
+// RegisterGossip registers the DB's repair gRPC services with the gossip
+// messenger by delegating to the underlying DB's RegisterGossip.
+func (s *server) RegisterGossip(messenger gossip.Messenger) {
+ s.db.RegisterGossip(messenger)
+}
+
func (s *server) Role() databasev1.Role {
return databasev1.Role_ROLE_META
}
diff --git a/banyand/metadata/service/server.go
b/banyand/metadata/service/server.go
index cdb9d322d..cc6577643 100644
--- a/banyand/metadata/service/server.go
+++ b/banyand/metadata/service/server.go
@@ -36,6 +36,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema/schemaserver"
"github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -50,12 +51,16 @@ var (
type Service interface {
metadata.Service
GetSchemaServerPort() *uint32
+ GetSchemaGossipPort() *uint32
+ SetPropertyPipelineClient(queue.Client)
}
type server struct {
metadata.Service
etcdServer embeddedetcd.Server
propServer schemaserver.Server
+ repairSvc schemaserver.RepairService
+ pipelineClient queue.Client
omr observability.MetricsRegistry
serviceFlags *run.FlagSet
scheduler *timestamp.Scheduler
@@ -78,7 +83,8 @@ func (s *server) Name() string {
}
func (s *server) Role() databasev1.Role {
- if s.schemaRegistryMode == schemaTypeProperty {
+ needEtcd := s.schemaRegistryMode == schemaTypeEtcd ||
s.nodeDiscoveryMode == metadata.NodeDiscoveryModeEtcd
+ if s.schemaRegistryMode == schemaTypeProperty || (s.embedded &&
needEtcd) {
return databasev1.Role_ROLE_META
}
return databasev1.Role_ROLE_UNSPECIFIED
@@ -155,7 +161,12 @@ func (s *server) Validate() error {
return validateErr
}
if s.schemaRegistryMode == schemaTypeProperty && s.propServer != nil {
- return s.propServer.Validate()
+ if propValidateErr := s.propServer.Validate(); propValidateErr
!= nil {
+ return propValidateErr
+ }
+ if s.repairSvc != nil {
+ return s.repairSvc.Validate()
+ }
}
return nil
}
@@ -175,11 +186,17 @@ func (s *server) PreRun(ctx context.Context) error {
switch s.schemaRegistryMode {
case schemaTypeEtcd:
s.propServer = nil
+ s.repairSvc = nil
case schemaTypeProperty:
ctx = s.enrichContextWithSchemaAddress(ctx)
if propPreRunErr := s.propServer.PreRun(ctx); propPreRunErr !=
nil {
return propPreRunErr
}
+ if s.repairSvc != nil {
+ if repairPreRunErr := s.repairSvc.PreRun(ctx);
repairPreRunErr != nil {
+ return repairPreRunErr
+ }
+ }
default:
return errors.New("unknown schema storage type")
}
@@ -194,6 +211,13 @@ func (s *server) Serve() run.StopNotify {
<-s.propServer.Serve()
}()
}
+ if s.repairSvc != nil {
+ s.closer.AddRunning()
+ go func() {
+ defer s.closer.Done()
+ <-s.repairSvc.Serve()
+ }()
+ }
if s.etcdServer != nil {
s.registerDefrag()
s.closer.AddRunning()
@@ -207,6 +231,9 @@ func (s *server) Serve() run.StopNotify {
}
func (s *server) GracefulStop() {
+ if s.repairSvc != nil {
+ s.repairSvc.GracefulStop()
+ }
if s.propServer != nil {
s.propServer.GracefulStop()
}
@@ -258,6 +285,19 @@ func (s *server) GetSchemaServerPort() *uint32 {
return nil
}
+// SetPropertyPipelineClient injects the pipeline client used by the schema
+// gossip repair service. It only stores the client on the server.
+func (s *server) SetPropertyPipelineClient(client queue.Client) {
+ s.pipelineClient = client
+}
+
+// GetSchemaGossipPort returns the schema gossip gRPC port, or nil when no
+// repair service is configured.
+func (s *server) GetSchemaGossipPort() *uint32 {
+	if s.repairSvc == nil {
+		return nil
+	}
+	return s.repairSvc.GetGossipPort()
+}
+
func (s *server) enrichContextWithSchemaAddress(ctx context.Context)
context.Context {
port := s.propServer.GetPort()
if port == nil {
diff --git a/banyand/property/db/repair.go b/banyand/property/db/repair.go
index 9f80c5fa6..a5b967908 100644
--- a/banyand/property/db/repair.go
+++ b/banyand/property/db/repair.go
@@ -1018,6 +1018,11 @@ func (r *repairScheduler) verifyShouldExecuteBuildTree(t
time.Time, triggerByCro
if !triggerByCron {
// if not triggered by cron, we need to check if the time is
after the (last scheduled time + half of the interval)
if
r.buildTreeClock.Now().After(r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval
/ 2)) {
+ r.l.Debug().Msgf("the build tree is triggered by quick
repair, "+
+ "but the last build tree schedule time is %s,
which is before the current time %s minus half of the interval %s(%s), "+
+ "skipping this quick build tree",
+ r.latestBuildTreeSchedule.Format(time.RFC3339),
r.buildTreeClock.Now().Format(time.RFC3339),
+ (r.buildTreeScheduleInterval / 2).String(),
r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval/2).String())
return false
}
} else {
diff --git a/banyand/property/db/repair_gossip_test.go
b/banyand/property/db/repair_gossip_test.go
index e745aabc0..20f428f9f 100644
--- a/banyand/property/db/repair_gossip_test.go
+++ b/banyand/property/db/repair_gossip_test.go
@@ -383,7 +383,9 @@ func startEachNode(ctrl *gomock.Controller, node node,
groups []group) *nodeCont
ports, err := test.AllocateFreePorts(1)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
- messenger :=
gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), ports[0])
+ messenger := gossip.NewMessengerWithoutMetadata("property-repair",
+ func(n *databasev1.Node) string { return
n.PropertyRepairGossipGrpcAddress },
+ observability.NewBypassRegistry(), ports[0])
addr := fmt.Sprintf("127.0.0.1:%d", ports[0])
result.nodeID = addr
err = messenger.Validate()
diff --git a/banyand/property/gossip/client.go
b/banyand/property/gossip/client.go
index 5342e304b..6858bfce0 100644
--- a/banyand/property/gossip/client.go
+++ b/banyand/property/gossip/client.go
@@ -133,7 +133,7 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
s.log.Warn().Msg("invalid metadata type")
return
}
- address := node.PropertyRepairGossipGrpcAddress
+ address := s.addressExtractor(node)
if address == "" {
s.log.Warn().Stringer("node", node).Msg("node does not have
gossip address, skipping registration")
return
diff --git a/banyand/property/gossip/server.go
b/banyand/property/gossip/server.go
index 97b9d7f7f..cff25034f 100644
--- a/banyand/property/gossip/server.go
+++ b/banyand/property/gossip/server.go
@@ -156,19 +156,17 @@ func (q *protocolHandler) Propagation(_ context.Context,
request *propertyv1.Pro
func (q *protocolHandler) propagation0(_ context.Context, request
*propertyv1.PropagationRequest, tracer Trace) (resp
*propertyv1.PropagationResponse, err error) {
span := tracer.CreateSpan(tracer.ActivateSpan(), "receive gossip
message")
- defer span.End()
span.Tag(TraceTagGroupName, request.Group)
span.Tag(TraceTagShardID, fmt.Sprintf("%d", request.ShardId))
span.Tag(TraceTagOperateType, TraceTagOperateReceive)
q.s.serverMetrics.totalReceived.Inc(1, request.Group)
q.s.log.Debug().Stringer("request", request).Msg("received property
repair gossip message for propagation")
+ span.End()
- if q.addToProcess(request, tracer) {
- span.Tag("added_to_process", "true")
+ if q.addToProcess(request, tracer, span) {
q.s.serverMetrics.totalAddProcessed.Inc(1, request.Group)
q.s.log.Debug().Msgf("add the propagation request to the
process")
} else {
- span.Tag("added_to_process", "false")
q.s.serverMetrics.totalSkipProcess.Inc(1, request.Group)
q.s.log.Debug().Msgf("propagation request discarded")
}
@@ -304,7 +302,7 @@ func (q *protocolHandler) contextIsDone(ctx
context.Context) bool {
}
}
-func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest,
tracer Trace) bool {
+func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest,
tracer Trace, span Span) bool {
q.mu.Lock()
defer q.mu.Unlock()
@@ -313,7 +311,7 @@ func (q *protocolHandler) addToProcess(request
*propertyv1.PropagationRequest, t
handlingRequestData := &handlingRequest{
PropagationRequest: request,
tracer: tracer,
- parentSpan: tracer.ActivateSpan(),
+ parentSpan: span,
}
if !exist {
groupShard = &groupWithShardPropagation{
@@ -391,10 +389,11 @@ func (s *service) newConnectionFromNode(n
*databasev1.Node) (*grpc.ClientConn, e
if err != nil {
return nil, fmt.Errorf("failed to get client transport
credentials: %w", err)
}
- conn, err := grpc.NewClient(n.PropertyRepairGossipGrpcAddress,
append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
- s.log.Debug().Str("address",
n.PropertyRepairGossipGrpcAddress).Msg("starting to create gRPC client
connection to node")
+ address := s.addressExtractor(n)
+ conn, err := grpc.NewClient(address, append(credOpts,
grpc.WithDefaultServiceConfig(retryPolicy))...)
+ s.log.Debug().Str("address", address).Msg("starting to create gRPC
client connection to node")
if err != nil {
- return nil, fmt.Errorf("failed to create gRPC client connection
to node %s: %w", n.PropertyRepairGossipGrpcAddress, err)
+ return nil, fmt.Errorf("failed to create gRPC client connection
to node %s: %w", address, err)
}
return conn, nil
}
diff --git a/banyand/property/gossip/service.go
b/banyand/property/gossip/service.go
index 1a11276bb..962a455e9 100644
--- a/banyand/property/gossip/service.go
+++ b/banyand/property/gossip/service.go
@@ -47,12 +47,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/run"
)
+// AddressExtractor extracts the gossip gRPC address from a node.
+type AddressExtractor func(*databasev1.Node) string
+
var (
errServerCert = errors.New("invalid server cert file")
errServerKey = errors.New("invalid server key file")
errNoAddr = errors.New("no address")
-
- serverScope =
observability.RootScope.SubScope("property_repair_gossip_server")
)
const (
@@ -63,6 +64,7 @@ const (
type service struct {
schema.UnimplementedOnInitHandler
+ addressExtractor AddressExtractor
pipeline queue.Client
metadata metadata.Repo
creds credentials.TransportCredentials
@@ -76,6 +78,7 @@ type service struct {
protocolHandler *protocolHandler
registered map[string]*databasev1.Node
traceSpanNotified *int32
+ prefix string
caCertPath string
host string
addr string
@@ -99,8 +102,13 @@ type service struct {
}
// NewMessenger creates a new instance of Messenger for gossip propagation
communication between nodes.
-func NewMessenger(omr observability.MetricsRegistry, metadata metadata.Repo,
pipeline queue.Client) Messenger {
+func NewMessenger(prefix string, addressExtractor AddressExtractor,
+ omr observability.MetricsRegistry, metadata metadata.Repo, pipeline
queue.Client, defaultPort uint32,
+) Messenger {
+ serverScope := observability.RootScope.SubScope(prefix +
"_gossip_server")
return &service{
+ prefix: prefix,
+ addressExtractor: addressExtractor,
metadata: metadata,
omr: omr,
closer: run.NewCloser(1),
@@ -112,15 +120,13 @@ func NewMessenger(omr observability.MetricsRegistry,
metadata metadata.Repo, pip
registered: make(map[string]*databasev1.Node),
scheduleInterval: time.Hour * 2,
sel: node.NewRoundRobinSelector("", metadata),
- port: 17932,
+ port: defaultPort,
}
}
// NewMessengerWithoutMetadata creates a new instance of Messenger without
metadata.
-func NewMessengerWithoutMetadata(omr observability.MetricsRegistry, port int)
Messenger {
- messenger := NewMessenger(omr, nil, nil)
- messenger.(*service).port = uint32(port)
- return messenger
+func NewMessengerWithoutMetadata(prefix string, addressExtractor
AddressExtractor, omr observability.MetricsRegistry, port int) Messenger {
+ return NewMessenger(prefix, addressExtractor, omr, nil, nil,
uint32(port))
}
func (s *service) PreRun(ctx context.Context) error {
@@ -130,13 +136,14 @@ func (s *service) PreRun(ctx context.Context) error {
}
node := val.(common.Node)
s.nodeID = node.NodeID
- s.log = logger.GetLogger("gossip-messenger")
+ s.log = logger.GetLogger(s.prefix + "-gossip-messenger")
s.listeners = make([]MessageListener, 0)
- s.serverMetrics = newServerMetrics(s.omr.With(serverScope))
+ metricsScope := observability.RootScope.SubScope(s.prefix +
"_gossip_server")
+ s.serverMetrics = newServerMetrics(s.omr.With(metricsScope))
if s.metadata != nil {
s.sel.OnInit([]schema.Kind{schema.KindGroup})
- s.metadata.RegisterHandler("property-repair-nodes",
schema.KindNode, s)
- s.metadata.RegisterHandler("property-repair-groups",
schema.KindGroup, s)
+ s.metadata.RegisterHandler(s.prefix+"-nodes", schema.KindNode,
s)
+ s.metadata.RegisterHandler(s.prefix+"-groups",
schema.KindGroup, s)
if err := s.initTracing(ctx); err != nil {
s.log.Warn().Err(err).Msg("failed to init internal
trace stream")
}
@@ -151,7 +158,7 @@ func (s *service) GetServerPort() *uint32 {
}
func (s *service) Name() string {
- return "gossip-messenger"
+ return s.prefix + "-gossip-messenger"
}
func (s *service) Role() databasev1.Role {
@@ -159,18 +166,18 @@ func (s *service) Role() databasev1.Role {
}
func (s *service) FlagSet() *run.FlagSet {
- fs := run.NewFlagSet("gossip-messenger")
+ fs := run.NewFlagSet(s.prefix + "-gossip-messenger")
- fs.VarP(&s.maxRecvMsgSize,
"property-repair-gossip-grpc-max-recv-msg-size", "", "the size of max receiving
message")
- fs.StringVar(&s.host, "property-repair-gossip-grpc-host", "", "the host
of banyand listens")
- fs.Uint32Var(&s.port, "property-repair-gossip-grpc-port", s.port, "the
port of banyand listens")
+ fs.VarP(&s.maxRecvMsgSize, s.prefix+"-gossip-grpc-max-recv-msg-size",
"", "the size of max receiving message")
+ fs.StringVar(&s.host, s.prefix+"-gossip-grpc-host", "", "the host of
banyand listens")
+ fs.Uint32Var(&s.port, s.prefix+"-gossip-grpc-port", s.port, "the port
of banyand listens")
- fs.BoolVar(&s.tls, "property-repair-gossip-grpc-tls", false,
"connection uses TLS if true, else plain TCP")
- fs.StringVar(&s.certFile,
"property-repair-gossip-grpc-server-cert-file", "", "the TLS cert file")
- fs.StringVar(&s.keyFile, "property-repair-gossip-grpc-server-key-file",
"", "the TLS key file")
- fs.StringVar(&s.caCertPath, "property-repair-gossip-client-ca-cert",
"", "Path to the CA certificate for gossip client TLS communication.")
- fs.DurationVar(&s.totalTimeout, "property-repair-gossip-total-timeout",
defaultTotalTimeout, "the total timeout for gossip propagation")
- fs.BoolVar(&s.traceLogEnabled, "property-repair-gossip-trace-log",
true, "enable trace log")
+ fs.BoolVar(&s.tls, s.prefix+"-gossip-grpc-tls", false, "connection uses
TLS if true, else plain TCP")
+ fs.StringVar(&s.certFile, s.prefix+"-gossip-grpc-server-cert-file", "",
"the TLS cert file")
+ fs.StringVar(&s.keyFile, s.prefix+"-gossip-grpc-server-key-file", "",
"the TLS key file")
+ fs.StringVar(&s.caCertPath, s.prefix+"-gossip-client-ca-cert", "",
"Path to the CA certificate for gossip client TLS communication.")
+ fs.DurationVar(&s.totalTimeout, s.prefix+"-gossip-total-timeout",
defaultTotalTimeout, "the total timeout for gossip propagation")
+ fs.BoolVar(&s.traceLogEnabled, s.prefix+"-gossip-trace-log", true,
"enable trace log")
return fs
}
@@ -181,9 +188,6 @@ func (s *service) Validate() error {
}
// server side validation
- if s.port == 0 {
- s.port = 17932
- }
s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port),
10))
if s.addr == ":" {
return errNoAddr
diff --git a/banyand/property/gossip/service_test.go
b/banyand/property/gossip/service_test.go
index eebe86773..dc747e9a8 100644
--- a/banyand/property/gossip/service_test.go
+++ b/banyand/property/gossip/service_test.go
@@ -183,7 +183,9 @@ func startNodes(count int) []*nodeContext {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// starting gossip messenger
- messenger :=
NewMessengerWithoutMetadata(observability.NewBypassRegistry(), ports[0])
+ messenger := NewMessengerWithoutMetadata("property-repair",
+ func(n *databasev1.Node) string { return
n.PropertyRepairGossipGrpcAddress },
+ observability.NewBypassRegistry(), ports[0])
gomega.Expect(messenger).NotTo(gomega.BeNil())
addr := fmt.Sprintf("127.0.0.1:%d", ports[0])
messenger.(run.PreRunner).PreRun(context.WithValue(context.Background(),
common.ContextNodeKey, common.Node{
diff --git a/banyand/property/gossip/trace.go b/banyand/property/gossip/trace.go
index 6f5ed6da6..627eeae30 100644
--- a/banyand/property/gossip/trace.go
+++ b/banyand/property/gossip/trace.go
@@ -145,7 +145,9 @@ func (s *service) initTracing(ctx context.Context) error {
ModRevision: dbStream.GetMetadata().GetModRevision(),
}
s.traceStreamSelector =
node.NewRoundRobinSelector(data.TopicStreamWrite.String(), s.metadata)
- s.pipeline.Register(data.TopicStreamWrite, s)
+ if s.pipeline != nil {
+ s.pipeline.Register(data.TopicStreamWrite, s)
+ }
return nil
}
@@ -174,7 +176,7 @@ func (s *service) initInternalTraceGroup(ctx
context.Context) error {
func (s *service) savingTracingSpans() (err error) {
spans := s.readAllReadySendTraceSpan()
s.log.Debug().Int("spans", len(spans)).Msg("ready to save trace spans
to storage")
- if len(spans) == 0 {
+ if len(spans) == 0 || s.pipeline == nil {
return nil
}
ctx := context.Background()
diff --git a/banyand/property/service.go b/banyand/property/service.go
index 60d52d0b5..5cb66b011 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -249,6 +249,8 @@ func NewService(metadata metadata.Repo, pipeline
queue.Server, pipelineClient qu
pm: pm,
closer: run.NewCloser(0),
- gossipMessenger: gossip.NewMessenger(omr, metadata,
pipelineClient),
+ gossipMessenger: gossip.NewMessenger("property-repair",
+ func(n *databasev1.Node) string { return
n.PropertyRepairGossipGrpcAddress },
+ omr, metadata, pipelineClient, 17932),
}, nil
}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 1dc98a055..38e764d68 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -168,6 +168,7 @@ func (s *server) PreRun(ctx context.Context) error {
PropertyRepairGossipGrpcAddress: node.PropertyGossipGrpcAddress,
PropertySchemaGrpcAddress: node.PropertySchemaGrpcAddress,
+ PropertySchemaGossipGrpcAddress:
node.PropertySchemaGossipGrpcAddress,
}
return nil
diff --git a/bydbctl/internal/cmd/property_test.go
b/bydbctl/internal/cmd/property_test.go
index 177e7fd95..e3ecb718a 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -645,7 +645,9 @@ var _ = Describe("Property Cluster background Repair
Operation", func() {
node1ID = fmt.Sprintf("127.0.0.1:%s", node1Port)
node2ID = fmt.Sprintf("127.0.0.1:%s", node2Port)
- messenger =
gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), 9999)
+ messenger =
gossip.NewMessengerWithoutMetadata("property-repair",
+ func(n *databasev1.Node) string { return
n.PropertyRepairGossipGrpcAddress },
+ observability.NewBypassRegistry(), 9999)
messenger.Validate()
err = messenger.PreRun(context.WithValue(context.Background(),
common.ContextNodeKey, common.Node{
NodeID: "not-exist",
@@ -758,6 +760,7 @@ var _ = Describe("Property Cluster Resilience with 5 Data
Nodes", func() {
for i := 0; i < nodeCount; i++ {
By(fmt.Sprintf("Starting data node %d", i))
nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] =
setup.DataNodeFromDataDir(ep, nodeDirs[i],
+ "--logging-level=debug",
"--property-repair-enabled=true",
"--property-repair-quick-build-tree-time=1s",
"--property-repair-build-tree-cron=@every 2s")
// Update node ID to use 127.0.0.1
@@ -774,7 +777,9 @@ var _ = Describe("Property Cluster Resilience with 5 Data
Nodes", func() {
defUITemplateWithSchema(rootCmd, addr, 1, nodeCount)
// Setup gossip messenger
- messenger =
gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), 9999)
+ messenger =
gossip.NewMessengerWithoutMetadata("property-repair",
+ func(n *databasev1.Node) string { return
n.PropertyRepairGossipGrpcAddress },
+ observability.NewBypassRegistry(), 9999)
messenger.Validate()
err = messenger.PreRun(context.WithValue(context.Background(),
common.ContextNodeKey, common.Node{
NodeID: "test-client",
@@ -847,6 +852,7 @@ var _ = Describe("Property Cluster Resilience with 5 Data
Nodes", func() {
for i := 0; i < closedNodeCount; i++ {
GinkgoWriter.Printf("Restarting node %d\n", i)
nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] =
setup.DataNodeFromDataDir(ep, nodeDirs[i],
+ "--logging-level=debug",
"--property-repair-enabled=true",
"--property-repair-quick-build-tree-time=1s",
"--property-repair-build-tree-cron=@every 2s")
// Update node ID to use 127.0.0.1
@@ -1122,5 +1128,5 @@ func waitForRepairTreeRegeneration(nodeDirs []string,
group string, beforeTime t
}
}
return allRegenerated
- }, flags.EventuallyTimeout).Should(BeTrue(), "All nodes should
regenerate repair tree after data write")
+ }, time.Minute).Should(BeTrue(), "All nodes should regenerate repair
tree after data write")
}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index de05b1c49..650a060bc 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2318,6 +2318,7 @@ Service is the service for the API
| labels | [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) |
repeated | labels is a set of key-value pairs to describe the node. |
| property_repair_gossip_grpc_address | [string](#string) | | |
| property_schema_grpc_address | [string](#string) | | |
+| property_schema_gossip_grpc_address | [string](#string) | | |
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 6655f6b37..2403c7c08 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -54,6 +54,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
pm := protector.NewMemory(metricSvc)
pipeline := sub.NewServer(metricSvc)
propertyStreamPipeline := queue.Local()
+ metaSvc.SetPropertyPipelineClient(propertyStreamPipeline)
propertySvc, err := property.NewService(metaSvc, pipeline,
propertyStreamPipeline, metricSvc, pm)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate property service")
@@ -103,7 +104,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
Version: version.Build(),
Short: "Run as the data server",
RunE: func(_ *cobra.Command, _ []string) (err error) {
- node, err := common.GenerateNode(pipeline.GetPort(),
nil, propertySvc.GetGossIPGrpcPort(), metaSvc.GetSchemaServerPort())
+ node, err := common.GenerateNode(pipeline.GetPort(),
nil, propertySvc.GetGossIPGrpcPort(), metaSvc.GetSchemaServerPort(),
metaSvc.GetSchemaGossipPort())
if err != nil {
return err
}
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index bf6ce05a8..69756ef83 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -150,7 +150,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
sel.SetNodeSelector(ls)
}
}
- node, err :=
common.GenerateNode(internalPipeline.GetPort(), httpServer.GetPort(), nil, nil)
+ node, err :=
common.GenerateNode(internalPipeline.GetPort(), httpServer.GetPort(), nil, nil,
nil)
if err != nil {
return err
}
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 91a9c36d3..54ec7498d 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -111,7 +111,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
Version: version.Build(),
Short: "Run as the standalone server",
RunE: func(_ *cobra.Command, _ []string) (err error) {
- nodeID, err :=
common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort(), nil, nil)
+ nodeID, err :=
common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort(), nil, nil, nil)
if err != nil {
return err
}